RxJS Essentials. Part 2: map, filter, reduce

In the previous article, I introduced the main players or RxJS, and now let’s start getting familiar with RxJS operators. In this article well use the operators map(), filter(), and reduce().

As the data elements flow from the observable to the observer, you can apply one or more operators, transforming each element prior to supplying it to the observer. Each operator takes an observable as an input, performs its action, and returns a new observable as an output:

Since each operator takes an observable in and creates an observable as its output, operators can be chained so each observable element can go through several transformations prior to being handed to the observer.

RxJS offers about a hundred of various operators, and their documentation may not always be easy to understand. On the positive side, the documentation often illustrates operators with so called marble diagrams.

The map() operator

The map() operator transforms one value to another. It takes a given value from the observable stream and applies the provided transforming function to it.

The marble diagram below is taken from the RxJS manual, and it illustrates the map() operator. This diagram shows the map() operator that takes a value of each incoming element and multiplies it by 10. The fat arrow function x => x*10 is the transforming function in this example.

On top, a marble diagram shows the horizontal line with several shapes representing a stream of incoming observable elements. Then goes the illustration of what a particular operator does. At the bottom, you see another horizontal line depicting the outgoing observable stream after the operator has been applied. The vertical bar represents the end of the stream. When you look at the diagram, think of a time that’s moving from left to right. First, 1 was emitted, the time went by… and 2 was emitted, the time went by… and 3 was emitted, and then the stream ended.

The filter() operator

Now let’s get familiar with the marble diagram of the filter() operator shown next.

The filter() operator takes a function predicate as an argument, which returns true if the emitted value meets the criteria, or false otherwise. Only the values that meet the criteria will make it to the observer. This particular diagram uses the fat arrow function that checks if the current element is an odd number. The even numbers won’t make it further down the chain to the observer.

The following code sample declares an array of beers and turns it into Observable using the Observable.from() function. Then it applies the filter() operator to pass only the beers that cost less than eight dollars, and the chained map() operator converts the beer object into a string.

let beers = [
    {name: "Stella", country: "Belgium", price: 9.50},
    {name: "Sam Adams", country: "USA", price: 8.50},
    {name: "Bud Light", country: "USA", price: 6.50},
    {name: "Brooklyn Lager", country: "USA", price: 8.00},
    {name: "Sapporo", country: "Japan", price: 7.50}
];

Rx.Observable.from(beers)   // <1>
  .filter(beer => beer.price < 8)   // <2>
  .map(beer => beer.name + ": $" + beer.price) // <3>
  .subscribe(    // <4>
      beer => console.log(beer),
      err => console.error(err),
      () => console.log("Streaming is over")
);

console.log("This is the last line of the script");

1. Turn the beers array into an Observable

2. Filter out the beers that are more expensive than eight dollars

3. Turn the object into a string showing the beer name the price

4. Subscribe to the Observable providing the Observer as three fat arrow functions

This program will print the following on the console:

Bud Light: $6.5
Sapporo: $7.5
Streaming is over
This is the last line of the script

To see it in action in CodePen, follow this link.

By default, the operator from() returns a synchronous observable, but if you want an asynchronous one, use a second argument specifying an async scheduler:

Rx.Observable.from(beers, Rx.Scheduler.async)

Making this change in the above code sample will print “This is the last line of the script” first and then will emit beers info.

Don’t include the entire RxJS library in your app

When you’ll be reviewing the code in CodePen, not that the HTML document imports the entire RxJS library as follows:

<script 
 src="https://unpkg.com/@reactivex/rxjs/dist/global/Rx.js">
</script>

This loads the entire RxJS library even though we only use the Observable, from(), filter(), and map(). Loading the entire library for demo purposes is OK, but in the real-world apps, we want to minimize the app size and load only the code we actually use. For example, in JavaScript or TypeScript apps we’d install RxJS locally and write something like this:

import 'rxjs/add/operator/map';
import {Observable} from "rxjs/Observable";

Including the entire RxJS add about 90KB (33KB gzipped) to your Angular app. The size of the future version RxJS 6 will be substantially smaller, but we recommend including only the classes and functions that you use for now.

The reduce() operator

Now we’d like to introduce the operator reduce() that allows you aggregate values emitted by an observable. The marble diagram of the reduce() operator is shown next. This diagram shows an observable that emitted 1, 3, and 5, and the reduce() operator added them up producing the accumulated value of 9.

The operator reduce() has two arguments: an accumulator function where you specify how to aggregate the values, and the initial (seed) value to be used by the accumulator function. The above diagram uses zero was as an initial value, but if we’d change it to 10, the accumulated value would be 19.

As you see from the above diagram, the accumulator function also has two arguments:

* acc stores the currently accumulated value, which is available for each emitted element

* curr stores the currently emitted value.

The following code sample creates an observable fro the beers array and applies two operators to each emitted element: map() and reduce(). The map() operator takes a beer object and extracts its price, and the reduce() operator adds the prices.

