Nonblocking Java Async Processing - how to constrain memory usage?

June 13, 2019, at 3:10 PM

I'm coming back to Java after a few years away, and have been excited to see the introduction of non-blocking async support in the new java.net.http.HttpClient and in the AWS Java SDK 2.0. I heard about the concepts of Reactive Programming years ago in conference talks, but haven't had much chance to apply those ideas in practice.

I have a problem that seems well suited to playing around with this style of programming: Basically I want to download a bunch of files (say 10,000) over HTTP and write them back out to S3.

I've used Failsafe to implement retries for nonblocking async HTTP GETs, and it's straightforward to compose those with uploads via the S3 async client (see the sketch below).

However, I'm not sure how to properly constrain the memory usage of the program: every download is submitted up front and each response body is held in memory until its upload completes, so there is no mechanism to apply backpressure and prevent an out-of-memory error if files are downloaded faster than they're written back out to S3.

I'm familiar with some traditional blocking solutions to this problem - e.g. use a semaphore to limit the number of concurrent downloads, or have downloads write out to some bounded blocking queue that S3 upload threads will pull from. However, if I'm going to use such a blocking mechanism to apply the backpressure, then it makes me question the advantage of using nonblocking IO in the first place.
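For reference, the semaphore variant mentioned above might look roughly like the following. This is only a minimal sketch: `BoundedSubmitter`, `runBounded` and `maxInFlight` are hypothetical names that do not appear in my actual code, and the `task` function stands in for something like `submitBackup`. Note that the submitting thread blocks in `acquireUninterruptibly()`, which is exactly the part that feels at odds with nonblocking IO.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Function;
import java.util.stream.Collectors;

public class BoundedSubmitter {
    /**
     * Applies task to every input while keeping at most maxInFlight
     * futures pending at any time, then waits for all results.
     */
    public static <I, O> List<O> runBounded(List<I> inputs, int maxInFlight,
                                            Function<I, CompletableFuture<O>> task) {
        Semaphore permits = new Semaphore(maxInFlight);
        List<CompletableFuture<O>> futures = new ArrayList<>();
        for (I input : inputs) {
            // Blocks the submitting thread until a permit is free.
            permits.acquireUninterruptibly();
            CompletableFuture<O> future;
            try {
                future = task.apply(input);
            } catch (RuntimeException e) {
                // The task never started, so give the permit back.
                permits.release();
                throw e;
            }
            // Release the permit once the async work finishes, on success or failure.
            futures.add(future.whenComplete((result, error) -> permits.release()));
        }
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }
}
```

With something like `BoundedSubmitter.runBounded(filesToBackup, 50, this::submitBackup)`, at most 50 response bodies would be held in memory at once, at the cost of parking the calling thread whenever the limit is reached.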

Is there a more idiomatic "reactive" way to accomplish the same goal?

import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
public class BackupClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(BackupClient.class);
    private final HttpClient httpClient = HttpClient.newBuilder().build();
    private final S3AsyncClient s3AsyncClient = S3AsyncClient.create();
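    public void runBackup(List<URI> filesToBackup) {
        // Kick off every download/upload pipeline up front; nothing here limits how many are in flight.
        List<CompletableFuture<PutObjectResponse>> futures = filesToBackup.stream()
                .map(this::submitBackup)
                .collect(Collectors.toList());
        // Wait for all uploads to finish.
        futures.forEach(CompletableFuture::join);
    }
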
    private CompletableFuture<PutObjectResponse> submitBackup(URI uri) {
        return sendAsyncWithRetries(uri, HttpResponse.BodyHandlers.ofString())
                .thenCompose(httpResponse -> s3AsyncClient.putObject(PutObjectRequest.builder()
                        .bucket("my-bucket")
                        .key(uri.toASCIIString())
                        .build(), AsyncRequestBody.fromString(httpResponse.body())));
    }

    private <T> CompletableFuture<HttpResponse<T>> sendAsyncWithRetries(URI uri, HttpResponse.BodyHandler<T> handler) {
        final HttpRequest request = HttpRequest.newBuilder()
                .uri(uri)
                .timeout(Duration.ofMinutes(2))
                .GET()
                .build();
        final var retryPolicy = new RetryPolicy<HttpResponse<T>>()
                .withMaxRetries(4)
                .withDelay(Duration.ofSeconds(1))
                .handleResultIf(response -> 200 != response.statusCode());
        return Failsafe.with(retryPolicy)
                .getStageAsync(context -> {
                    if (context.getAttemptCount() > 0) {
                        LOGGER.error("Retry " + context.getAttemptCount() + " for " + uri);
                    }
                    return this.httpClient.sendAsync(request, handler);
                });
    }
}