wissel.net

Usability - Productivity - Business - The web - Singapore & Twins

Handle HTTP chunked responses - Java edition


The Domino REST API delivers collections using chunked transfer encoding. This has the advantage, that you can process results as they arrive. It produces the challenge that the usual client side code is designed to first wait for completion of the request. I wrote about the JavaScript solution a while ago, this is the Java edition.

Client choices

In JavaScript land the choice of client is simple: the Fetch API. In Java we have some choices:

There are probably more around. This article uses the JDK HttpClient. I'll skip the parts with Authentication and TLS handling, check the full example for details.

How it works

First we create an java.net.http.HttpClient. It takes care of the http version and the TLS context.

HttpClient getClient(SSLContext sslContext) {
  return HttpClient.newBuilder()
           .sslContext(sslContext)
           .build();
}

Then we build and execute the request. The magic is the BodySubscriber (more on that below).

Integer runGetRequest(HttpClient client, String url, String authHeader, BodySubscriber subscriber) throws Exception {
  HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create(url))
            .header("Authorization", authHeader)
            .GET()
            .build();

  CompletableFuture<Integer> response =
          client.sendAsync(request, responseInfo -> subscriber)
          .whenComplete((r, t) -> System.out.println("Response: " + r.statusCode()))
          .thenApply(HttpResponse::body);

  return response.get();
}

We will see that the call returns the size of data returned, not the individual records. Before digging into the BodySubscriber, we need to decide what to do with the identified "lines". In my example I use a java.util.function.Consumer that takes in the line String and does whatever processing is required. In a high volume system you want to queue this if the processing is non-trivial.

Consumer<String> getConsumer() {
return new Consumer<String>() {
        int count = 0;

         @Override
         public void accept(String t) {
            // SKip the first and last line
            if ("]".equals(t) || "[".equals(t)) {
               return;
            }
            String actual = t.endsWith(",") ? t.substring(0, t.length() - 1) : t;
            JsonObject json = JsonParser.parseString(actual)
                  .getAsJsonObject().get("@meta").getAsJsonObject();

            // This is where the action happens
            System.out.println(json.get("unid").getAsString());
            count++;
            System.out.println("Count: " + count);
          }
    };
}

Now we are ready to implement the BodySubscriber. When you are familiar with the Observer pattern The BodySubscriber is easy to understand, it follows the reactive approach implementing onSubscribe, onNext, onError and onComplete.

public class DocumentSubscriber implements BodySubscriber<Integer> {

    public DocumentSubscriber(Consumer<String> dataSink) {
        this.dataSink = dataSink;
    }

    final Consumer<String> dataSink;
    final CompletableFuture<Integer> result = new CompletableFuture<>();
    Flow.Subscription subscription;
    int totalsize = 0;
    String current = "";

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        // Poll the data source
        this.subscription.request(1);

    }

    @Override
    public void onNext(List<ByteBuffer> buffers) {
        int size = buffers.stream().mapToInt(ByteBuffer::remaining).sum();
        byte[] bytes = new byte[size];
        int offset = 0;
        for (ByteBuffer buffer : buffers) {
            int remaining = buffer.remaining();
            buffer.get(bytes, offset, remaining);
            offset += remaining;
            // record size
            totalsize += remaining;
        }
        String chunk = this.current + new String(bytes, StandardCharsets.UTF_8);
        String[] lines = chunk.split("\n");
        var lineCounter = lines.length - 1;
        // capture the last line for the next chunk
        this.current = lines[lineCounter];
        for (int i = 0; i < lineCounter; i++) {
            var oneLine = lines[i];
            dataSink.accept(oneLine);
        }
        this.subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        result.completeExceptionally(throwable);
    }

    @Override
    public void onComplete() {
        dataSink.accept(current);
        result.complete(totalsize);
    }

    @Override
    public CompletionStage<Integer> getBody() {
        return result;
    }
}

There are a few interesting aspects to pay attention to:

  • The DRAPI JSON result is send line by line bundled in chunks, so the separator is \n
  • a chunk doesn't care for the records returned and can stop in the middle of a line to continue in the next chunk
  • So after converting the bytes into a string and splitting it into lines, the last line needs to be memorized for the next chunk
  • On completion there's one line left to produce, so the onComplete has a call to the Consumer

Check the full example.

As usual YMMV


Posted by on 09 October 2024 | Comments (0) | categories: Java WebDevelopment

Comments

  1. No comments yet, be the first to comment