An API-level Look at Reactive Stardog

by Mike Grove, 19 January 2017 · 8 minute read

In Reactive, Streaming Stardog Kernel, we looked at the design of and motivations for upcoming changes to the SNARL API. In this post we look at the new API in detail.

Selecting a Framework

We don’t suffer from a lack of choice in the Java world: there’s RxJava, Project Reactor, Reactive Spring, and Cyclops. All of these support Reactive Streams, a multi-organization effort to define the semantics and building blocks of a reactive library. Its interfaces are, more or less, what’s going into JDK 9 as java.util.concurrent.Flow.

I played with all of them; the Query API within SNARL is a natural fit for a reactive design, so I had a minimal prototype of this subset of the API plus an implementation of the SPARQL Protocol as my playground. Eventually, I settled on RxJava 2. It supports Reactive Streams, unlike its predecessor, and was the most comfortable to work with. It also had the bonus of seamless integration with Hystrix. If you haven’t looked at the new version yet, there’s a good overview of what’s changed.

But I really got sold on it when I discovered their hooks for making testing easy:

aCatalog.get(aDb, DatabaseOptions.CREATOR)
        .test().assertValue(aValue).assertComplete();

This is a snippet from a test case for getting a value from the System Catalog for a given database. The result of the lookup is Maybe<T>: I’m expecting one value or no value. The property is set, or it isn’t.

I can easily check that I got my one expected result and that I correctly signaled the stream is complete. This made testing a breeze because I didn’t have to roll my own test utilities. That little bit of extra productivity goes a long way.

Reactive SNARL?

We’ll start with the Query API since that’s probably the most commonly used part of the API. Creating a select query in SNARL still looks pretty familiar:

SelectQuery select(@Nonnull final QueryString theQuery);

QueryString is just a composition of the actual query string and the query language. This small change makes it easier for us to support different query languages on top of Stardog’s query engine. And for the common case, we offer the utility class SPARQL.

You get back a SelectQuery, which is very similar to the fluent-style interface in the current version of SNARL. It supports the same limit, offset, and parameterization configuration as the current interface. It’s not until we execute the query that we begin to see differences:

Flowable<BindingSet> execute();

Flowable is the RxJava 2 representation of a stream of data, in this case, BindingSet, which is a single answer to the query. So we’ve got a stream of answers to the query that will get pushed to us as they’re evaluated. Great!

If you’re executing this query against a remote database, these are pushed to you as they come in over the socket. If you’re querying the embedded server, these are pushed up from within the query engine.
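To make the push model concrete, here is a minimal sketch that uses the JDK 9 `java.util.concurrent.SubmissionPublisher` instead of RxJava, so it runs without extra dependencies. `PushDemo`, `pushAll`, and the string "answers" are hypothetical stand-ins for illustration; none of them are part of SNARL.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical demo class; plain strings stand in for BindingSet answers.
final class PushDemo {
    // Pushes every answer through a publisher and returns what the
    // consumer callback received, in arrival order.
    static List<String> pushAll(List<String> answers) {
        List<String> received = new CopyOnWriteArrayList<>();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

        // consume() subscribes and invokes the callback for each pushed item;
        // the returned future completes when the stream terminates.
        CompletableFuture<Void> finished = publisher.consume(received::add);

        // The producer side: each answer is pushed as soon as it is available.
        answers.forEach(publisher::submit);

        // close() signals onComplete after the buffered items are delivered.
        publisher.close();
        finished.join();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(pushAll(List.of("?x = :alice", "?x = :bob")));
    }
}
```

The caller never pulls from an iterator here. It registers a callback and the producer drives delivery, which is the same shape as subscribing to the Flowable returned by execute().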

How about another example? Here’s a condensed version of answering a select query and writing the results, in some format, back to the caller as it stands today:

try {
	final SelectQuery aQuery = aConn.select(aQueryStr, aBase);
	// set limit, offset, bindings, active graph
	// wait here and write the results
	QueryResultIO.writeTuple(aQuery.execute(),
	                         aFormat,
	                         theExchange.getOutputStream());
}
catch (Exception e) {
	handleError(theExchange, e);
}
finally {
	theExchange.endExchange();
}

That’s fine. No big surprises. Verbose, yeah, but it’s Java!

Let’s compare with the new API:

final SelectQuery aQuery = aConn.select(SPARQL.query(aQueryStr, aBase));

// set limit, offset, bindings, active graph
// continue on and write the results to the caller as they are computed

aQuery.execute()
      .doAfterTerminate(theExchange::endExchange)
      .doOnError(e -> handleError(theExchange, e))
      .subscribe(QueryResults.tupleResultWriter(aFormat, 
                              theExchange.getOutputStream()));

About half as much code, doing the same thing, but asynchronously, and arguably easier to read. It’s also flexible enough that if I need to block and wait for the result(s), it’s trivial to get them as a Collection<BindingSet>. In fact, it’s so easy that not only can a caveman do it, but it would be straightforward to provide an implementation of the existing (blocking) interface(s) on top of the new reactive ones, which is a nice bonus.
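As a rough illustration of layering a blocking call on top of a reactive stream, here is a sketch built on the JDK 9 `java.util.concurrent.Flow` interfaces rather than RxJava. `ListPublisher` and `Blocking.toList` are hypothetical names invented for this example, and the publisher is a single-threaded toy, not a fully spec-compliant implementation. With RxJava 2 itself, the equivalent would typically be a one-liner along the lines of `toList().blockingGet()` on the Flowable.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;

// Hypothetical cold source standing in for a query's result stream.
final class ListPublisher<T> implements Flow.Publisher<T> {
    private final List<T> items;

    ListPublisher(List<T> items) { this.items = new ArrayList<>(items); }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        Iterator<T> it = items.iterator();
        subscriber.onSubscribe(new Flow.Subscription() {
            private long demand;
            private boolean emitting;
            private boolean done;

            @Override
            public void request(long n) {
                if (done) return;
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("request must be positive"));
                    return;
                }
                demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
                if (emitting) return; // request() was called from inside onNext
                emitting = true;
                while (demand > 0 && !done && it.hasNext()) {
                    demand--;
                    subscriber.onNext(it.next());
                }
                emitting = false;
                if (!done && !it.hasNext()) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() { done = true; }
        });
    }
}

final class Blocking {
    // Subscribes, collects every item, and blocks until the stream terminates.
    static <T> List<T> toList(Flow.Publisher<T> source) {
        List<T> collected = new ArrayList<>();
        CompletableFuture<List<T>> result = new CompletableFuture<>();
        source.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(T item) { collected.add(item); }
            @Override public void onError(Throwable t) { result.completeExceptionally(t); }
            @Override public void onComplete() { result.complete(collected); }
        });
        return result.join();
    }

    public static void main(String[] args) {
        System.out.println(toList(new ListPublisher<String>(List.of("a", "b", "c"))));
    }
}
```

The point of the sketch is the direction of the adaptation: a blocking method falls out of a reactive one with a small collector, while going the other way (making a blocking API reactive) is much harder.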

We really start to see the benefits of the approach if we want to attach a progress monitor. Or what if I wanted to memoize the results of the query so we can return cached results next time? With the current design, I’d most easily achieve that with some decorator. It might look something like:

QueryResultIO.writeTuple(new MonitoredResult(aMonitorConfig,
                             new ResultMemoizer(aResultCache, aQuery.execute())),
                         aFormat, theExchange.getOutputStream());

Workable, but we’re losing readability. And we don’t want the decorators to add too much latency to outputting each result, so we might want to schedule their work on different threads. What’s that going to look like?

QueryResultIO.writeTuple(new MonitoredResult(aMonitorConfig, aExecutorService,
                             new ResultMemoizer(aResultCache, aOtherExecutorService, aQuery.execute())),
                         aFormat, theExchange.getOutputStream());

Now it’s time to play a game: spot the query execution! Seriously, it’s really buried in there. It works, but that’s not fun. And the dispatch in those decorators to other threads may get hairy.

What about the reactive version?

aQuery.execute()
      .compose(new Monitor(aMonitorConfig, aExecutorService))
      .compose(new ResultCacher(aResultCache, aOtherExecutorService))
      .doAfterTerminate(theExchange::endExchange)
      .doOnError(e -> handleError(theExchange, e))
      .subscribe(QueryResults.tupleResultWriter(aFormat, theExchange.getOutputStream()));

This is actually two additional lines of code, but I think it’s inarguably clearer. Implementing operators like this is the normal way to use and extend streams of data.

Further, the original case does not cover what happens when we’re generating the results of the query faster than we can write them over the wire. We can’t just send a firehose of results to the output stream. We need something more controlled that will let us use the resources available to us both efficiently and predictably. There needs to be a backpressure mechanism for the pipeline. With the previous approach this is something we’ll have to provide ourselves.

With Reactive Streams, backpressure is a native concept, and RxJava makes it easy to take advantage of it. Before that call to subscribe we can include onBackpressureBuffer(100) to insert a buffer with a capacity of 100 for handling backpressure between producer and consumer.
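The demand-driven idea behind backpressure can be sketched with the JDK 9 `Flow` API instead of RxJava. In this example, `SlowWriter` and `BackpressureDemo` are hypothetical names, and the buffer capacity of 8 is arbitrary. One difference worth stating up front: `SubmissionPublisher.submit` blocks the producer when the bounded buffer is full, whereas RxJava's `onBackpressureBuffer(capacity)` signals an error downstream if the buffer overflows. So this shows the general mechanism, not that operator's exact behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical slow consumer: it only ever asks for one item at a time.
final class SlowWriter implements Flow.Subscriber<Integer> {
    final List<Integer> written = new ArrayList<>();
    final CountDownLatch finished = new CountDownLatch(1);
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1); // initial demand
    }

    @Override
    public void onNext(Integer item) {
        written.add(item);       // stand-in for a slow write to the socket
        subscription.request(1); // signal readiness for one more item
    }

    @Override
    public void onError(Throwable t) { finished.countDown(); }

    @Override
    public void onComplete() { finished.countDown(); }
}

final class BackpressureDemo {
    // Produces count items through a bounded buffer and returns what was written.
    static List<Integer> run(int count) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        SlowWriter writer = new SlowWriter();
        try {
            SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(pool, 8);
            publisher.subscribe(writer);
            for (int i = 0; i < count; i++) {
                publisher.submit(i); // blocks while the subscriber's buffer is full
            }
            publisher.close(); // onComplete follows the buffered items
            if (!writer.finished.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the writer");
            }
            return writer.written;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(20));
    }
}
```

The producer can never run more than a buffer's worth ahead of the consumer, so memory use stays bounded no matter how fast results are generated.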

Reacting Beyond Query Results

We’re using Flowable and its siblings Completable, Single, and Maybe (designed for when you have 0, 1, or 0 || 1 results, respectively) throughout the SNARL API.

For example, if you wanted a list of databases in Stardog, you’d call Collection<String> list() on AdminConnection. We probably use this most often in our tests like this:

try (AdminConnection aConn = ... create connection ... ) {
	if (aConn.list().contains(aDb)) {
		// do something
	}
}

Can you spot the inefficiency here?

We’re allocating an instance of Collection which is basically not used at all. That’s an allocation we didn’t need to do and more work for the garbage collector. What if what we were looking for wasn’t in there? That’s a total waste!

If you’re doing something wasteful like that once or twice, no big deal. But what if you do it a hundred times a second? Little things like that add up. Now the list method returns a Flowable<String> and the code looks more like:

try (AdminConnection aConn = ... create connection ... ) {
	aConn.list()
	     .filter(theDbName -> theDbName.equals(aDb))
	     .firstElement()
	     .subscribe(theDbName -> {
	         // do something
	     });
}

The code is a bit longer, but just as clear, and it avoids allocating the collection. Similarly, dropping a database now looks like:

aConn.drop(aDb)
     .subscribe(() -> {
                    theExchange.setStatusCode(StatusCodes.NO_CONTENT);
                    theExchange.endExchange();
                },
                theError -> sendError(theExchange, theError));

We drop the database, and we send the success response if it worked. Otherwise, we write an error response.

Even creating a database and committing a transaction will return a Flowable. This Flowable will be attached to the bulk load or commit progress itself and give you an easy way to track the progress of the operation. Here’s an example from the HTTP server where we’re writing the bulk load progress as a series of JSON events back to the caller, which can attach a progress monitor on the other side to provide feedback to the user:

aBuilder.set(aMeta)
        .namedGraphs(aGraphs::get)
        .create(aFiles)
        .subscribe(Subscribers.create()
                              .onNext(e -> printProgress(aOut))
                              .onError(e -> sendError(theExchange, e))
                              .onComplete(theExchange::endExchange).build());

What Else?

There are other, more cosmetic changes coming to the SNARL API. In most cases, we’ll be returning a Flowable or one of its siblings as the result from a method. But sometimes returning a value directly is a more appropriate design. In those cases, if it’s possible there won’t be a value, we’ll be returning an Optional; null will no longer be a valid return value from a method.

No checked exceptions. We started this effort when we first transitioned to Java 8, but there were some places you could find checked exceptions still in use for legacy reasons. We’ll throw exceptions when appropriate, but more commonly, they’ll be reported in the error event (i.e., onError) of the subscriber(s) to the Flowable.

Additionally, you’ll notice that the postfix Exception is omitted in almost every case. Rather than DatabaseDoesNotExistException we are opting for DatabaseDoesNotExist. Including the Exception postfix doesn’t add any new information; it’s pretty clear from usage in a throws or catch clause that it’s an exception. We don’t need to add more verbosity to Java.

Method naming will not adhere to the conventional Java-bean get/set nomenclature. In most cases the prefix isn’t adding information you wouldn’t have already known by looking at the usage. It’s just something that requires extra typing without adding extra value.
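The conventions described above (Optional instead of null, unchecked exceptions without the Exception postfix, and no get/set prefixes) can be sketched together in a few lines. Only the name DatabaseDoesNotExist comes from this post; its RuntimeException superclass, the `Catalog` class, and the `creator` method are assumptions made up for illustration and do not reflect the actual SNARL types.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Unchecked and named without the "Exception" postfix.
// Extending RuntimeException is an assumption of this sketch.
final class DatabaseDoesNotExist extends RuntimeException {
    DatabaseDoesNotExist(String name) {
        super("Database does not exist: " + name);
    }
}

// Hypothetical catalog; not a SNARL type.
final class Catalog {
    private final Set<String> databases;
    private final Map<String, String> creators;

    Catalog(Set<String> databases, Map<String, String> creators) {
        this.databases = new HashSet<>(databases);
        this.creators = new HashMap<>(creators);
    }

    // No "get" prefix, and an Optional rather than null when the
    // database exists but has no creator recorded.
    Optional<String> creator(String database) {
        if (!databases.contains(database)) {
            throw new DatabaseDoesNotExist(database);
        }
        return Optional.ofNullable(creators.get(database));
    }

    public static void main(String[] args) {
        Catalog catalog = new Catalog(Set.of("db1", "db2"), Map.of("db1", "admin"));
        System.out.println(catalog.creator("db1").orElse("(no creator)"));
        System.out.println(catalog.creator("db2").orElse("(no creator)"));
        try {
            catalog.creator("missing");
        } catch (DatabaseDoesNotExist e) {
            // The catch clause already makes it clear this is an exception.
            System.out.println(e.getMessage());
        }
    }
}
```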

You’ll also notice that all public interfaces will use JSR-305 annotations to further document their behavior, which plays nicely with most IDEs, and we’ll be using the newer Javadoc tags @apiNote, @implSpec, and @implNote to provide clearer documentation for the codebase.

