An API-level Look at Reactive Stardog
In Reactive, Streaming Stardog Kernel, we looked at the design of and motivations for upcoming changes to the SNARL API. In this post we look at the new API in detail.
We don’t suffer from a lack of choice in the Java world: RxJava, Project Reactor, Reactive Spring, and Cyclops. All of these support Reactive Streams, a multi-organization effort to define the semantics and building blocks of a reactive library.
I played with all of them; the Query API within SNARL is a natural fit for a reactive design, so I had a minimal prototype of this subset of the API plus an implementation of the SPARQL Protocol as my playground. Eventually, I settled on RxJava 2. It supports Reactive Streams, unlike its predecessor, and was the most comfortable to work with. It also had the bonus of seamless integration with Hystrix. If you haven’t looked at the new version yet, there’s a good overview of what’s changed.
But I really got sold on it when I discovered their hooks for making testing easy:
aCatalog.get(aDb, DatabaseOptions.CREATOR)
        .test().assertValue(aValue).assertComplete();
This is a snippet from a test case for getting a value from the System Catalog for a given database. The result of the lookup is Maybe<T>: I’m expecting one value or no value. The property is set, or it isn’t.
I can easily check that I got my one expected result and that I correctly signaled the stream is complete. This made testing a breeze because I didn’t have to roll my own test utilities. That little bit of extra productivity goes a long way.
We’ll start with the Query API since that’s probably the most commonly used part of the API. Creating a select query in SNARL still looks pretty familiar:
SelectQuery select(@Nonnull final QueryString theQuery);
QueryString is just a composition of the actual query string and the query language. This small change makes it easier for us to support different query languages on top of Stardog’s query engine. And for the common case, we offer the utility class SPARQL.
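To make the idea of "a composition of the query string and the query language" concrete, here is a minimal sketch of what such types could look like. Everything in it is an assumption made for illustration: the QueryLanguage enum, the field names, and the accessor names are hypothetical, and the real SNARL classes may well differ. Only the call shape SPARQL.query(query, base) is taken from the examples in this post.

```java
import java.util.Objects;

// Hypothetical shapes for illustration only; the real SNARL types may differ.
enum QueryLanguage { SPARQL, OTHER }

/** A query's text paired with the language it is written in. */
final class QueryString {
    private final String query;
    private final QueryLanguage language;
    private final String base;

    QueryString(String query, QueryLanguage language, String base) {
        this.query = Objects.requireNonNull(query, "query");
        this.language = Objects.requireNonNull(language, "language");
        this.base = Objects.requireNonNull(base, "base");
    }

    String query() { return query; }
    QueryLanguage language() { return language; }
    String base() { return base; }
}

/** Convenience factory for the common case of a SPARQL query. */
final class SPARQL {
    private SPARQL() { }

    static QueryString query(String query, String base) {
        return new QueryString(query, QueryLanguage.SPARQL, base);
    }
}
```

Because the language travels with the text, a method like select only needs one parameter, and supporting another language would not require a new overload.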
You get back a SelectQuery, which is very similar to the fluent-style interface in the current version of SNARL. It supports the same limit, offset, and parameterization configuration as the current interface. It’s not until we execute the query that we begin to see differences:
Flowable<BindingSet> execute();
Flowable is the RxJava 2 representation of a stream of data, in this case a stream of BindingSet, each of which is a single answer to the query. So we’ve got a stream of answers to the query that will get pushed to us as they’re evaluated. Great!
If you’re executing this query against a remote database, these are pushed to you as they come in over the socket. If you’re querying the embedded server, these are pushed up from within the query engine.
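If "pushed to us" is unfamiliar, the following sketch shows the order of signals a subscriber sees. It deliberately does not use RxJava or SNARL: it uses the JDK’s own java.util.concurrent.Flow interfaces (Java 9+), which mirror the Reactive Streams interfaces, so that it runs without any dependencies. The class name PushedAnswers and the string "answers" are made up for the example.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

final class PushedAnswers {
    /** Publishes the answers and returns the signals the subscriber saw, in order. */
    static List<String> signalsFor(List<String> answers) {
        List<String> signals = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> done = new CompletableFuture<>();

        // try-with-resources closes the publisher, which signals onComplete
        // after the already-submitted answers have been delivered.
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                private Flow.Subscription subscription;

                @Override public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    signals.add("onSubscribe");
                    s.request(1);                 // ready for the first answer
                }
                @Override public void onNext(String answer) {
                    signals.add("onNext " + answer);
                    subscription.request(1);      // ready for the next one
                }
                @Override public void onError(Throwable t) {
                    done.completeExceptionally(t);
                }
                @Override public void onComplete() {
                    signals.add("onComplete");
                    done.complete(null);
                }
            });
            answers.forEach(publisher::submit);
        }

        done.join(); // only this demo blocks; the subscriber itself never polls
        return signals;
    }

    public static void main(String[] args) {
        signalsFor(List.of("answer-1", "answer-2")).forEach(System.out::println);
    }
}
```

The subscriber never asks "is there another result yet?"; it states how many items it is ready for and the publisher calls it back.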
How about another example? Here’s a condensed version of answering a select query and writing the results, in some format, back to the caller as it stands today:
try {
    final SelectQuery aQuery = aConn.select(aQueryStr, aBase);
    // set limit, offset, bindings, active graph
    // wait here and write the results
    QueryResultIO.writeTuple(aQuery.execute(),
                             aFormat,
                             theExchange.getOutputStream());
}
catch (Exception e) {
    handleError(theExchange, e);
}
finally {
    theExchange.endExchange();
}
That’s fine. No big surprises. Verbose, yeah, but it’s Java!
Let’s compare with the new API:
final SelectQuery aQuery = aConn.select(SPARQL.query(aQueryStr, aBase));
// set limit, offset, bindings, active graph
// continue on and write the results to the caller as they are computed
aQuery.execute()
      .doAfterTerminate(theExchange::endExchange)
      .doOnError(e -> handleError(theExchange, e))
      .subscribe(QueryResults.tupleResultWriter(aFormat,
                                                theExchange.getOutputStream()));
About half as much code, doing the same thing, but asynchronously, and arguably easier to read. It’s flexible enough that if I need to block and wait for the result(s), it’s trivial to get the results as a Collection<BindingSet>. In fact, it’s so easy, not only can a caveman do it, but it would be easy to provide an implementation of the existing (blocking) interface(s) on top of the new reactive ones, which is a nice bonus.
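In RxJava 2 itself, blocking for everything is a one-liner along the lines of execute().toList().blockingGet(). To show what such an adapter does underneath, here is a dependency-free sketch built on the JDK’s java.util.concurrent.Flow rather than on RxJava or SNARL. The class BlockingResults and the "row-N" strings are hypothetical names for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

/** Collects every item of a stream and lets a caller block for the full list. */
final class BlockingResults<T> implements Flow.Subscriber<T> {
    private final List<T> items = new ArrayList<>();
    private final CompletableFuture<List<T>> result = new CompletableFuture<>();

    @Override public void onSubscribe(Flow.Subscription s) {
        s.request(Long.MAX_VALUE);   // we want everything, so demand is unbounded
    }
    @Override public void onNext(T item) { items.add(item); }
    @Override public void onError(Throwable t) { result.completeExceptionally(t); }
    @Override public void onComplete() { result.complete(items); }

    /** Blocks until the stream terminates; a stream error surfaces as a CompletionException. */
    List<T> await() { return result.join(); }

    static List<String> demo() {
        BlockingResults<String> collector = new BlockingResults<>();
        try (SubmissionPublisher<String> source = new SubmissionPublisher<>()) {
            source.subscribe(collector);
            source.submit("row-1");
            source.submit("row-2");
            source.submit("row-3");
        }
        return collector.await();
    }
}
```

The blocking call is just a thin layer over the reactive one, which is why the old blocking interfaces could be offered on top of the new ones, while the reverse would not be possible without giving up the streaming behavior.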
We really start to see the benefits of the approach if we want to attach a progress monitor. Or what if I wanted to memoize the results of the query so we can return cached results next time? With the current design, I’d most easily achieve that with some decorator. It might look something like:
QueryResultIO.writeTuple(new MonitoredResult(aMonitorConfig,
                             new ResultMemoizer(aResultCache, aQuery.execute())),
                         aFormat, theExchange.getOutputStream());
Workable, but we’re losing readability. And we don’t want to add too much latency to outputting each result when the decorators process the results, so we might want to schedule those on different threads. What’s that going to look like?
QueryResultIO.writeTuple(new MonitoredResult(aMonitorConfig, aExecutorService,
                             new ResultMemoizer(aResultCache, aOtherExecutorService, aQuery.execute())),
                         aFormat, theExchange.getOutputStream());
Now it’s time to play a game: spot the query execution! Seriously, it’s really buried in there. It works, but, that’s not fun. And the dispatch in those decorators to other threads may get hairy.
What about the reactive version?
aQuery.execute()
      .compose(new Monitor(aMonitorConfig, aExecutorService))
      .compose(new ResultCacher(aResultCache, aOtherExecutorService))
      .doAfterTerminate(theExchange::endExchange)
      .doOnError(e -> handleError(theExchange, e))
      .subscribe(QueryResults.tupleResultWriter(aFormat, theExchange.getOutputStream()));
This is actually two additional lines of code, but I think it is unarguably clearer. Implementing operators like this is the normal way to use and extend the streams of data.
Further, the original case does not cover what happens when we’re generating the results of the query faster than we can write them over the wire. We can’t just send a firehose of results to the output stream. We need something more controlled that will let us use the resources available to us both efficiently and predictably. There needs to be a backpressure mechanism for the pipeline. With the previous approach this is something we’ll have to provide ourselves.
With Reactive Streams, backpressure is a native concept, and RxJava makes it easy to take advantage of it. Before that call to subscribe we can include onBackpressureBuffer(100) to insert a buffer with a capacity of 100 for handling backpressure between producer and consumer.
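The effect of a bounded buffer between a fast producer and a slower consumer can be tried out without RxJava. The sketch below uses the JDK’s SubmissionPublisher as a stand-in, and it is an analogue rather than the same operator: when the JDK buffer is full, submit blocks the producer until there is room, which is not how RxJava’s onBackpressureBuffer reacts to overflow. The class name BoundedBufferDemo is hypothetical, and the JDK may round the requested capacity up to a power of two.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

final class BoundedBufferDemo {
    /** Pushes itemCount integers through a bounded buffer and returns what the consumer received. */
    static List<Integer> run(int itemCount, int bufferCapacity) {
        ExecutorService consumerThread = Executors.newSingleThreadExecutor();
        List<Integer> received = new ArrayList<>();
        CompletableFuture<Void> done = new CompletableFuture<>();
        try {
            try (SubmissionPublisher<Integer> results =
                     new SubmissionPublisher<>(consumerThread, bufferCapacity)) {
                results.subscribe(new Flow.Subscriber<Integer>() {
                    private Flow.Subscription subscription;

                    @Override public void onSubscribe(Flow.Subscription s) {
                        subscription = s;
                        s.request(1);             // consume one item at a time
                    }
                    @Override public void onNext(Integer item) {
                        received.add(item);       // stands in for writing to the wire
                        subscription.request(1);
                    }
                    @Override public void onError(Throwable t) { done.completeExceptionally(t); }
                    @Override public void onComplete() { done.complete(null); }
                });
                for (int i = 0; i < itemCount; i++) {
                    results.submit(i);            // blocks while the subscriber's buffer is full
                }
            }
            done.join();                          // wait until the consumer has seen onComplete
        } finally {
            consumerThread.shutdown();
        }
        return received;
    }
}
```

Calling BoundedBufferDemo.run(1000, 100) moves a thousand items through a buffer that holds roughly a hundred at a time: the producer is slowed to the consumer’s pace instead of memory growing without bound.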
We’re using Flowable and its siblings Completable, Single, and Maybe (which are designed for when you have 0, 1, or 0 || 1 results, respectively) throughout the SNARL API.
For example, if you wanted a list of databases in Stardog, you’d call Collection<String> list() on AdminConnection. We probably use this most often in our tests like this:
try (AdminConnection aConn = ... create connection ... ) {
    if (aConn.list().contains(aDb)) {
        // do something
    }
}
Can you spot the inefficiency here?
We’re allocating an instance of Collection which is basically not used at all. That’s an allocation we didn’t need to do and more work for the garbage collector. What if what we were looking for wasn’t in there? That’s a total waste!
If you’re doing something wasteful like that once or twice, no big deal. But what if you do it a hundred times a second? Little things like that add up. Now the list method returns a Flowable<String> and the code looks more like:
try (AdminConnection aConn = ... create connection ... ) {
    aConn.list()
         .filter(theDbName -> theDbName.equals(aDb))
         .firstElement()
         .subscribe(theDbName -> {
             // do something
         });
}
Code is a bit longer, but just as clear, and avoids the allocation of the list. Similarly, dropping a database now looks like:
aConn.drop(aDb)
     .subscribe(() -> {
                    theExchange.setStatusCode(StatusCodes.NO_CONTENT);
                    theExchange.endExchange();
                },
                theError -> sendError(theExchange, theError));
We drop the database, and we send the success response if it worked. Otherwise, we write an error response.
Even creating a database and committing a transaction will return a Flowable. This Flowable will be attached to the bulk load or commit progress itself and give you an easy way to track the progress of the operation. Here’s an example from the HTTP server where we’re writing the bulk load progress as a series of JSON events back to the caller, which can attach a progress monitor on the other side to provide feedback to the user:
aBuilder.set(aMeta)
        .namedGraphs(aGraphs::get)
        .create(aFiles)
        .subscribe(Subscribers.create()
                              .onNext(e -> printProgress(aOut))
                              .onError(e -> sendError(theExchange, e))
                              .onComplete(theExchange::endExchange).build());
There are other, more cosmetic changes coming to the SNARL API. In most cases, we’ll be returning a Flowable or one of its siblings as the result from a method. But sometimes returning a value directly is a more appropriate design. In those cases, if it’s possible there won’t be a value, we’ll be returning an Optional; null will no longer be a valid return value from a method.
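As a small illustration of the "Optional instead of null" rule, here is a sketch of a direct-return lookup. The DatabaseSettings class, its setting method, and the "creator" key are all hypothetical and are not part of SNARL; the method is also named without a get prefix, in line with the naming change described in this post.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical holder of per-database settings, for illustration only. */
final class DatabaseSettings {
    private final Map<String, String> values;

    DatabaseSettings(Map<String, String> values) {
        this.values = Map.copyOf(values);   // defensive, immutable copy
    }

    /** Returns the named setting, or an empty Optional if it is not set; never null. */
    Optional<String> setting(String name) {
        return Optional.ofNullable(values.get(name));
    }

    public static void main(String[] args) {
        DatabaseSettings settings = new DatabaseSettings(Map.of("creator", "admin"));
        // The caller is forced to decide what "no value" means at the call site.
        System.out.println(settings.setting("creator").orElse("unknown"));
        System.out.println(settings.setting("owner").orElse("unknown"));
    }
}
```

The return type documents that the value may be absent, so callers cannot forget the null check.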
No checked exceptions. We started this effort when we first transitioned to Java 8, but there were some places you could find checked exceptions still in use for legacy reasons. We’ll throw exceptions when appropriate, but more commonly, they’ll be reported in the Error event (i.e., onError) of the subscriber(s) to the Flowable.
Additionally, you’ll notice that the postfix Exception is omitted in almost every case. Rather than DatabaseDoesNotExistException we are opting for DatabaseDoesNotExist. The inclusion of the postfix Exception doesn’t add any new information; it’s pretty clear from usage in a throws or catch clause that it’s an Exception. We don’t need to add more verbosity to Java.
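A sketch of how an unchecked, postfix-free exception could be reported through an error callback rather than thrown past the caller. Only the name DatabaseDoesNotExist comes from this post; its constructor, the DropSketch helper, and the in-memory set of database names are assumptions made for the example and do not reflect the actual SNARL implementation.

```java
import java.util.Set;
import java.util.function.Consumer;

/** Unchecked, and named without the "Exception" postfix. The constructor is an assumption. */
final class DatabaseDoesNotExist extends RuntimeException {
    DatabaseDoesNotExist(String database) {
        super("Database does not exist: " + database);
    }
}

final class DropSketch {
    private DropSketch() { }

    /**
     * Hypothetical helper: removes the database name from the set and reports the
     * outcome through callbacks, mirroring the onComplete/onError split of a subscriber.
     */
    static void drop(Set<String> databases, String name,
                     Runnable onComplete, Consumer<Throwable> onError) {
        if (databases.remove(name)) {
            onComplete.run();
        } else {
            onError.accept(new DatabaseDoesNotExist(name));
        }
    }
}
```

At the handling site the type name still reads naturally, for example in a check such as error instanceof DatabaseDoesNotExist or in a catch (DatabaseDoesNotExist e) clause.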
Method naming will not adhere to the conventional Java-bean get/set nomenclature. In most cases the prefix isn’t adding information you wouldn’t have already known by looking at the usage. It’s just something that requires extra typing without adding extra value.
You’ll also notice that all public interfaces will use JSR-305 annotations to further document their behavior, which plays nicely with most IDEs, and we’ll be using the new javadoc annotations @apiNote, @implSpec, and @implNote to provide clearer documentation for the codebase.
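For readers who have not met these tags, here is a sketch of how the three of them divide up a method’s documentation. The DatabaseNames interface and its wording are invented for the example; the JSR-305 annotations are left out so that the snippet compiles without extra dependencies. As far as I know, the standard javadoc tool does not recognize these tags out of the box, so a build typically registers them as custom tags (for example with the tool’s -tag option).

```java
import java.util.List;

/** Hypothetical interface used only to show the documentation tags. */
interface DatabaseNames {
    /**
     * Returns the names of all databases known to the server.
     *
     * @apiNote Guidance for callers: the returned list is a snapshot, so a
     *          database created afterwards will not appear in it.
     * @implSpec What overriders can rely on: the default implementation
     *           returns an empty, immutable list.
     * @implNote Detail of this particular implementation that may change:
     *           no server round trip is made by the default.
     * @return the database names, never null
     */
    default List<String> names() {
        return List.of();
    }
}
```

The split keeps the contract for callers, the contract for implementers, and incidental implementation detail from being mixed into one paragraph.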