
Don Pinto is a Product Marketing Manager with experience in cloud and database technologies. Don is a DZone MVB and is not an employee of DZone and has posted 91 posts at DZone.

Why Couchbase Chose RxJava for the New Java SDK


Originally written by Michael Nitschinger

This blog post explains our reasoning and motivation behind choosing RxJava as one of the integral components in our new Java SDK.


There are many ways to design an API, and every one has its own set of benefits (and drawbacks). In the process of designing our brand new APIs, one of the main questions was how to expose them to the user.

One question we didn't have to ask ourselves was whether the API should be synchronous or asynchronous. We strongly believe that asynchronous APIs are the only sane way to get the performance and scalability you very often need, and it is also much easier to go from async to sync than the other way round. The current stable SDK (1.4.3 at the time of writing) already makes heavy use of Futures in various ways to provide async responses, and this dates back to 2006/7, when spymemcached originally introduced the concept into its API.

It is well known that the Java Future interface is very limited compared to other solutions (like Scala's futures). It is also trickier to code against when you need to build async dataflows in which one computation depends on another and you want the whole thing to stay async. In recent versions we added support for listeners, which improve the situation quite a bit but are still not an ideal solution.
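To make the limitation concrete, here is a minimal sketch that uses only java.util.concurrent.Future from the JDK. The two steps (a "lookup" and a "load") and the class name FutureChaining are hypothetical stand-ins for real network calls; the point is that Future offers no way to attach the second step to the first, so the calling thread has to block on get() between them.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureChaining {
    // Two dependent asynchronous steps built on plain java.util.concurrent.Future.
    // Both steps are hypothetical stand-ins for real network calls.
    static String lookupThenLoad(String key) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Step 1: a hypothetical lookup that derives an id from the key.
            Future<Integer> idFuture = pool.submit(() -> key.length());

            // Future has no callback or "then" method, so the only way to start
            // step 2 is to block this thread until step 1 has finished.
            int id = idFuture.get();

            // Step 2: a hypothetical load that depends on the id from step 1.
            Future<String> docFuture = pool.submit(() -> "doc-" + id);
            return docFuture.get(); // blocks a second time
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(lookupThenLoad("user::42")); // prints doc-8
    }
}
```

Listeners (or Scala-style futures) remove the blocking get() in the middle, which is exactly the gap described above.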

Over the last few years, other libraries and patterns emerged, which we followed closely. One of the mature concepts is known as Reactive Extensions, which originated at Microsoft in .NET. It is based on the idea that applications should be event-oriented and react to those events asynchronously. It defines a very rich set of operators for what you can do with the data (modify, combine, filter it and so on). Recently, Netflix ported it to Java and named it RxJava (note that while the project currently lives under the Netflix namespace, it will be moved to "io.reactivex" sooner rather than later). It is very stable and also provides adapters for other JVM languages like Scala, Groovy and JRuby, which fits well with our plans to broaden our language support.

The Concept

The main idea of Rx revolves around Observables and their observers. If you haven't come across this concept before, you can think of the Observable as the asynchronous, push-based cousin (more formally, the dual) of an Iterable. Their relationship looks like this:

Event            Iterable (pull)     Observable (push)
retrieve data    T next()            onNext(T)
discover error   throws Exception    onError(Exception)
complete         returns             onCompleted()
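The table can be illustrated with a small, self-contained Java sketch. SimpleObserver below is a hand-rolled interface that only mirrors the push column of the table (it is not RxJava's rx.Observer), and PullVsPush is a hypothetical helper class: with an Iterable the consumer pulls each item, while with an observer the producer pushes each item and then signals completion explicitly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hand-rolled observer mirroring the "Observable (push)" column; not RxJava's API.
interface SimpleObserver<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onCompleted();
}

class PullVsPush {
    // Pull: the consumer drives the loop and asks for each item with next().
    static List<String> pull(Iterable<String> source) {
        List<String> received = new ArrayList<>();
        Iterator<String> it = source.iterator();
        while (it.hasNext()) {
            received.add(it.next());
        }
        return received; // "complete" is simply the loop ending
    }

    // Push: the producer drives the loop and hands each item to the observer,
    // then signals the end of the stream explicitly.
    static void push(Iterable<String> source, SimpleObserver<String> observer) {
        for (String item : source) {
            observer.onNext(item);
        }
        observer.onCompleted();
    }

    // Records every event an observer receives, so the push side can be inspected.
    static List<String> pushAndRecord(Iterable<String> source) {
        List<String> events = new ArrayList<>();
        push(source, new SimpleObserver<String>() {
            public void onNext(String value) { events.add("onNext(" + value + ")"); }
            public void onError(Throwable error) { events.add("onError"); }
            public void onCompleted() { events.add("onCompleted()"); }
        });
        return events;
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("a", "b");
        System.out.println(pull(data));          // [a, b]
        System.out.println(pushAndRecord(data)); // [onNext(a), onNext(b), onCompleted()]
    }
}
```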

Every time data gets pushed into an Observable, every observer that is subscribed to it receives the data in its onNext() method. If the Observable eventually completes (which is not always the case), the observers' onCompleted() method is called. If an error occurs at any point in the process, onError() is called instead, and the Observable is also considered complete.

If you prefer a formal grammar, the contract looks like this:

OnNext* (OnCompleted | OnError)?
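One way to read the grammar is as a rule that a consumer can enforce. The sketch below uses a hypothetical ContractGuard that wraps another observer (again a hand-rolled SimpleObserver, not RxJava's type) and drops any event that arrives after a terminal onCompleted or onError, so the wrapped observer only ever sees OnNext* (OnCompleted | OnError)?. It is single-threaded for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hand-rolled observer interface for this sketch; not RxJava's API.
interface SimpleObserver<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onCompleted();
}

// Enforces OnNext* (OnCompleted | OnError)? for the wrapped observer:
// after the first terminal event, everything else is silently dropped.
// Not thread-safe; a real implementation would need synchronization.
class ContractGuard<T> implements SimpleObserver<T> {
    private final SimpleObserver<T> downstream;
    private boolean terminated = false;

    ContractGuard(SimpleObserver<T> downstream) {
        this.downstream = downstream;
    }

    public void onNext(T value) {
        if (!terminated) {
            downstream.onNext(value);
        }
    }

    public void onError(Throwable error) {
        if (!terminated) {
            terminated = true;
            downstream.onError(error);
        }
    }

    public void onCompleted() {
        if (!terminated) {
            terminated = true;
            downstream.onCompleted();
        }
    }
}

class ContractDemo {
    // Simulates a misbehaving producer that keeps emitting after completing.
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        SimpleObserver<Integer> guarded = new ContractGuard<>(new SimpleObserver<Integer>() {
            public void onNext(Integer value) { seen.add("next " + value); }
            public void onError(Throwable error) { seen.add("error"); }
            public void onCompleted() { seen.add("completed"); }
        });
        guarded.onNext(1);
        guarded.onNext(2);
        guarded.onCompleted();
        guarded.onNext(3);                             // dropped: already terminated
        guarded.onError(new RuntimeException("late")); // dropped as well
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [next 1, next 2, completed]
    }
}
```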

Note in particular that the contract makes no distinction between returning one item and returning N items; which case applies can normally be inferred from the method you call and its documentation, and it does not change your programming flow anyway. Since that is a little abstract, let's look at a concrete example. The CouchbaseCluster class has a method called openBucket, which initializes all needed resources and then returns a Bucket instance for you to work with. Opening sockets, grabbing a configuration and so forth takes some time, so this method is a perfect candidate for an asynchronous API. The blocking API would look like this:

interface Cluster {
        Bucket openBucket(String name, String password);
}

How can we make it asynchronous? We need to wrap it into an Observable:

interface Cluster {
        Observable<Bucket> openBucket(String name, String password);
}

So we now return an Observable that will eventually emit a Bucket instance we can use. Let's add an observer:

cluster.openBucket().subscribe(new Observer<Bucket>() {
    @Override
    public void onCompleted() {
        System.out.println("Observable done!");
    }

    @Override
    public void onError(Throwable e) {
        System.err.println("Something happened: " + e);
    }

    @Override
    public void onNext(Bucket bucket) {
        System.out.println("Received bucket: " + bucket);
    }
});

Note that these methods are called on a different thread, so if you leave the code like this and your main thread exits afterwards, you probably won't see any output. While you could write all the rest of your code in the onNext method, that's probably not the best way to do it. Since the bucket is something you want to open upfront, you can block on it and then proceed with the rest of your code. Every Observable can be converted into a blocking observable, which feels like an Iterable:

BlockingObservable<Bucket> blockingObservable = cluster.openBucket().toBlocking();

You will find many methods for iterating over the received data in a blocking fashion, but there are also shorthand methods if you only expect a single value (which we know is the case here):

Bucket bucket = cluster.openBucket().toBlocking().single();

Internally, the value passed to onNext is stored and returned once onCompleted is called. If onError is called instead, the throwable is thrown directly and you can catch it.
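The mechanism just described can be approximated with plain JDK concurrency primitives. This is a sketch, not RxJava's actual implementation: SimpleObserver is hand-rolled, emitAsync is a hypothetical source that signals the observer from another thread (much like the openBucket callbacks), and BlockingSingle.single waits on a CountDownLatch, then returns the stored value or rethrows the error. Unchecked errors are rethrown as-is, while checked ones are wrapped in a RuntimeException in this sketch; it also rejects streams that do not deliver exactly one value.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hand-rolled observer interface for this sketch; not RxJava's API.
interface SimpleObserver<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onCompleted();
}

class BlockingSingle {
    // Hypothetical asynchronous source: signals the observer from a new thread,
    // emitting either one value followed by onCompleted, or a single onError.
    static <T> void emitAsync(T value, Throwable error, SimpleObserver<T> observer) {
        new Thread(() -> {
            if (error != null) {
                observer.onError(error);
            } else {
                observer.onNext(value);
                observer.onCompleted();
            }
        }).start();
    }

    // Subscribes, blocks the calling thread until a terminal event arrives,
    // then returns the single stored value or rethrows the error.
    static <T> T single(Consumer<SimpleObserver<T>> subscribe) {
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<T> value = new AtomicReference<>();
        AtomicReference<Throwable> failure = new AtomicReference<>();
        AtomicInteger count = new AtomicInteger();

        subscribe.accept(new SimpleObserver<T>() {
            public void onNext(T v) { value.set(v); count.incrementAndGet(); }
            public void onError(Throwable e) { failure.set(e); done.countDown(); }
            public void onCompleted() { done.countDown(); }
        });

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        Throwable error = failure.get();
        if (error instanceof RuntimeException) {
            throw (RuntimeException) error; // unchecked errors are rethrown as-is
        }
        if (error != null) {
            throw new RuntimeException(error); // checked errors are wrapped here
        }
        if (count.get() != 1) {
            throw new IllegalStateException("expected exactly one value, got " + count.get());
        }
        return value.get();
    }

    public static void main(String[] args) {
        String bucket = single(o -> emitAsync("bucket", null, o));
        System.out.println("Received " + bucket);
        try {
            single(o -> emitAsync(null, new IllegalArgumentException("boom"), o));
        } catch (IllegalArgumentException e) {
            System.out.println("Caught " + e.getMessage());
        }
    }
}
```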

Unifying APIs

What you've seen so far barely scratches the surface. Opening a bucket could just as well be handled with a Future<Bucket> alone. Where Observables really shine is when more than one result can be returned. In that case a Future<T> no longer fits the bill, and a Future<Collection<T>> or something similar has a different contract: it only completes once the whole collection is available. Since Observables imply that more than one T can be returned, APIs can look the same whether one T or many are returned.
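The "same contract for one or many results" idea can be sketched with the JDK's own push-based interfaces in java.util.concurrent.Flow (Java 9 and later), used here as a stand-in because RxJava may not be on the classpath; note that Flow names the completion callback onComplete rather than onCompleted. OneOrMany.observe is a hypothetical helper: the subscriber code is identical whether the publisher emits one item or several, and each item is delivered as it is published rather than as one collection at the end.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class OneOrMany {
    // Publishes the given items and returns everything one subscriber observed,
    // in order, followed by "complete". The subscriber does not know in advance
    // whether it will receive one item or many.
    static List<String> observe(List<String> items) {
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                public void onSubscribe(Flow.Subscription subscription) {
                    subscription.request(Long.MAX_VALUE); // accept everything
                }
                public void onNext(String item) { seen.add(item); }
                public void onError(Throwable error) { seen.add("error"); done.countDown(); }
                public void onComplete() { seen.add("complete"); done.countDown(); }
            });
            for (String item : items) {
                publisher.submit(item); // delivered asynchronously to the subscriber
            }
        } // close() signals onComplete after the buffered items

        try {
            done.await(5, TimeUnit.SECONDS); // callbacks run on another thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(observe(List.of("doc-1")));                  // a single result
        System.out.println(observe(List.of("row-1", "row-2", "row-3"))); // several results
    }
}
```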

Again, let's look at a concrete example. The SDK exposes a get method which returns one document. It looks like this:

interface Bucket {
        Observable<JsonDocument> get(String id);
}

But we also support querying (Views, N1QL), which can return more than one result (or even none). Thanks to the Observable contract, we can build an API like this:

interface Bucket {
        Observable<ViewResult> query(ViewQuery query);
}

See? The contract implicitly says "if you pass in a query, you get N ViewResults back", because you know how an Observable needs to behave. For a bigger picture, here are more methods that intuitively behave the way you would expect:

interface Bucket {
    <D extends Document<?>> Observable<D> insert(D document);
    <D extends Document<?>> Observable<D> upsert(D document);
    <D extends Document<?>> Observable<D> replace(D document);

    Observable<ViewResult> query(ViewQuery query);
    Observable<QueryResult> query(Query query);
    Observable<QueryResult> query(String query);

    Observable<Boolean> flush();
}
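To see why one return type can serve both get and query, here is a small in-memory sketch. SimpleObservable, SimpleObserver and FakeBucket are hypothetical stand-ins, not Couchbase or RxJava types, and query here just matches an id prefix rather than running a view or N1QL query. The same collect method consumes both calls without caring how many items arrive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hand-rolled observer interface for this sketch; not RxJava's API.
interface SimpleObserver<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onCompleted();
}

// Hand-rolled push-based source: subscribing starts the emission.
interface SimpleObservable<T> {
    void subscribe(SimpleObserver<T> observer);
}

// Hypothetical in-memory bucket; ids are kept sorted so query order is stable.
class FakeBucket {
    private final Map<String, String> docs = new TreeMap<>();

    FakeBucket put(String id, String content) {
        docs.put(id, content);
        return this;
    }

    // At most one result: the document with this id, if present.
    SimpleObservable<String> get(String id) {
        return observer -> {
            String doc = docs.get(id);
            if (doc != null) {
                observer.onNext(doc);
            }
            observer.onCompleted();
        };
    }

    // Zero or more results: every document whose id starts with the prefix.
    SimpleObservable<String> query(String idPrefix) {
        return observer -> {
            for (Map.Entry<String, String> entry : docs.entrySet()) {
                if (entry.getKey().startsWith(idPrefix)) {
                    observer.onNext(entry.getValue());
                }
            }
            observer.onCompleted();
        };
    }

    // One consumer for both calls: it only relies on the observer contract.
    static List<String> collect(SimpleObservable<String> source) {
        List<String> results = new ArrayList<>();
        source.subscribe(new SimpleObserver<String>() {
            public void onNext(String value) { results.add(value); }
            public void onError(Throwable error) { throw new IllegalStateException(error); }
            public void onCompleted() { }
        });
        return results;
    }

    public static void main(String[] args) {
        FakeBucket bucket = new FakeBucket()
                .put("user::1", "alice")
                .put("user::2", "bob")
                .put("order::1", "book");
        System.out.println(collect(bucket.get("user::1")));  // [alice]
        System.out.println(collect(bucket.query("user::"))); // [alice, bob]
    }
}
```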

Async my dataflow!

So far we have seen what Observables can do for us and how they help us provide cohesive, simple and yet asynchronous APIs. But Observables really shine when it comes to composability. You can do lots of things with Observables, and we can't cover them all in this post. RxJava has very good reference documentation, so check it out. It uses marble diagrams to show how async dataflows work, which is also something we want to provide as part of our documentation in the future.
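As a small taste of what composability means, here is a deliberately tiny, hand-rolled push stream with map and filter operators. The operator names echo RxJava's operators of the same name, but SimpleObservable, SimpleObserver and CompositionDemo are hypothetical types for this sketch, not RxJava's classes: each operator returns a new observable that subscribes upstream and transforms events on their way down. Unlike RxJava, exceptions thrown by the user-supplied functions are not routed to onError here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hand-rolled observer interface for this sketch; not RxJava's API.
interface SimpleObserver<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onCompleted();
}

interface SimpleObservable<T> {
    void subscribe(SimpleObserver<T> observer);

    // Source: pushes each list element, then completes.
    static <T> SimpleObservable<T> from(List<T> items) {
        return observer -> {
            for (T item : items) {
                observer.onNext(item);
            }
            observer.onCompleted();
        };
    }

    // Operator: transforms every value; terminal events pass through unchanged.
    default <R> SimpleObservable<R> map(Function<? super T, ? extends R> fn) {
        return downstream -> subscribe(new SimpleObserver<T>() {
            public void onNext(T value) { downstream.onNext(fn.apply(value)); }
            public void onError(Throwable error) { downstream.onError(error); }
            public void onCompleted() { downstream.onCompleted(); }
        });
    }

    // Operator: forwards only the values that match the predicate.
    default SimpleObservable<T> filter(Predicate<? super T> keep) {
        return downstream -> subscribe(new SimpleObserver<T>() {
            public void onNext(T value) {
                if (keep.test(value)) {
                    downstream.onNext(value);
                }
            }
            public void onError(Throwable error) { downstream.onError(error); }
            public void onCompleted() { downstream.onCompleted(); }
        });
    }
}

class CompositionDemo {
    // Keeps the odd ids, turns them into document keys, and records the events.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        SimpleObservable.from(Arrays.asList(1, 2, 3, 4, 5))
                .filter(n -> n % 2 == 1)
                .map(n -> "doc-" + n)
                .subscribe(new SimpleObserver<String>() {
                    public void onNext(String key) { events.add(key); }
                    public void onError(Throwable error) { events.add("error"); }
                    public void onCompleted() { events.add("completed"); }
                });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [doc-1, doc-3, doc-5, completed]
    }
}
```

Because every operator returns another observable, steps can be chained in any order, which is the property that makes async dataflows composable.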

Published at DZone with permission of Don Pinto, author and DZone MVB.

(Note: Opinions expressed in this article and its replies are the opinions of their respective authors and not those of DZone, Inc.)