wissel.net

Usability - Productivity - Business - The web - Singapore & Twins

This article is part of a mini series. Read them all:

EventBus Streaming

A Streaming Pattern for the vert.x EventBus (Part 1)


When dealing with large amounts of data, streams allow processing to happen the moment data arrives, not only once the data is complete. Streaming is core to reactive programming. This blog entry describes an approach where the vert.x EventBus sits between requester and resource.

The scenario

Classic EventBus Request Response

Image created using WebSequenceDiagrams

title Vert.x EventBus

participant Requester
participant EventBus
participant DataSource

Requester->EventBus: request DataSet
EventBus->DataSource: forward request
DataSource->EventBus: reply with data
EventBus->Requester: forward reply

A requester (e.g. the handler of an HTTP listener) sends a request via the EventBus using a request-response pattern with EventBus.request(...). Simple and easy. The problem: one request gets exactly one response. That doesn't work for streaming data.
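To make the limitation concrete, here is a minimal sketch in plain Java. ToyRequestBus and RequestResponseDemo are hypothetical names, not part of vert.x; the toy bus only models the one-request-one-reply shape of EventBus.request(...), so the data source has to assemble the complete data set before it can answer.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical in-memory stand-in for the EventBus; NOT the vert.x API.
final class ToyRequestBus {
    private final Map<String, Function<String, String>> responders = new HashMap<>();

    // A data source registers one responder per address.
    void consumer(String address, Function<String, String> responder) {
        responders.put(address, responder);
    }

    // One request yields exactly one reply; the return type has no room for more.
    String request(String address, String body) {
        Function<String, String> responder = responders.get(address);
        if (responder == null) {
            throw new IllegalStateException("No handler for " + address);
        }
        return responder.apply(body);
    }
}

final class RequestResponseDemo {
    // The data source must collect every row before it can send its single reply.
    static String fetchAll(List<String> rows) {
        ToyRequestBus bus = new ToyRequestBus();
        bus.consumer("data.source", query -> String.join(",", rows));
        return bus.request("data.source", "all rows");
    }

    public static void main(String[] args) {
        System.out.println(fetchAll(List.of("row1", "row2", "row3")));
    }
}
```

The requester sees nothing until the last row has been joined into the reply, which is exactly what streaming is meant to avoid.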

Taking a page from the military

The standard pattern for military commands is:

  1. Utter the command
  2. Acknowledge the command
  3. Execute the command (Conquer 14 countries, might take time. For Germans: Liberate 14 countries)
  4. Report completion of command

Applying to the EventBus

Following the pattern above, the first request/response only establishes the intent (btw. Intent Based Leadership is a smoking hot topic): it covers items 1 and 2, the command and its acknowledgement. Items 3 and 4, the execution and the completion report, are handled by a publish and subscribe pattern.

So our scenario now looks like this:

EventBus Streaming Request Response

Image created using WebSequenceDiagrams

title Vert.x EventBus Streaming

participant Requester
participant EventBus
participant DataSource

Requester->EventBus: start listening\non temp address
note over Requester, DataSource: Start request/response
Requester->EventBus: request Stream\notify on temp address
EventBus->DataSource: forward request
DataSource->EventBus: reply with acknowledgement
EventBus->Requester: forward response
note over Requester, DataSource: End of request/response
note over Requester, DataSource: Start publish/subscribe
DataSource->EventBus: publish first data
EventBus->Requester: forward response
DataSource->EventBus: publish more data
EventBus->Requester: forward response
DataSource->EventBus: publish last data
EventBus->Requester: forward response
Requester->EventBus: end listening\non temp address
note over Requester, DataSource: End of publish/subscribe
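The sequence in the diagram can be traced with a small simulation in plain Java. This is a sketch under stated assumptions: ToyStreamBus and StreamingFlowDemo are hypothetical names, the bus is synchronous and in-memory (not the vert.x EventBus), the acknowledgement is reduced to a log entry, and a simple queue stands in for the work the data source does after acknowledging.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.BiConsumer;

// Hypothetical synchronous stand-in for the EventBus; NOT the vert.x API.
final class ToyStreamBus {
    // address -> handler receiving (headers, body)
    private final Map<String, BiConsumer<Map<String, String>, String>> handlers = new HashMap<>();

    void consumer(String address, BiConsumer<Map<String, String>, String> handler) {
        handlers.put(address, handler);
    }

    void unregister(String address) {
        handlers.remove(address);
    }

    // Delivers to the handler registered on the address, if there is one.
    void publish(String address, Map<String, String> headers, String body) {
        BiConsumer<Map<String, String>, String> handler = handlers.get(address);
        if (handler != null) {
            handler.accept(headers, body);
        }
    }
}

final class StreamingFlowDemo {
    static List<String> run(List<String> chunks) {
        ToyStreamBus bus = new ToyStreamBus();
        List<String> log = new ArrayList<>();
        Deque<Runnable> pending = new ArrayDeque<>(); // work done after the acknowledgement

        // Requester: start listening on a temporary address first.
        String tempAddress = "tempListenerAddresses." + UUID.randomUUID();
        bus.consumer(tempAddress, (headers, body) -> {
            log.add("data:" + body);
            if (Boolean.parseBoolean(headers.get("complete"))) {
                bus.unregister(tempAddress); // end listening on the temp address
                log.add("done");
            }
        });

        // Data source: acknowledge the intent now, publish the data later.
        bus.consumer("data.source", (headers, body) -> {
            String replyTo = headers.get("StreamListenerAddress");
            log.add("ack");
            pending.add(() -> {
                for (int i = 0; i < chunks.size(); i++) {
                    Map<String, String> chunkHeaders = Map.of(
                            "first", String.valueOf(i == 0),
                            "complete", String.valueOf(i == chunks.size() - 1));
                    bus.publish(replyTo, chunkHeaders, chunks.get(i));
                }
            });
        });

        // Request/response phase: utter the command, get the acknowledgement.
        bus.publish("data.source", Map.of("StreamListenerAddress", tempAddress), "request stream");
        // Publish/subscribe phase: execution and completion report.
        while (!pending.isEmpty()) {
            pending.poll().run();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b", "c")));
    }
}
```

The log records the acknowledgement before any data entry, and the listener removes itself once a chunk arrives with the complete header set.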

To implement this, I'm taking advantage of EventBus' DeliveryOptions that allow me to set header values. I define a header StreamListenerAddress that my data source will use for publishing data:

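// Error handling omitted
// Imports required by the enclosing verticle class:
import java.util.UUID;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.json.JsonObject;

public void initiateStreamResponse(final String dataAddress, final JsonObject requestMessage, final Promise<Void> didItWork) {
	final String streamListenerAddress = "tempListenerAddresses." + UUID.randomUUID().toString();
	final EventBus eventBus = this.getVertx().eventBus();
	// Start listening on the temporary address before sending the request
	final MessageConsumer<JsonObject> dataReceiver = eventBus.consumer(streamListenerAddress);
	dataReceiver.handler(message -> {
		final boolean isFirst = Boolean.parseBoolean(message.headers().get("first"));
		final boolean isComplete = Boolean.parseBoolean(message.headers().get("complete"));
		/*
		  Here goes the code feeding into the requester's logic e.g. a chunked HTTP response
		  or a websocket publish or a gRPC push. isFirst and isComplete can be true at the
		  same time when there is only a single response
		*/
		if (isComplete) {
			// Last chunk: stop listening and report success exactly once
			dataReceiver.unregister();
			didItWork.tryComplete();
		}
	});
	final DeliveryOptions deliveryOptions = new DeliveryOptions();
	// The data source publishes its data to the address given in this header
	deliveryOptions.addHeader("StreamListenerAddress", streamListenerAddress);
	eventBus.request(dataAddress, requestMessage, deliveryOptions, ar -> {
		if (ar.succeeded()) {
			final Message<Object> resultMessage = ar.result();
			// Constants.HEADER_SUCCESS is a header name defined elsewhere in the project
			final boolean success = Boolean.parseBoolean(resultMessage.headers().get(Constants.HEADER_SUCCESS));
			if (!success) {
				dataReceiver.unregister();
				didItWork.tryFail(new IllegalStateException("Request for data unsuccessful"));
			}
		} else {
			// No acknowledgement arrived: stop listening and pass on the cause
			dataReceiver.unregister();
			didItWork.tryFail(ar.cause());
		}
	});
}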
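What goes into the comment block of the handler depends on the requester. As one possible illustration, the sketch below uses the first and complete flags to frame incoming chunks as a single JSON array, the way a chunked HTTP response might. JsonArrayStreamWriter is a hypothetical helper, not part of vert.x or of the code above, and a StringBuilder stands in for the real response.

```java
// Hypothetical helper: frames a stream of JSON chunks as one JSON array.
final class JsonArrayStreamWriter {
    private final StringBuilder out = new StringBuilder(); // stands in for a chunked HTTP response
    private boolean closed = false;

    // Call once per received message with the values of its "first" and "complete" headers.
    void onChunk(boolean isFirst, boolean isComplete, String jsonChunk) {
        if (closed) {
            throw new IllegalStateException("Stream already complete");
        }
        // Open the array on the first chunk, separate all later chunks with a comma.
        out.append(isFirst ? "[" : ",");
        out.append(jsonChunk);
        if (isComplete) {
            // Both flags may be true at once when there is only a single response.
            out.append("]");
            closed = true;
        }
    }

    boolean isClosed() {
        return closed;
    }

    String written() {
        return out.toString();
    }

    public static void main(String[] args) {
        JsonArrayStreamWriter writer = new JsonArrayStreamWriter();
        writer.onChunk(true, false, "{\"id\":1}");
        writer.onChunk(false, true, "{\"id\":2}");
        System.out.println(writer.written());
    }
}
```

Inside the handler this would be called as writer.onChunk(isFirst, isComplete, ...) with the message body serialized to a string; how the body is serialized is left open here.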

What next?

  • In Part 2 I will describe the data source part of this approach
  • In Part 3 I will wrap that in observable and observer

I'm using this pattern in the Keep API, YMMV


Posted on 04 December 2019 | categories: Java Reactive vert.x