let beers = [
    {name: "Stella", country: "Belgium", price: 9.50},
    {name: "Sam Adams", country: "USA", price: 8.50},
    {name: "Bud Light", country: "USA", price: 6.50},
    {name: "Brooklyn Lager", country: "USA", price: 8.00},
    {name: "Sapporo", country: "Japan", price: 7.50}
];

Rx.Observable.from(beers) 
    .map(beer =>  beer.price)  // <1>
    .reduce( (total,price) => total + price, 0)  // <2>
    .subscribe(
        totalPrice => console.log(`Total price: ${totalPrice}`)  // <3>
);

1. Transform the beer object into its price
2. Sum up the prices
3. Print the total price of all beers

Running this script will produce the following output:

Total price: 40

In this script, we were adding all prices, but you could apply any other calculation to create a different aggregate value, e.g. to calculate an average or maximum price.

The reduce() operator emits the aggregated result when the observable completes. In the above example, it happened naturally, because we created an observable from an array with a finite number of elements. In other scenarios, you’d need to invoke the method complete() on the observer explicitly, and you’ll see how to do it in the next article of this series.

To see this example running in CodePen follow this link.

Code samples from this article were turning the array into an observable and were magically pushing the array’s elements to the observer. In the next article, I’ll show you how to push elements by invoking the next() function on the observer.

If you have an account at O’Reilly’s safaribooksonline.com, you can watch my video course “RxJS Essentials” there.

Advertisements

RX: Reactive Libraries

Over the last several years, the term “reactive programming” became popular in many programming languages. Reactive Manifesto was published although it gives a rather generic definition of reactive systems http://www.reactivemanifesto.org. Yes, an app should response fast (Responsive), remain functional in case of errors (Resilient), be flexible in regards to increasing/decreasing computational resources (Elastic), and be based on asynchronous events (Message Driven).

Declaring the principles is a good start, but how to apply these principles in a concrete app? Let’a talk about reactive extensions libraries available for many programming languages.

About seven years ago, Erik Meijer from Microsoft created Reactive Extensions (Rx) – a set of libraries Rx.NET for processing asynchronous event-driven data streams. For example, someone posted a tweet and you received an immediate notification.

A non-reactive way of receiving tweets would be visiting twitter.com every now and then and reload the page hoping that one of the people you follow posted a new tweet – this is known as polling. The load on the server will substantially increase if every user will keep polling the server. The push model is a lot more efficient – just subscribe to the tweeter feed and get the tweets asynchronously pushed to you as they become available.

An online auction is another use case for the async data push. A user bids on the product, but other users may overbid her. Bids should be implemented as a stream that allows subscription so the users won’t need to constantly check if their offers are still the winning ones.

Another example is a stream on stock prices during the working ours of a stock exchange. Or take a stream of signals from a sensor (e.g. an accelerometer in your phone). Even the process of dragging the mouse over the screen can be treated as a stream of coordinates of the mouse pointer.

Five years ago, Microsoft released Rx.NET as an open source project. People liked it and the library was ported to other programming languages: RxCpp, RxJS, RxPHP, Rx.rb, Rx.py, RxJava, RxSwift, RxScala, RxKotlin.

Disclaimer. This post is not a Rx tutorial, but a brief introduction of the main Rx players. I work with RxJS and RxJava, but in this post, I’ll be using code samples in JavaScript

Let’s get familiar with the main concepts of Rx libraries, but first consider this non-reactive code:

int a1 = 2;

int b1 = 4;



int c1 = a1 + b1;  // c1 = 6

    

a1 = 55;           // c1 = 6, but should be 59    
b1 = 20;           // c1 = 6, but should be 75

After the execution of this code, c1 is still equal to 6. Sure enough, we could add more code to recalculate c1 after the values of both a1 and b1 changed, but a more proper way to handle this is by making c1 to be immediately recalculated as soon as either a1 or b1 change as in Excel spreadsheet. In other words, it would be nice to switch to the push model, where the new and asynchronously changed values are pushed to their consumer. We want to move away from the pull model, where the consumer is constantly asking the producer, “Do you have something new for me?… How about now?… Maybe now you have something?”

Observable, Observer, Subscriber

The main players of any Rx library are Observable, Observer, and Subscriber.

* Observable – an object or a function that emits sequences of data over time (a.k.a. producer)

* Observer – an object or a function that knows how to process the sequences of data (a.k.a. consumer)

* Subscriber – an object or a function that connects an Observable with Observer(s)

After looking at this diagram, many software developers will say, “We already know this. This is a pub-sub messaging with the implementation of the Observer pattern.” To some extent, this is correct, but there’s more to it:

1. Rx is meant for the asynchronous non-blocking data processing.

2. Rx offers a simple API with dedicated channels for sending data, errors, and the end-of-stream signal.

3. Any Rx library has about a hundred operators that can be applied to the data stream en route. Operators are easily composable.

