RxJS Essentials. Part 7: Handling errors with the catch operator

In this article, I’ll show you one of the RxJS operators for error handling – the catch() operator. The previous articles in this series include:

1. Basic Terms
2. Operators map, filter, and reduce
3. Using Observable.create()
4. Using RxJS Subject
5. The flatMap operator
6. The switchMap operator

The Reactive Manifesto declares that a reactive app should be resilient, which means that the app should implement a procedure to keep itself alive in case of a failure. An observable can emit an error by invoking the error() function on the observer, but once error() is invoked, the stream ends and no more values are delivered.

A subscription to an observable ends if one of the following occurs:

1. The consumer explicitly unsubscribes
2. The observable invokes the complete() method on the observer
3. The observable invokes the error() method on the observer
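
The rule that nothing is delivered after error() or complete() can be illustrated with a small plain-JavaScript sketch. It doesn’t use RxJS, and the helper toSafeObserver() is a hypothetical name invented for this illustration; it only mimics the behavior described above.

```javascript
// Hypothetical wrapper illustrating the rule: after error() or complete(),
// the wrapped observer receives nothing more.
function toSafeObserver(observer) {
    let stopped = false;
    return {
        next(value) {
            if (!stopped) observer.next(value);
        },
        error(err) {
            if (!stopped) { stopped = true; observer.error(err); }
        },
        complete() {
            if (!stopped) { stopped = true; observer.complete(); }
        }
    };
}

const events = [];
const safe = toSafeObserver({
    next: beer => events.push("next: " + beer),
    error: err => events.push("error: " + err.status),
    complete: () => events.push("complete")
});

safe.next("Sam Adams");
safe.error({status: 500});
safe.next("Bud Light");   // ignored: the stream has already ended
safe.complete();          // ignored as well

console.log(events);
```

Only the first next() and the error() reach the wrapped observer; everything after the error is dropped.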

RxJS offers several operators to intercept and handle the error before it reaches the code in the error() method on the observer.

* catch(error) – intercepts the error so that you can implement some business logic to handle it
* retry(n) – retries the erroneous operation up to n times
* retryWhen(fn) – retries the erroneous operation as per the provided function
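
To make the idea behind retry(n) more concrete, here is a minimal plain-JavaScript sketch that doesn’t use RxJS. In this sketch a source is just a function that takes an observer, and the helpers makeFlakySource() and retry() are hypothetical names written for this illustration, not the library’s implementation.

```javascript
// A "source" in this sketch is a function that accepts an observer
// with next(), error(), and complete() callbacks (not the RxJS API).

// Hypothetical source: fails on the first `failures` subscriptions, then succeeds.
function makeFlakySource(failures) {
    let attempts = 0;
    return observer => {
        attempts++;
        if (attempts <= failures) {
            observer.error({status: 500, description: "Beer stream error"});
        } else {
            observer.next("Sam Adams");
            observer.complete();
        }
    };
}

// Hypothetical helper: resubscribes to the source up to n times after an error.
function retry(source, n) {
    return observer => {
        let retriesLeft = n;
        const attempt = () => source({
            next: value => observer.next(value),
            complete: () => observer.complete(),
            error: err => {
                if (retriesLeft > 0) {
                    retriesLeft--;
                    attempt();            // subscribe to the source again
                } else {
                    observer.error(err);  // give up and pass the error on
                }
            }
        });
        attempt();
    };
}

const retryLog = [];
retry(makeFlakySource(2), 3)({
    next: beer => retryLog.push("Subscriber got " + beer),
    error: err => retryLog.push("Got " + err.status),
    complete: () => retryLog.push("The stream is over")
});
console.log(retryLog.join("\n"));
```

Here the source fails twice and succeeds on the third subscription, so the subscriber never sees the error. If the source kept failing after the allowed retries were used up, the error would be passed on to the subscriber.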

In this article, I’ll show you an example of using the catch() operator. Inside the catch() operator you can check the error status and react accordingly. The following code snippet shows how to intercept an error, and if the error status is 500, switch to a different data producer to get the cached data. If the received error status is not 500, this code will return an empty observable and the stream of data will complete. In any case, the method error() on the observer won’t be invoked.

.catch(err => {  
    console.error("Got " + err.status + ": " + err.description);

    if (err.status === 500){
        console.error(">>> Retrieving cached data");

        return getCachedData();  // failover
    } else{
      return Rx.Observable.empty();  // don't handle the error
    }
})

The following listing shows the complete example, where we subscribe to the stream of beers from a primary source – getData() – which randomly generates an error with the status 500. The catch() operator intercepts this error and switches to an alternative source – getCachedData().

function getData(){
    var beers = [
        {name: "Sam Adams", country: "USA", price: 8.50},
        {name: "Bud Light", country: "USA", price: 6.50},
        {name: "Brooklyn Lager", country: "USA", price: 8.00},
        {name: "Sapporo", country: "Japan", price: 7.50}
    ];

    return Rx.Observable.create( observer => {
        let counter = 0;
        beers.forEach( beer => {
                observer.next(beer);   // 1
                counter++;

                if (counter > Math.random()*5) {   // 2
                    observer.error({
                            status: 500,
                            description: "Beer stream error" 
                        });
                } 
            }
        );

        observer.complete();}
    );
}

// Subscribing to data from the primary source
getData() 
    .catch(err => {  // 3
        console.error("Got " + err.status + ": " + err.description);
        if (err.status === 500){
            console.error(">>> Retrieving cached data");
            return getCachedData();   // 4
        } else{
          return Rx.Observable.empty();  // 5  
        }
    })
    .map(beer => beer.name + ", " + beer.country)
    .subscribe(
        beer => console.log("Subscriber got " + beer),
        err => console.error(err),
        () => console.log("The stream is over")
    );

function getCachedData(){  // 6
    var beers = [
        {name: "Leffe Blonde", country: "Belgium", price: 9.50},
        {name: "Miller Lite", country: "USA", price: 8.50},
        {name: "Corona", country: "Mexico", price: 8.00},
        {name: "Asahi", country: "Japan", price: 7.50}
    ];

    return Rx.Observable.create( observer => {
        beers.forEach( beer => {
                observer.next(beer);
            }
        );

        observer.complete();}
    );
}

1. Emit the next beer from the primary data source
2. Randomly generate the error with the status 500
3. Intercept the error before it reaches the observer
4. Failover to the alternative data source
5. Don’t handle the non-500 errors; return an empty observable to complete the stream
6. The alternative data source for failover

The output of this program can look as follows:

Subscriber got Sam Adams, USA
Subscriber got Bud Light, USA
Got 500: Beer stream error
>>> Retrieving cached data
Subscriber got Leffe Blonde, Belgium
Subscriber got Miller Lite, USA
Subscriber got Corona, Mexico
Subscriber got Asahi, Japan
The stream is over

NOTE: To see it in CodePen, follow this link. Stay tuned…

If you have an account at O’Reilly’s safaribooksonline.com, you can watch my video course “RxJS Essentials” there.

RxJS Essentials. Part 6: The switchMap operator

In this article I’ll introduce the switchMap() operator. The previous articles in this series include:

1. Basic Terms
2. Operators map, filter, and reduce
3. Using Observable.create()
4. Using RxJS Subject
5. The flatMap operator

In the previous article of this series, I demoed the use of flatMap(). While flatMap() unwraps and merges all the values emitted by the outer observable, the switchMap() operator handles the values from the outer observable but cancels the inner subscription being processed if the outer observable emits a new value. The switchMap() operator is easier to explain with the help of its marble diagram shown next.

The outer observable emits the red circle, and switchMap() emits the item from the inner observable (red diamond and square) into the output stream. The red circle was processed without any interruptions because the green circle was emitted after the inner observable finished processing.

The situation is different with the green circle. The switchMap() managed to unwrap and emit the green diamond, but the blue circle arrived before the green square was processed. So the subscription to the green inner observable was cancelled, and the green square was never emitted into the output stream. In other words, the switchMap() operator switched from processing of the green inner observable to the blue one.

The following example has two observables. The outer observable uses the function interval() and emits a sequential number every second. With the help of the take() operator, we limit the emission to two values: 0 and 1. Each of these values is given to the switchMap() operator, and the inner observable emits three numbers with the interval of 400 milliseconds.

let outer$ = Rx.Observable
               .interval(1000)
               .take(2);

let combined$ = outer$.switchMap((x) => {  
     return Rx.Observable
	          .interval(400)
	          .take(3)
	          .map(y => `outer ${x}: inner ${y}`)
});

combined$.subscribe(result => console.log(`${result}`));

The output of this script is shown next:

outer 0: inner 0
outer 0: inner 1
outer 1: inner 0
outer 1: inner 1
outer 1: inner 2

Note that the first inner observable didn’t emit its third value, 2. Here’s the timeline, counted from the moment the outer observable emits its first value:

1. The outer observable emits 0, and the inner observable emits 0 400 milliseconds later
2. At 800 milliseconds, the inner observable emits 1
3. At 1000 milliseconds, the outer observable emits 1, and the first inner observable is unsubscribed before it can emit 2
4. The three inner emissions for the second outer value go uninterrupted because the outer observable doesn’t emit any new values
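
This timeline can be checked with simple arithmetic, without running any timers. The sketch below is plain JavaScript (no RxJS), and the function switchMapTimeline() and its parameters are made up for this illustration. It measures time from the moment of subscription, so the first outer value appears at 1000 ms and every time is shifted by 1000 ms compared with the list above. It also assumes that an inner emission is dropped only if a newer outer value arrived strictly earlier, which doesn’t matter here because no two events share the same time.

```javascript
// Computes which inner values survive switching, using arithmetic instead of timers.
// outerTimes: times (ms) at which the outer observable emits, in ascending order
// innerPeriod: interval (ms) between inner emissions
// innerCount: number of values each inner observable would emit if left alone
function switchMapTimeline(outerTimes, innerPeriod, innerCount) {
    const output = [];
    outerTimes.forEach((start, x) => {
        // The next outer emission (if any) cancels the current inner subscription
        const cutoff = x + 1 < outerTimes.length ? outerTimes[x + 1] : Infinity;
        for (let y = 0; y < innerCount; y++) {
            const time = start + (y + 1) * innerPeriod;
            if (time < cutoff) {
                output.push(`${time} ms - outer ${x}: inner ${y}`);
            }
        }
    });
    return output;
}

// interval(1000).take(2) emits 0 at 1000 ms and 1 at 2000 ms
const timeline = switchMapTimeline([1000, 2000], 400, 3);
console.log(timeline.join("\n"));
```

The third value of the first inner observable would be due at 2200 ms, which is after the second outer value arrives at 2000 ms, so it never makes it into the output.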

If you replace switchMap() with flatMap(), the inner observable will emit three values for each outer value as shown below.

outer 0: inner 0
outer 0: inner 1
outer 0: inner 2
outer 1: inner 0
outer 1: inner 1
outer 1: inner 2

NOTE: To see it in CodePen, follow this link.

The chances are slim that you’ll be writing outer and inner observables emitting integers, but there are various practical use cases for switchMap(). For example, in my Angular apps (Angular comes with RxJS) I use switchMap() with the HttpClient object (its methods return observables) to cancel pending HTTP requests. Just think of a user who types in an HTML input field (the outer observable) while an HTTP request is made (the inner observable) on each keyup event. The circles on the diagram are the three characters that the user is typing. The inner observables are HTTP requests issued for each character. If the user entered the third character but the HTTP request for the second one is still pending, it’ll get cancelled.

TIP. The function interval() is handy if you want to invoke another function periodically based on the specified time interval. For example, Rx.Observable.interval(1000).subscribe(n => doSomething()) will result in calling the function doSomething() every second.

NOTE: If your code has nested subscribe() calls, this should be a red flag to you. Consider re-writing this code using flatMap(), switchMap(), or concatMap().

In the next article, I’ll show how to intercept errors from an observable stream with the catch() operator.

RxJS Essentials. Part 5: The flatMap operator

In this article I’ll introduce an RxJS flatMap() operator. Previous articles in this series include:

1. Basic Terms
2. Operators map, filter, and reduce
3. Using Observable.create()
4. Using RxJS Subject

In some cases, you need to treat each item emitted by an observable as another observable. In other words, the outer observable emits the inner observables. Does it mean that we need to write nested subscribe() calls (one for the outer observable and another for the inner one)? No, we don’t. The flatMap() operator takes each item from the outer observable and auto-subscribes to it.

Some operators are not explained well in the RxJS documentation, and we recommend referring to the general ReactiveX (reactive extensions) documentation for clarification. The flatMap() operator is better explained there, and it states that flatMap() is used to “transform the items emitted by an observable into observables, then flatten the emissions from those into a single observable”. This documentation includes the following marble diagram:

As you see, the flatMap() operator takes an emitted item from the outer observable (the circle) and unwraps its content (the inner observable of diamonds) into the flattened output observable stream. The flatMap() operator merges the emissions of the inner observables so their items may interleave.

The following code listing has an observable that emits drinks, but this time it emits not individual drinks, but pallets. The first pallet has beers and the second has soft drinks. Each pallet is an observable. We want to turn these two pallets into an output observable with individual beverages.

function getDrinks() {

    let beers = Rx.Observable.from([   // 1
        {name: "Stella", country: "Belgium", price: 9.50},
        {name: "Sam Adams", country: "USA", price: 8.50},
        {name: "Bud Light", country: "USA", price: 6.50}
    ], Rx.Scheduler.async);

    let softDrinks = Rx.Observable.from([    // 2
        {name: "Coca Cola", country: "USA", price: 1.50},
        {name: "Fanta", country: "USA", price: 1.50},
        {name: "Lemonade", country: "France", price: 2.50}
    ], Rx.Scheduler.async);

    return Rx.Observable.create( observer => {
            observer.next(beers);     // 3
            observer.next(softDrinks);   // 4
            observer.complete();
        }
    );
}

// We want to "unload" each pallet and print each drink info

getDrinks()
  .flatMap(drinks => drinks)    // 5   
  .subscribe(  // 6
      drink => console.log("Subscriber got " + drink.name + ": " + drink.price ),
      error => console.error(error),
      () => console.log("The stream of drinks is over")
  );

1. Creating an async observable from beers
2. Creating an async observable from soft drinks
3. Emitting the beers observable with next()
4. Emitting the soft drinks observable with next()
5. Unloading drinks from pallets into a merged observable
6. Subscribing to the merged observable

This script will produce the output that may look as follows (note that the drinks interleave):

Subscriber got Stella: 9.5
Subscriber got Coca Cola: 1.5
Subscriber got Sam Adams: 8.5
Subscriber got Fanta: 1.5
Subscriber got Bud Light: 6.5
Subscriber got Lemonade: 2.5
The stream of drinks is over

To see it in CodePen visit this link.

Are there any other uses of the flatMap() operator besides unloading pallets of drinks? Another scenario where you’d want to use flatMap() is when you need to execute more than one HTTP request, where the result of the first request should be given to the second one. In Angular, HTTP requests return observables, and without flatMap() this could be done (it’s bad style) with nested subscribe() calls:

this.httpClient.get('/customers/123')
  .subscribe(customer => {
              this.httpClient.get(customer.orderUrl)
              .subscribe(response => this.order = response)
  })

The method httpClient.get() returns an observable, and the better way to write the above code is by using the flatMap() operator, which auto-subscribes and unwraps the content of the first observable and makes another HTTP request:

this.httpClient.get('/customers/123')
    .flatMap(customer => this.httpClient.get(customer.orderUrl))
    .subscribe(response => this.order = response);

Since a flatMap() is a special case of map(), you can specify a transforming function while flattening the observables into a common stream. In the above example, we transform the value customer into a function call httpClient.get().

TIP: In RxJS, flatMap() is an alias of mergeMap() so these two operators have the same functionality.

Let’s consider one more example of using flatMap(). This example will be a modified version of the traders-orders example used in the article “Using RxJS Subject“. This example is written in TypeScript and it uses two Subject instances:

* traders – this Subject keeps track of traders
* orders – this Subject is declared inside the class Trader and keeps track of each order placed by a particular trader.

You’re the manager who wants to monitor all orders placed by all traders. Without flatMap(), you’d need to subscribe to traders (the outer observable) and create a nested subscription for orders (the inner observable) that each trader has. Using flatMap() allows you to write just one subscribe() call, which will be receiving the orders from each trader in one stream.

import {Subject} from 'rxjs/Subject';
import 'rxjs/add/operator/mergeMap';

enum Action{
    Buy = 'BUY',
    Sell = 'SELL'
}

class Order{
    constructor(public orderId: number, public traderId: number, public stock: string, public shares: number, public action:Action){}
}

let traders = new Subject<Trader>();  // 1

class Trader {

    orders = new Subject<Order>();   // 2

    constructor(private traderId:number, public traderName:string){}
}

let tradersSubscriber = traders.subscribe(trader => console.log(`Trader ${trader.traderName} arrived`))

let ordersSubscriber = traders        // 3
  .flatMap(trader => trader.orders)   // 4
  .subscribe(ord =>      // 5
       console.log(`Got order from trader ${ord.traderId} to ${ord.action} ${ord.shares} shares of ${ord.stock}`));

let firstTrader = new Trader(1, 'Joe');
let secondTrader = new Trader(2, 'Mary');

traders.next(firstTrader);
traders.next(secondTrader);

let order1 = new Order(1, 1,'IBM',100,Action.Buy);
let order2 = new Order(2, 1,'AAPL',200,Action.Sell);
let order3 = new Order(3, 2,'MSFT',500,Action.Buy);

// Traders place orders
firstTrader.orders.next( order1);
firstTrader.orders.next( order2);
secondTrader.orders.next( order3);

1. Declare the Subject for traders
2. Each trader has its own Subject for orders
3. Starting with the outer observable traders
4. Extracting the inner observable from each Trader instance
5. The function subscribe() receives a stream of orders

In this version of the program, the class Trader doesn’t have a method placeOrder(). We just have the trader’s observable orders push the order to its observer by using the method next(). Remember, a Subject has both observable and observer.

The output of this program is shown next.

Trader Joe arrived
Trader Mary arrived
Got order from trader 1 to BUY 100 shares of IBM
Got order from trader 1 to SELL 200 shares of AAPL
Got order from trader 2 to BUY 500 shares of MSFT

In our example, the subscriber just prints the orders on the console, but in a real world app it could invoke another function that would be placing orders with the stock exchange for execution.

To see it in CodePen, follow this link. In the next article you’ll learn about a very useful operator switchMap().

RxJS essentials. Part 4: Using Subject

In this article I’ll introduce an RxJS Subject. The previous articles in this series include:

1. Basic Terms
2. Operators map, filter, and reduce
3. Using Observable.create()

An RxJS Subject is an object that contains the observable and observer(s). This means that you can push the data to its observer(s) using next() as well as subscribe to it. A Subject can have multiple observers, which makes it useful when you need to implement multicasting – emitting a value to multiple subscribers.

Say, you have an instance of a Subject and two subscribers. If you push a value to the subject, each subscriber will receive it.

const mySubject = new Subject();

const subscription1 = mySubject.subscribe(...);

const subscription2 = mySubject.subscribe(...);

...

mySubject.next(123); // each subscriber gets 123

The following example has one Subject with two subscribers. The first value is emitted to both subscribers, and then one of them unsubscribes. The second value is emitted to one active subscriber.

const mySubject = new Subject();

const subscriber1 = mySubject
    .subscribe( x => console.log(`Subscriber 1 got ${x}`) ); // <1>

const subscriber2 = mySubject
    .subscribe( x => console.log(`Subscriber 2 got ${x}`) ); // <2>

mySubject.next(123);  // <3>

subscriber2.unsubscribe();  // <4>

mySubject.next(567);  // <5>

1. Create the first subscriber
2. Create the second subscriber
3. Push the value of 123 to subscribers (we have two of them)
4. Unsubscribe the second subscriber
5. Push the value of 567 to subscribers (we have just one now)

Running this script produces the following output on the console:

Subscriber 1 got 123
Subscriber 2 got 123
Subscriber 1 got 567
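
Conceptually, multicasting boils down to keeping a list of observers and pushing each value to all of them. The following plain-JavaScript sketch mimics the example above without RxJS. The class TinySubject is a hypothetical, heavily simplified stand-in (no error() or complete() handling), not the way RxJS implements Subject.

```javascript
// A simplified stand-in for a Subject (not the RxJS implementation):
// it keeps a list of observers and pushes every value to each of them.
class TinySubject {
    constructor() {
        this.observers = [];
    }

    subscribe(next) {
        const observer = {next};
        this.observers.push(observer);
        // Return an object that removes this observer from the list
        return {
            unsubscribe: () => {
                this.observers = this.observers.filter(o => o !== observer);
            }
        };
    }

    next(value) {
        // Iterate over a copy so that unsubscribing during delivery is safe
        this.observers.slice().forEach(o => o.next(value));
    }
}

const received = [];
const tinySubject = new TinySubject();

const sub1 = tinySubject.subscribe(x => received.push(`Subscriber 1 got ${x}`));
const sub2 = tinySubject.subscribe(x => received.push(`Subscriber 2 got ${x}`));

tinySubject.next(123);   // delivered to both subscribers
sub2.unsubscribe();
tinySubject.next(567);   // delivered to subscriber 1 only

console.log(received.join("\n"));
```

The sketch produces the same three lines as the RxJS version: both subscribers get 123, and only the first one gets 567.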

To see it in CodePen, visit the following link: https://codepen.io/yfain/pen/JyxvyK?editors=1011

Now let’s consider a more practical example. A financial firm has traders who can place orders to buy or sell stocks. Whenever the trader places an order, it has to be given to two scripts (subscribers):

1. The script that knows how to place orders with a stock exchange.
2. The script that knows how to report each order to a trade commission that keeps track of all trading activities.

The following code sample shows how to ensure that both of the above subscribers can receive the orders as soon as a trader places them. We create an instance of the Subject called orders, and whenever we invoke next() on it, both subscribers will receive the order. I’ll write this code sample in TypeScript because using types makes the code easier to read/write (at least for me), but if you want to see its JavaScript version, copy/paste the code to the TypeScript playground at http://www.typescriptlang.org/play and you’ll see the ES5 version on the right.

import {Subject} from 'rxjs/Subject';

enum Action{      // <1>
    Buy = 'BUY',
    Sell = 'SELL'
}

class Order{   // <2>
    constructor(public orderId: number, public traderId: number, public stock: string, public shares: number, public action:Action){}
}

let orders = new Subject<Order>();  // <3>

class Trader {   // <4>

    constructor(private traderId:number, private traderName:string){}

    placeOrder(order: Order){
        orders.next(order);   // <5>
    }
}

let stockExchange = orders.subscribe(   // <6>
    ord => console.log(`Sending to stock exchange the order to ${ord.action} ${ord.shares} shares of ${ord.stock}`)); 
let tradeCommission = orders.subscribe(  // <7>
    ord => console.log(`Reporting to trade commission the order to ${ord.action} ${ord.shares} shares of ${ord.stock}`));

let trader = new Trader(1, 'Joe'); 
let order1 = new Order(1, 1,'IBM',100,Action.Buy);
let order2 = new Order(2, 1,'AAPL',100,Action.Sell);

trader.placeOrder( order1);   // <8>
trader.placeOrder( order2);   // <9>

1. Use enums to declare which actions are allowed for orders
2. A class representing an order
3. A subject instance that works only with the Order objects
4. A class representing a trader
5. When an order is placed, we push it to subscribers
6. A stock exchange subscriber
7. A trade commission subscriber
8. Placing the first order
9. Placing the second order

Running the above script produces the following output:

Sending to stock exchange the order to BUY 100 shares of IBM
Reporting to trade commission the order to BUY 100 shares of IBM
Sending to stock exchange the order to SELL 100 shares of AAPL
Reporting to trade commission the order to SELL 100 shares of AAPL

To see it in CodePen follow this link: https://codepen.io/yfain/pen/wqNOeg?editors=1011

In this example, we use TypeScript enums that allow defining a limited number of constants. Placing the actions to buy or sell inside an enum provides additional type checking to ensure that our script uses only the allowed actions. If we just used string constants like “SELL” or “BUY”, the developer could misspell a word (e.g. “BYE”) while creating an order. By declaring enum Action we restrict possible actions to Action.Buy or Action.Sell. Trying to use Action.Bye results in a compilation error. BTW, did you know that RxJS 5 was written in TypeScript?

In the next article of this series, we’ll get familiar with the flatMap() operator.

RxJS Essentials. Part 3: using Observable.create()

In this article I’ll show you how to create an Observable using the method create() and how an observable can send messages to the observer using its API. The previous articles in this series include:

1. Basic Terms
2. Operators map, filter, and reduce

An observer is an object that implements one or more of these functions: next(), error(), and complete(). Let’s use an object literal to illustrate an observer, but later in this section, we’ll use a simplified syntax with arrow functions:

const beerObserver = {
  next: function(beer) { console.log("Subscriber got " + beer.name)},
  error: function(err) { console.error(err)},
  complete: function() { console.log("The stream is over")}
}

You can create an observable with the method create(), passing a function whose argument represents an observer. When the observable gets created, it doesn’t know yet which concrete object will be provided. It’ll be known later, at the time of the subscription.

beerObservable = Rx.Observable.create( 
                 observer => observer.next(beer));

This particular observable thinks, “When someone subscribes to my beers, they will provide me with a concrete beer consumer, and I’ll just push one beer object to this guy”.

At the time of subscription, we’ll provide a concrete observer to our observable.

beerObservable.subscribe(beerObserver); 

The observer will get the beer and will print on the console something like this:

Subscriber got Stella
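
The idea that the observer becomes known only at subscription time can be shown with a plain-JavaScript sketch that doesn’t use RxJS. The function createObservable() below is a hypothetical, simplified stand-in for Observable.create() written for this illustration.

```javascript
// Hypothetical stand-in for Observable.create(): it stores the function
// that pushes data and runs it only when someone subscribes.
function createObservable(pushData) {
    return {
        subscribe(observer) {
            pushData(observer);   // the concrete observer is known only now
        }
    };
}

const printed = [];
const stella = {name: "Stella", country: "Belgium", price: 9.50};

const tinyBeerObservable = createObservable(observer => observer.next(stella));

// Nothing has been pushed yet because there is no subscriber
const pushedBeforeSubscribe = printed.length;

tinyBeerObservable.subscribe({
    next: beer => printed.push("Subscriber got " + beer.name),
    error: err => printed.push("Error: " + err),
    complete: () => printed.push("The stream is over")
});

console.log(pushedBeforeSubscribe);  // 0
console.log(printed.join("\n"));     // Subscriber got Stella
```

Creating the observable does nothing by itself; the beer is pushed only when subscribe() supplies the observer.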

The next listing has a complete script that illustrates the creation of the observer, observable and the subscription. The function getObservableBeer() creates and returns the observable that will iterate through the array of beers and will push each beer to the observer by invoking next(). After that, our observable will invoke complete() on the observer indicating that there won’t be any more beers.

function getObservableBeer(){
        
    return Rx.Observable.create( observer => { // 1

      const beers = [
        {name: "Stella", country: "Belgium", price: 9.50},
        {name: "Sam Adams", country: "USA", price: 8.50},
        {name: "Bud Light", country: "USA", price: 6.50},
        {name: "Brooklyn Lager", country: "USA", price: 8.00},
        {name: "Sapporo", country: "Japan", price: 7.50}
      ];

       beers.forEach( beer => observer.next(beer)); // 2

           observer.complete(); // 3
       }
    );
}

getObservableBeer()
     .subscribe(  // 4
         beer => console.log("Subscriber got " + beer.name),
         error => console.error(error),
            () => console.log("The stream is over")
);

1 Create and return the observable object
2 Push each beer to the observer
3 Push the end of stream message to the observer
4 Subscribe to the observable providing the observer object in the form of three fat arrow functions.

The output of this script is shown next:

Subscriber got Stella
Subscriber got Sam Adams
Subscriber got Bud Light
Subscriber got Brooklyn Lager
Subscriber got Sapporo
The stream is over

To see it in CodePen visit this link.

In our code sample we were invoking next() and complete() on the observer. But keep in mind that an observable is just a data pusher, and there is always a data producer (an array of beers in our case) that may generate an error. In this case, we’d invoke observer.error() and the stream would end. There is a way to intercept an error on the subscriber’s side to keep the streaming alive, and we’ll discuss it in one of the future articles of this series.

It’s important to note that our data producer (the array of beers) is created inside the observable returned by getObservableBeer(), which makes it a cold observable. A WebSocket could be another example of the producer. Imagine you have a database of beers on the server, and you can request them over a WebSocket connection (could use HTTP or any other protocol here):

Rx.Observable.create((observer) => {
  const socket = new WebSocket('ws://beers');
  socket.addEventListener('message', (beer) => observer.next(beer));
  return () => socket.close(); // is invoked on unsubscribe()
});

With cold observables each subscriber will get the same beers regardless of the time of the subscription provided that the query criteria (in our case show all beers) are the same.

Many RxJS tutorials explain concepts just using observables that emit numbers, which sometimes complicates the understanding of the concept. Things are different when we use examples from the real life. Everyone understands what the beer is for so the concepts become clear, right? In the next article, you’ll get familiar with the RxJS Subject.

RxJS Essentials. Part 2: map, filter, reduce

In the previous article, I introduced the main players of RxJS, and now let’s start getting familiar with RxJS operators. In this article we’ll use the operators map(), filter(), and reduce().

As the data elements flow from the observable to the observer, you can apply one or more operators, transforming each element prior to supplying it to the observer. Each operator takes an observable as an input, performs its action, and returns a new observable as an output:

Since each operator takes an observable in and creates an observable as its output, operators can be chained so each observable element can go through several transformations prior to being handed to the observer.
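
The “observable in, observable out” idea can be sketched in plain JavaScript without RxJS. In the sketch below an observable is just an object with a subscribe() method, and fromArray(), map(), and filter() are hypothetical, simplified helpers written for this illustration (error handling is omitted to keep it short). They are not the RxJS operators themselves.

```javascript
// In this sketch an observable is an object with subscribe(observer),
// and an operator is a function that takes one observable and returns another.
function fromArray(items) {
    return {
        subscribe(observer) {
            items.forEach(item => observer.next(item));
            observer.complete();
        }
    };
}

function map(source, transform) {
    return {
        subscribe(observer) {
            source.subscribe({
                next: value => observer.next(transform(value)),
                complete: () => observer.complete()
            });
        }
    };
}

function filter(source, predicate) {
    return {
        subscribe(observer) {
            source.subscribe({
                next: value => { if (predicate(value)) observer.next(value); },
                complete: () => observer.complete()
            });
        }
    };
}

const chained = [];
const numbers = fromArray([1, 2, 3, 4, 5]);

// Chain two operators: keep the odd numbers, then multiply each by 10
const oddTimesTen = map(filter(numbers, x => x % 2 === 1), x => x * 10);

oddTimesTen.subscribe({
    next: value => chained.push(value),
    complete: () => chained.push("done")
});

console.log(chained);
```

Because each helper returns an object of the same shape as its input, the result of filter() can be handed straight to map(), which is what makes chaining possible.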

RxJS offers about a hundred operators, and their documentation may not always be easy to understand. On the positive side, the documentation often illustrates operators with so-called marble diagrams. You can get familiar with the syntax of marble diagrams here.

The map() operator

The map() operator transforms one value to another. It takes a given value from the observable stream and applies the provided transforming function to it.

The marble diagram below is taken from the RxJS manual, and it illustrates the map() operator. This diagram shows the map() operator that takes a value of each incoming element and multiplies it by 10. The fat arrow function x => x*10 is the transforming function in this example.

On top, a marble diagram shows the horizontal line with several shapes representing a stream of incoming observable elements. Then goes the illustration of what a particular operator does. At the bottom, you see another horizontal line depicting the outgoing observable stream after the operator has been applied. The vertical bar represents the end of the stream. When you look at the diagram, think of a time that’s moving from left to right. First, 1 was emitted, the time went by… and 2 was emitted, the time went by… and 3 was emitted, and then the stream ended.

The filter() operator

Now let’s get familiar with the marble diagram of the filter() operator shown next.

The filter() operator takes a function predicate as an argument, which returns true if the emitted value meets the criteria, or false otherwise. Only the values that meet the criteria will make it to the observer. This particular diagram uses the fat arrow function that checks if the current element is an odd number. The even numbers won’t make it further down the chain to the observer.

The following code sample declares an array of beers and turns it into Observable using the Observable.from() function. Then it applies the filter() operator to pass only the beers that cost less than eight dollars, and the chained map() operator converts the beer object into a string.

let beers = [
    {name: "Stella", country: "Belgium", price: 9.50},
    {name: "Sam Adams", country: "USA", price: 8.50},
    {name: "Bud Light", country: "USA", price: 6.50},
    {name: "Brooklyn Lager", country: "USA", price: 8.00},
    {name: "Sapporo", country: "Japan", price: 7.50}
];

Rx.Observable.from(beers)   // <1>
  .filter(beer => beer.price < 8)   // <2>
  .map(beer => beer.name + ": $" + beer.price) // <3>
  .subscribe(    // <4>
      beer => console.log(beer),
      err => console.error(err),
      () => console.log("Streaming is over")
);

console.log("This is the last line of the script");

1. Turn the beers array into an Observable
2. Keep only the beers that cost less than eight dollars
3. Turn the object into a string showing the beer name and the price
4. Subscribe to the Observable providing the Observer as three fat arrow functions

This program will print the following on the console:

Bud Light: $6.5
Sapporo: $7.5
Streaming is over
This is the last line of the script

To see it in action in CodePen, follow this link.

By default, the operator from() returns a synchronous observable, but if you want an asynchronous one, use a second argument specifying an async scheduler:

Rx.Observable.from(beers, Rx.Scheduler.async)

Making this change in the above code sample will print “This is the last line of the script” first, and only then will the observable emit the beers info.
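The difference in ordering can be sketched without RxJS. The emitAll() helper below is hypothetical, and it assumes that deferring delivery with setTimeout() is a rough approximation of what an async scheduler does; it is not how RxJS is implemented.

```javascript
const order = [];

// Records a message and prints it, so the ordering is easy to inspect
function record(message) {
  order.push(message);
  console.log(message);
}

// Hypothetical helper: pushes each value to the callback, either right away
// or deferred with setTimeout(), roughly imitating an async scheduler
function emitAll(values, onNext, deferred) {
  const deliver = () => values.forEach(onNext);
  if (deferred) {
    setTimeout(deliver, 0);
  } else {
    deliver();
  }
}

// Synchronous delivery: the values arrive before the next statement runs
emitAll(["Bud Light", "Sapporo"], beer => record("sync: " + beer), false);
record("last line (sync case)");

// Deferred delivery: the next statement runs first, the values arrive later
emitAll(["Bud Light", "Sapporo"], beer => record("async: " + beer), true);
record("last line (async case)");
```

In the deferred case, the "last line" message is recorded before any of the beers.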

Don’t include the entire RxJS library in your app

When you review the code in CodePen, note that the HTML document imports the entire RxJS library as follows:

<script 
 src="https://unpkg.com/@reactivex/rxjs/dist/global/Rx.js">
</script>

This loads the entire RxJS library even though we only use Observable, from(), filter(), and map(). Loading the entire library for demo purposes is OK, but in real-world apps we want to minimize the app size and load only the code we actually use. For example, in JavaScript or TypeScript apps we’d install RxJS locally and write something like this:

import {Observable} from "rxjs/Observable";
import 'rxjs/add/observable/from';
import 'rxjs/add/operator/filter';
import 'rxjs/add/operator/map';

Including the entire RxJS library adds about 90KB (33KB gzipped) to your app. The size of the future version, RxJS 6, will be substantially smaller, but for now we recommend including only the classes and functions that you use.

NOTE. Starting with RxJS 5.5, you can use so-called lettable operators (later renamed pipeable operators), which simplify import statements, e.g.:

import { debounceTime, map } from 'rxjs/operators';

Using this syntax also simplifies the tree-shaking process performed during app bundling with tools like Webpack or Rollup.

The reduce() operator

Now we’d like to introduce the reduce() operator, which allows you to aggregate the values emitted by an observable. The marble diagram of the reduce() operator is shown next. This diagram shows an observable that emitted 1, 3, and 5, and the reduce() operator added them up, producing the accumulated value of 9.

The reduce() operator has two arguments: an accumulator function where you specify how to aggregate the values, and the initial (seed) value to be used by the accumulator function. The above diagram uses zero as the initial value, but if we changed it to 10, the accumulated value would be 19.

As you see from the above diagram, the accumulator function also has two arguments:

* acc stores the currently accumulated value, which is available for each emitted element

* curr stores the currently emitted value.
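The effect of the seed value can be checked with a small sketch in plain JavaScript. It relies on Array.prototype.reduce, which takes the same kind of accumulator function and seed; using the array method instead of the RxJS operator is a simplification made for this illustration, and the variable names are made up.

```javascript
// Hypothetical values standing in for the ones emitted by the observable
const emitted = [1, 3, 5];

// acc is the value accumulated so far, curr is the currently emitted value
const add = (acc, curr) => acc + curr;

const sumFromZero = emitted.reduce(add, 0);   // seed of 0
const sumFromTen = emitted.reduce(add, 10);   // seed of 10

console.log(sumFromZero); // 9
console.log(sumFromTen);  // 19
```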

The following code sample creates an observable from the beers array and applies two operators to the emitted elements: map() and reduce(). The map() operator takes a beer object and extracts its price, and the reduce() operator adds up the prices.

let beers = [
    {name: "Stella", country: "Belgium", price: 9.50},
    {name: "Sam Adams", country: "USA", price: 8.50},
    {name: "Bud Light", country: "USA", price: 6.50},
    {name: "Brooklyn Lager", country: "USA", price: 8.00},
    {name: "Sapporo", country: "Japan", price: 7.50}
];

Rx.Observable.from(beers) 
    .map(beer =>  beer.price)  // <1>
    .reduce( (total,price) => total + price, 0)  // <2>
    .subscribe(
        totalPrice => console.log(`Total price: ${totalPrice}`)  // <3>
);

1. Transform the beer object into its price
2. Sum up the prices
3. Print the total price of all beers

Running this script will produce the following output:

Total price: 40

In this script, we were adding all prices, but you could apply any other calculation to create a different aggregate value, e.g. to calculate an average or maximum price.
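As a rough sketch of such alternative aggregates, the following plain JavaScript computes the maximum and the average price with Array.prototype.reduce. Using the array method rather than the RxJS operator is an assumption made to keep the sample self-contained, and the names prices, maxPrice, totals, and averagePrice are made up for this illustration.

```javascript
// The prices of the five beers from the sample above
const prices = [9.50, 8.50, 6.50, 8.00, 7.50];

// Maximum: keep the larger of the accumulated and the current value
const maxPrice = prices.reduce(
  (max, price) => Math.max(max, price),
  -Infinity
);

// Average: accumulate the sum and the count, then divide at the end
const totals = prices.reduce(
  (acc, price) => ({ sum: acc.sum + price, count: acc.count + 1 }),
  { sum: 0, count: 0 }
);
const averagePrice = totals.sum / totals.count;

console.log(maxPrice);     // 9.5
console.log(averagePrice); // 8
```

The average needs an object as the seed because a single number cannot carry both the running sum and the element count.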

The reduce() operator emits the aggregated result when the observable completes. In the beers example above, this happened naturally, because we created an observable from an array with a finite number of elements. In other scenarios, you’d need to invoke the method complete() on the observer explicitly, and you’ll see how to do it in the next article of this series.

To see this example running in CodePen follow this link.
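To illustrate why reduce() stays silent until completion, here is a hand-rolled sketch in plain JavaScript. The reducingObserver() helper is hypothetical and is not part of RxJS; it only imitates the behavior described above under the assumption that values are pushed by calling next() and the stream ends with complete().

```javascript
// Hypothetical helper, not part of RxJS: wraps an observer so that it
// receives a single aggregated value only when complete() is invoked
function reducingObserver(accumulator, seed, observer) {
  let acc = seed;
  return {
    // Aggregate silently; nothing is passed downstream yet
    next: value => { acc = accumulator(acc, value); },
    // Emit the aggregated result, then signal completion
    complete: () => {
      observer.next(acc);
      observer.complete();
    }
  };
}

const received = [];
const source = reducingObserver(
  (total, price) => total + price,
  0,
  {
    next: total => received.push(total),
    complete: () => received.push("done")
  }
);

source.next(6.5);
source.next(7.5);
const countBeforeComplete = received.length;
console.log(countBeforeComplete); // 0

source.complete();
console.log(received); // [ 14, 'done' ]
```

Without the call to complete(), the downstream observer in this sketch would never receive the total.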

The code samples in this article turned an array into an observable and magically pushed the array’s elements to the observer. In the next article, I’ll show you how to push elements by invoking the next() function on the observer.

If you have an account at O’Reilly’s safaribooksonline.com, you can watch my video course “RxJS Essentials” there.