wissel.net

Usability - Productivity - Business - The web - Singapore & Twins

By Date: October 2021

Java Streams filters with side effects


Once you get used to stream programming and the pattern of create, select, manipulate and collect your code will never look the same

Putting side effects to good (?) use

The pure teachings tell us, filters should select objects for processing and not have any side effects or do processing on their own. But ignoring the teachings could produce clean code (I probably will roast in debug hell for this). Let's look at an example:

final Collection<MyNotification> notifications = getNotifications();
final Iterator<MyNotification> iter = notifications.iterator();

while(iter.hasNext()) {
  MyNotification n = iter.next();

  if (n.priority == Priority.high) {
    sendHighPriority(n);
  } else if (n.groupNotification) {
    sendGroupNotification(n);
  } else if (n.special && !n.delay > 30) {
    sendSpecial(n);
  } else if (!n.special) {
    sendStandard(n);
  } else {
    reportWrongNotification(n);
  }
}

This gets messy very fast and all selection logic is confined to the if conditions in one function (which initially looks like a good idea). How about rewriting the code Stream style? It will be more boiler plate, but better segregation:

final Stream<MyNotification> notifications = getNotifications();

notifications
  .filter(this::highPriority)
  .filter(this::groupSend)
  .filter(this::specialNoDelay)
  .filter(this::standard)
  .forEach(this::reportWrongNotification);

The filter functions would look like this:

boolean highPriority(final MyNotification n) {
  if (n.priority == Priority.high) {
    sendHighPriority(n);
    return false; // No further processing required
  }
  return true; // Furhter processing required
}

boolean groupSend(final MyNotification n) {
  if (n.groupNotification) {
    sendGroupNotification(n);
    return false; // No further processing required
  }
  return true; // Furhter processing required
}

You get the idea. With proper JavaDoc method headers, this code looks more maintainable.
We can push this a little further (as explored on Stackoverflow). Imagin the number of process steps might vary and you don't want to update that code for every variation. You could do something like this:

final Stream<MyNotification> notifications = getNotifications();
final Stream<Predicate<MyNotifications>> filters = getFilters();

notifications
  .filter(filters.reduce(f -> true, Predicate::and))
  .forEach(this::reportWrongNotification);

As usual YMMV


Posted by on 22 October 2021 | Comments (1) | categories: Java

Streaming CouchDB data


I'm a confessing fan of CouchDB, stream programming and the official CouchDB NodeJS library. Nano supports returning data as NodeJS Stream, so you can pipe it away. Most examples use file streams or process.stdout, while my goal was to process individual documents that are part of the stream

You can't walk into the same stream a second time

This old Buddhist saying holds true for NodeJS streams too. So any processing needs to happen in the chain of the stream. Let's start with the simple example of reading all documents from a couchDB:

const Nano = require("nano");
const nano = Nano(couchDBURL);
nano.listAsStream({ include_docs: true }).pipe(process.stdout);

This little snippet will read out all documents in your couchDB. You need to supply the couchDBURL value, e.g. http://localhost:5984/test. On a closer look, we see that the data returned arrives in continious buffers that don't match JSON document boundaries, so processing one document after the other needs extra work.

A blog entry in the StrongLoop blog provides the first clue what to do. To process CouchDB stream data we need both a Transform stream to chop incoming data into line by line and a writable stream for our results.

Our code, finally will look like this:

const Nano = require("nano");
const { Writable, Transform } = require("stream");

const streamOneDb = (couchDBURL, resultCallback) => {
  const nano = Nano(couchDBURL);
  nano
    .listAsStream({ include_docs: true })
    .on("error", (e) => console.error("error", e))
    .pipe(lineSplitter())
    .pipe(jsonMaker())
    .pipe(documentWriter(resultCallback));
};

Let's have a closer look at the new functions, the first two implement transform, the last one writable:

  • lineSplitter, as the name implies, cuts the buffer into separate lines for processing. As far as I could tell, CouchDB documents always returned on one line
  • jsonMaker, extracts the documents and discards the wrapper with document count that surrounds them
  • documentWriter, writing out the JSON object using a callback

Read more

Posted by on 16 October 2021 | Comments (1) | categories: CouchDB NodeJS