RX: Reactive Libraries

Over the last several years, the term “reactive programming” has become popular in many programming languages. The Reactive Manifesto (http://www.reactivemanifesto.org) was published, although it gives a rather generic definition of reactive systems. Yes, an app should respond fast (Responsive), remain functional in case of errors (Resilient), be flexible with regard to increasing/decreasing computational resources (Elastic), and be based on asynchronous events (Message Driven).

Declaring the principles is a good start, but how do you apply these principles in a concrete app? Let’s talk about the reactive extensions libraries available for many programming languages.

About seven years ago, Erik Meijer from Microsoft created Reactive Extensions (Rx), a set of libraries (Rx.NET) for processing asynchronous event-driven data streams. For example, someone posts a tweet and you receive an immediate notification.

A non-reactive way of receiving tweets would be visiting twitter.com every now and then and reloading the page, hoping that one of the people you follow has posted a new tweet. This is known as polling. The load on the server increases substantially if every user keeps polling it. The push model is a lot more efficient: just subscribe to the Twitter feed and get the tweets asynchronously pushed to you as they become available.

An online auction is another use case for the async data push. A user bids on a product, but other users may outbid her. Bids should be implemented as a stream that allows subscription, so the users won’t need to constantly check whether their offers are still the winning ones.

Another example is a stream of stock prices during the working hours of a stock exchange. Or take a stream of signals from a sensor (e.g. an accelerometer in your phone). Even the process of dragging the mouse over the screen can be treated as a stream of coordinates of the mouse pointer.

Five years ago, Microsoft released Rx.NET as an open source project. People liked it and the library was ported to other programming languages: RxCpp, RxJS, RxPHP, Rx.rb, Rx.py, RxJava, RxSwift, RxScala, RxKotlin.

Disclaimer. This post is not an Rx tutorial, but a brief introduction to the main Rx players. I work with RxJS and RxJava, but in this post, I’ll be using code samples in JavaScript.

Let’s get familiar with the main concepts of Rx libraries, but first consider this non-reactive code:

int a1 = 2;
int b1 = 4;

int c1 = a1 + b1;  // c1 = 6

a1 = 55;           // c1 = 6, but should be 59
b1 = 20;           // c1 = 6, but should be 75

After this code executes, c1 is still equal to 6. Sure enough, we could add more code to recalculate c1 after the values of a1 and b1 change, but a better way to handle this is to have c1 recalculated immediately as soon as either a1 or b1 changes, as in an Excel spreadsheet. In other words, it would be nice to switch to the push model, where new, asynchronously changed values are pushed to their consumer. We want to move away from the pull model, where the consumer is constantly asking the producer, “Do you have something new for me?… How about now?… Maybe now you have something?”
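To make the push idea concrete before bringing in Rx, here is a minimal sketch in plain JavaScript. The Cell class and its set() and subscribe() methods are hypothetical names invented for this illustration and are not part of any Rx library. The sketch only shows a consumer being notified of changes instead of polling for them.

```javascript
// Hypothetical helper for illustration only; not an Rx API.
class Cell {
  constructor(value) {
    this.value = value;
    this.listeners = [];
  }
  // Register a consumer that is called on every change
  subscribe(listener) {
    this.listeners.push(listener);
  }
  // Store the new value and push it to all registered consumers
  set(value) {
    this.value = value;
    this.listeners.forEach(listener => listener(value));
  }
}

const a1 = new Cell(2);
const b1 = new Cell(4);

let c1 = a1.value + b1.value;   // c1 = 6
const c1Values = [c1];          // keeps every value c1 has had

// Recalculate c1 whenever either input pushes a new value
const recalc = () => {
  c1 = a1.value + b1.value;
  c1Values.push(c1);
  console.log("c1=" + c1);
};
a1.subscribe(recalc);
b1.subscribe(recalc);

a1.set(55);   // c1 becomes 59
b1.set(20);   // c1 becomes 75
```

Nobody asks the cells whether they have changed. The change itself triggers the recalculation, which is the behavior the Rx libraries generalize to asynchronous streams.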

Observable, Observer, Subscriber

The main players of any Rx library are Observable, Observer, and Subscriber.

* Observable – an object or a function that emits sequences of data over time (a.k.a. producer)

* Observer – an object or a function that knows how to process the sequences of data (a.k.a. consumer)

* Subscriber – an object or a function that connects an Observable with Observer(s)

Looking at these three players, many software developers will say, “We already know this. This is pub-sub messaging with an implementation of the Observer pattern.” To some extent, this is correct, but there’s more to it:

1. Rx is meant for asynchronous, non-blocking data processing.

2. Rx offers a simple API with dedicated channels for sending data, errors, and the end-of-stream signal.

3. Any Rx library has about a hundred operators that can be applied to the data stream en route. Operators are easily composable.

4. Some of the Rx implementations (e.g. RxJava2) support backpressure well. Backpressure handling is needed when a producer emits data faster than a consumer can handle.

5. You don’t need special messaging servers to use an Rx library. Everything you need is a part of your app.

6. In languages that support multi-threading, working with threads and switching between them is easier. Android developers will appreciate this because UI rendering has to be done in the main thread, while calculations run in other threads.

So how does an Observable send data to the Observer? An Observer can implement three methods (their names may vary slightly depending on the language you use):

* next() – here’s a new value from the stream
* error() – here’s an error that happened in the stream
* complete() – the stream’s over

In the next code sample, the function getData() turns the array of beers into an Observable and returns it. Returns to whom? To the subscriber, that is, the code that invokes subscribe(). The subscriber, getData().subscribe(myObserver), passes an Observer as an argument to the function subscribe(). Accordingly, an Observer can implement three functions:

* Handling the next element from the stream
* Handling the stream error
* Handling the end of stream, if needed

// Defining the function that returns an Observable
function getData(){

    var beers = [
        {name: "Stella", country: "Belgium", price: 9.50},
        {name: "Sam Adams", country: "USA", price: 8.50},
        {name: "Bud Light", country: "USA", price: 6.50},
        {name: "Brooklyn Lager", country: "USA", price: 8.00},
        {name: "Sapporo", country: "Japan", price: 7.50}
    ];

    // The observer will be provided at the time of subscription
    return Rx.Observable.create( observer => {
        beers.forEach( beer => observer.next(beer));
        observer.complete();
    });
}

// Calling the function that subscribes to the observable
// The function subscribe() receives the Observer, represented by three functions
getData()
    .subscribe(
        beer  => console.log("Subscriber got " + beer.name),  // handling the arrived data
        error => console.error(error),                        // an error arrived
        ()    => console.log("The stream is over")            // the signal that the stream completed arrived
    );

Our Observer consists of three fat arrow functions (=>). This syntax was introduced in the ECMAScript 6 spec. Our fat arrow callbacks are invoked only after we invoke subscribe(). You can see this code sample in action here (open the browser’s console and click on the Run button).
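The sample above exercises only the data and completion callbacks. The sketch below shows the error channel as well. To stay self-contained, it uses a tiny hand-written createObservable() function, which is a hypothetical stand-in and not the RxJS implementation. The function getPrices() and its sample data are also made up for illustration.

```javascript
// Hypothetical, simplified stand-in for Rx.Observable.create (illustration only).
// The producer function runs each time subscribe() is called.
function createObservable(producer) {
  return {
    subscribe(next, error, complete) {
      let closed = false;   // after error or complete, nothing else is delivered
      producer({
        next: value => { if (!closed) next(value); },
        error: err => { if (!closed) { closed = true; error(err); } },
        complete: () => { if (!closed) { closed = true; complete(); } }
      });
    }
  };
}

// Emits prices until it meets a value that is not a number
function getPrices(prices) {
  return createObservable(observer => {
    for (const price of prices) {
      if (typeof price !== "number") {
        observer.error(new Error("Bad price: " + price));
        return;
      }
      observer.next(price);
    }
    observer.complete();
  });
}

const log = [];
getPrices([9.5, 8.5, "n/a", 7.5]).subscribe(
  price => log.push("next " + price),          // data channel
  err   => log.push("error " + err.message),   // error channel
  ()    => log.push("complete")                // end-of-stream channel
);
console.log(log.join(", "));
// prints: next 9.5, next 8.5, error Bad price: n/a
```

The error ends the stream, so the value 7.5 and the completion signal are never delivered. Rx libraries follow the same rule: after error() or complete(), the Observer receives nothing more.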

Operators

Operators are functions that can transform the stream data between the moments when the Observable sent them and the function subscribe() received them. In other words, we can transform the data en route. Rx libraries have lots of operators.

Each operator is a function that takes an Observable as an argument, transforms (or ignores) it, and returns another Observable. Since the input and output of any operator have the same type, you can chain them up. Here’s how you can filter out the beer that’s more expensive than 8 dollars and convert the instances of the Beer object into strings.
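A minimal sketch of such a chain follows. It is written in plain JavaScript so that it runs on its own: fromArray(), filter(), and map() below are hypothetical, simplified helpers, not the RxJS operators, although they follow the same idea of taking an Observable and returning a new one. With RxJS 5 and the getData() function shown earlier, the equivalent chain would be getData().filter(...).map(...).subscribe(...).

```javascript
// Hypothetical, simplified helpers for illustration; not the RxJS API.
// Error handling is left out to keep the sketch short.

// Turns an array into an Observable-like object
function fromArray(items) {
  return {
    subscribe(observer) {
      items.forEach(item => observer.next(item));
      observer.complete();
    }
  };
}

// Operator: passes along only the values that satisfy the predicate
function filter(source, predicate) {
  return {
    subscribe(observer) {
      source.subscribe({
        next: value => { if (predicate(value)) observer.next(value); },
        complete: () => observer.complete()
      });
    }
  };
}

// Operator: transforms each value with the project function
function map(source, project) {
  return {
    subscribe(observer) {
      source.subscribe({
        next: value => observer.next(project(value)),
        complete: () => observer.complete()
      });
    }
  };
}

const beers = fromArray([
  {name: "Stella", country: "Belgium", price: 9.50},
  {name: "Sam Adams", country: "USA", price: 8.50},
  {name: "Bud Light", country: "USA", price: 6.50},
  {name: "Brooklyn Lager", country: "USA", price: 8.00},
  {name: "Sapporo", country: "Japan", price: 7.50}
]);

// Keep the beers cheaper than 8 dollars, then turn each one into a string
const cheapBeers = map(
  filter(beers, beer => beer.price < 8),
  beer => beer.name + ": $" + beer.price
);

const received = [];
cheapBeers.subscribe({
  next: text => { received.push(text); console.log(text); },
  complete: () => console.log("The stream is over")
});
```

Because every helper returns the same kind of object it receives, the output of filter() can be fed straight into map(). This is what makes operators composable.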

Studying Rx operators requires a time investment, and I’m planning to write more about them. The Rx docs often include marble diagrams that help in understanding what a particular operator does. As an example, the marble diagram for the filter operator looks as follows:

At the top, the incoming stream (an Observable) is represented by various geometrical shapes. The filter operator ignores every element except circles, so the resulting Observable contains only circles. When you look at the circles, visualize beers that are cheaper than eight dollars.

Still, how do we make c1 = a1 + b1 reactive?

First, convert a1 and b1 into streams, for example:

const a1 = Rx.Observable.from([2, 55]);

But this stream will shoot 2 and 55 instantaneously, so let’s add the time dimension. To emulate a delay you can use another stream that just emits sequential numbers and join it using the zip operator with the stream that emits 2 and 55:

const a1 = Rx.Observable.from([2, 55])
  .zip(Rx.Observable.interval(1200), x => x);

When someone subscribes to a1, it’ll emit 2 after 1.2 seconds and 55 after another 1.2 seconds, because interval() emits its first number only after the first period has passed. Similarly, let’s create a stream for b1 but with a delay of 1.5 seconds. Then, using stream composition and the operator combineLatest, we combine streams a1 and b1 and add their latest values. The entire code will look as follows:

const a1 = Rx.Observable.from([2, 55])
  .zip(Rx.Observable.interval(1200), x => x);
  
const b1 = Rx.Observable.from([4, 20])
  .zip(Rx.Observable.interval(1500), x => x);

a1.combineLatest(b1, (x, y) => x + y)
  .subscribe(val => console.log("c1=" + val));

To see this code in action, visit the Plunker at http://bit.ly/2nphn0k, open the browser’s console, and click on the Run button. You’ll see how c1 is recalculated as soon as either a1 or b1 changes.
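To see which values of c1 are printed and when, the following sketch replays the same timeline without timers. It is plain JavaScript, and combineLatestTimeline() is a hypothetical helper written for this illustration, not an RxJS function. It assumes that interval(1200) fires at 1200 ms and 2400 ms and that interval(1500) fires at 1500 ms and 3000 ms, so each value is listed with the time at which it would arrive.

```javascript
// Hypothetical helper for illustration; replays timed emissions in order
// and applies the combineLatest rule: emit only once both sides have a value.
function combineLatestTimeline(aEvents, bEvents, combine) {
  const events = [
    ...aEvents.map(e => ({ ...e, source: "a" })),
    ...bEvents.map(e => ({ ...e, source: "b" }))
  ].sort((x, y) => x.time - y.time);

  const latest = {};    // the most recent value seen from each source
  const results = [];
  for (const e of events) {
    latest[e.source] = e.value;
    if ("a" in latest && "b" in latest) {
      results.push({ time: e.time, value: combine(latest.a, latest.b) });
    }
  }
  return results;
}

// Assumed arrival times of the values emitted by a1 and b1
const a1Events = [{ time: 1200, value: 2 }, { time: 2400, value: 55 }];
const b1Events = [{ time: 1500, value: 4 }, { time: 3000, value: 20 }];

const c1Events = combineLatestTimeline(a1Events, b1Events, (x, y) => x + y);
c1Events.forEach(e => console.log(e.time + " ms: c1=" + e.value));
// 1500 ms: c1=6
// 2400 ms: c1=59
// 3000 ms: c1=75
```

Nothing is printed at 1200 ms because b1 has not emitted yet. From then on, every new value from either stream produces a new c1.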

If you haven’t worked with reactive libraries, take a look at the one available for your programming language and start using it in a real-world project.

A word of caution. Rx libraries allow you to write less code, but the code readability suffers. The person who reads the code needs to know Rx as well.

On the positive side, Rx libraries don’t require you to change the architecture of the entire project. Use them where you can make async data flow through a sequence of algorithms (think operators).

Update: At the beginning of this post I stated that it is not a tutorial, but after reading one of the comments, I decided to add a link to the video of my recent presentation in New York at BuzzJS. During the first 25 minutes of this presentation, I was using just RxJS.


4 thoughts on “RX: Reactive Libraries”

  1. Your central point is invaluable in getting people to grasp the importance of the RX model of programming. But using straight Observables (like those created with the .from operator) might generate more questions than it answers for the newcomer (especially when compounded with .zip). The most direct way to grasp the idea of pushing new values into subscribed functions is with an RX Subject (or more commonly a BehaviorSubject, a hybrid of a stream and the traditional variable), even if the stream is recast for consumption as a pure Observable (with the .asObservable operator). The concept of pushing each new value into a subscribing function is crystal clear when presented as a call to (e.g.) mySubject.next(newValue), and this is what’s happening under the hood of all observables anyway. I speak as someone who, like so many, has sweated blood to get my head around RX, and who therefore has strong feelings about getting people started in the easiest terms they can understand. RX is the future, like it or not, for the reasons you very properly stress.

    1. I hear you, Rob, but this blog is not a tutorial and explaining what the Subject is would require even more wording than combining with zip. In the initial version of my code sample, I didn’t combine the streams, but I wanted to introduce different delays between emitting the values by a1 and b1 so the three c1 values are printed, and not two. Using BehaviorSubject wouldn’t help me with the time dimension.
      As to Rx being the future, it may happen when this TC39 proposal becomes a reality:
      https://tc39.github.io/proposal-observable

      After reading your comment I decided to add a link to my recent RxJS presentation to add a tutorial flavor to it 🙂

  2. Small thing, but if there is an error (introduced for fun) in the first example, you get Uncaught TypeError: console.err is not a function – the problems of having no static type checking :-)