4. Some of the Rx implementations (e.g. RxJava2) support the backpressure well. This is a scenario when a producer emits data faster than a consumer can handle.

5. You don’t need special messaging servers to use a Rx library. Everything you need is a part of your app.

6. In languages that support multi-threading, working with threads as well as switching between the threads is easier.Android developers will appreciate this because the UI rendering has to be done in the main thread while the calculation in others.

So how an Observable sends the data to the Observer? An Observer can implement three methods (their names may slightly vary depending on the language you use):

* next() – here’s a new value from the stream
* error() – here’s an error happened in the stream
* complete() – the stream’s over

In the next code sample, the function getData() turns the array with beers into an Observable and returns it back. Returns to whom? To the subscriber, when some other code invokes subscribe(). A subscriber – getData().subscribe(myObserver) – passes an Observer, as an argument to the function subscribe(). Accordingly, an Observer can implement three functions:

* Handling the next element from the stream
* Handling the stream error
* Handling the end of stream, if needed

// Defining the function that returns an Observable
function getData(){

    var beers = [
        {name: "Stella", country: "Belgium", price: 9.50},
        {name: "Sam Adams", country: "USA", price: 8.50},
        {name: "Bud Light", country: "USA", price: 6.50},
        {name: "Brooklyn Lager", country: "USA", price: 8.00},
        {name: "Sapporo", country: "Japan", price: 7.50}
    ];

// The observer will be provided at the time of subscription
    return Rx.Observable.create( observer => {

              beers.forEach( beer => observer.next(beer));
              observer.complete();
           }
    );
}

// Calling the function that subscribe to the observable
// The function subscribe() receives the Observer, represented by three functions
getData()
     .subscribe(
         beer  => console.log("Subscriber got " + beer),   // handling the arrived data
         error => console.err(error),                      // an error arrived
            () > console.log("The stream is over")         // the signal that the stream completed arrived
);

Our Observer consists of three fat arrow functions (=>). This syntax was introduced in the ECMAScript 6 spec. Our fat arrow callbacks may be invoked only after we invoked subscribe(). You can see this code sample in action here (open the browser’s console and click on the Run button).

Operators

Operators are functions that can transform the stream data between the moments when the Observable sent them and the function subscribe() received them. In other words, we can transform the data en route. Rx libraries have lots of operators.

Each operator is a function that takes an Observable as an argument, transforms (or ignores) it, and returns another Observable. Since the input and output of any operator have the same type, you can chain them up. Here’s how you can filter out the beer that’s more expensive than 8 dollars and convert the instances of the Beer object into strings.

Studying Rx operators require a time investment, and I’m planning to write more about them. The RX docs often include marble diagrams that help in understanding what a particular operator does. As an example, the marble diagram for the filter operator looks as follows:

On top, the incoming stream (an Observable) is represented by various geometrical shapes. Then the filter operator ignores every element but circles, and the resulting Observables will contain only the circles. When you look at the circle, visualize beers that are cheaper than eight dollars.

Still, how to make c1=a1+b1 reactive?

First, convert a1 and b1 into streams, for example:

const a1 = Rx.Observable.from([2, 55]);

But this stream will shoot 2 and 55 instantaneously, so let’s add the time dimension. To emulate a delay you can use another stream that just emits sequential numbers and join it using the zip operator with the stream that emits 2 and 55:

const a1 = Rx.Observable.from([2, 55])
  .zip(Rx.Observable.interval(1200), x => x);

When someone subscribes to a1, it’ll emit 2 and in 1.2 seconds 55. Similarly, let’s create a stream for b1 but with a delay of 1.5 seconds. Then, using streams composition and the operator combineLatest, we combine streams a1 and b1 and add their latest values. The entire code will look as follows:

const a1 = Rx.Observable.from([2, 55])
  .zip(Rx.Observable.interval(1200), x => x);
  
const b1 = Rx.Observable.from([4, 20])
  .zip(Rx.Observable.interval(1500), x => x);

a1.combineLatest(b1, (x, y) => x + y)
  .subscribe(val => console.log("c1=" + val));

To see this code in action, visit the Plunker at http://bit.ly/2nphn0k, open the browser’s console and click on the button Run. you’ll see how c1 is recalculated as soon as either a1 or b1 is changing.

If you haven’t worked with reactive libraries, take a look at the one available for your programming language and start using it in a real-world project.

A word of caution. Rx libraries allow you to write less code, but the code readability suffer. The person who reads the code needs to know Rx as well.

On the positive side, Rx libraries don’t require you to change the architecture of the entire project. Use them where you can make the async data to flow through a sequence of algorithms (think operators).

Update In the beginning of this post I made a statement that this blog is not a tutorial, but after reading one of the comments, I decided to add a link to the video of my recent presentation in New York at BuzzJS. During the first 25 min of this presentation, I was using just RxJS.

Categories Rx