RxJS Essentials. Part 3: using Observable.create()

In this article I’ll show you how to create an Observable using the method create() and how an observable can send messages to the observer using its API. The previous articles in this series include:

1. Basic Terms
2. Operators map, filter, and reduce

An observer is an object that implements one or more of these functions: next(), error(), and complete(). Let’s use an object literal to illustrate an observer, but later in this section, we’ll use a simplified syntax with arrow functions:

const beerObserver = {
  next: function(beer) { console.log("Subscriber got " + beer.name); },
  error: function(err) { console.error(err); },
  complete: function() { console.log("The stream is over"); }
};
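
Since an observer may implement only some of the three functions, whatever delivers the notifications has to tolerate the missing ones. Here is a minimal sketch of that idea. The helper notify() is hypothetical and is not a part of the RxJS API; it only illustrates how a partial observer could be handled:

```javascript
// Hypothetical helper (not an RxJS API): invokes a handler only if the observer implements it.
function notify(observer, kind, value) {
  if (typeof observer[kind] === "function") {
    observer[kind](value);
    return true;
  }
  return false; // the observer is not interested in this kind of notification
}

const received = [];

// This observer implements only next(); error() and complete() are omitted.
const nextOnlyObserver = {
  next: beer => received.push("Subscriber got " + beer.name)
};

const handledNext = notify(nextOnlyObserver, "next", { name: "Stella" });
const handledComplete = notify(nextOnlyObserver, "complete");

console.log(received, handledNext, handledComplete);
```

The next() notification is delivered, while complete() is silently skipped because the observer doesn't implement it.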

You can create an observable with the method create(), passing it a function whose argument represents an observer. When the observable is created, it doesn’t know yet which concrete object will be provided. That will be known later, at the time of the subscription.

const beer = {name: "Stella", country: "Belgium", price: 9.50};

const beerObservable = Rx.Observable.create(
                 observer => observer.next(beer));

This particular observable thinks, “When someone subscribes to my beers, they will provide me with a concrete beer consumer, and I’ll just push one beer object to this guy”.

At the time of subscription, we’ll provide a concrete observer to our observable.

beerObservable.subscribe(beerObserver); 

The observer will get the beer and will print on the console something like this:

Subscriber got Stella
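
To see why nothing happens until the subscription, it may help to look at a stripped-down model of this mechanism. The function createObservable() below is my own simplified stand-in for Rx.Observable.create() and is not how RxJS is actually implemented; it only shows that the function you pass in is stored at creation time and runs when subscribe() supplies a concrete observer:

```javascript
// Simplified stand-in for Rx.Observable.create(); an assumption for illustration only.
function createObservable(subscribeFn) {
  return {
    // Nothing runs until subscribe() hands over a concrete observer.
    subscribe: observer => subscribeFn(observer)
  };
}

const log = [];
const beer = { name: "Stella" };

const beerObservable = createObservable(observer => {
  log.push("Producer started");
  observer.next(beer);
});

log.push("Observable created");

beerObservable.subscribe({
  next: b => log.push("Subscriber got " + b.name)
});

console.log(log.join("\n"));
```

In this sketch, "Observable created" is logged before "Producer started", because the producer code runs only as a result of the subscribe() call.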

The next listing has a complete script that illustrates the creation of the observer, observable and the subscription. The function getObservableBeer() creates and returns the observable that will iterate through the array of beers and will push each beer to the observer by invoking next(). After that, our observable will invoke complete() on the observer indicating that there won’t be any more beers.

function getObservableBeer(){

    return Rx.Observable.create( observer => { // 1

        const beers = [
            {name: "Stella", country: "Belgium", price: 9.50},
            {name: "Sam Adams", country: "USA", price: 8.50},
            {name: "Bud Light", country: "USA", price: 6.50},
            {name: "Brooklyn Lager", country: "USA", price: 8.00},
            {name: "Sapporo", country: "Japan", price: 7.50}
        ];

        beers.forEach( beer => observer.next(beer)); // 2

        observer.complete(); // 3
    });
}

getObservableBeer()
    .subscribe(  // 4
        beer => console.log("Subscriber got " + beer.name),
        error => console.error(error),
        () => console.log("The stream is over")
    );

1 Create and return the observable object
2 Push each beer to the observer
3 Push the end of stream message to the observer
4 Subscribe to the observable providing the observer object in the form of three fat arrow functions.

The output of this script is shown next:

Subscriber got Stella
Subscriber got Sam Adams
Subscriber got Bud Light
Subscriber got Brooklyn Lager
Subscriber got Sapporo
The stream is over

The RxJS 5 version of this code sample is here. The RxJS 6 version is here.

In our code sample, we invoked next() and complete() on the observer. But keep in mind that an observable is just a data pusher, and there is always a data producer (an array of beers in our case) that may generate an error. In that case, we’d invoke observer.error(), and the stream terminates without a complete() notification. There is a way to intercept an error on the subscriber’s side to keep the streaming alive, and we’ll discuss it in one of the future articles of this series.
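
The following sketch illustrates that an error ends the stream. It uses createObservable(), my own simplified stand-in for Rx.Observable.create(), and a null array element to simulate a broken beer record; both are assumptions made for illustration. The wrapper around the observer mimics the rule that nothing is delivered after error() or complete():

```javascript
// Simplified stand-in for Rx.Observable.create(); not the RxJS implementation.
function createObservable(subscribeFn) {
  return {
    subscribe(observer) {
      let closed = false;
      // Wrap the observer so nothing is delivered after error() or complete().
      const safeObserver = {
        next: value => { if (!closed) observer.next(value); },
        error: err => { if (!closed) { closed = true; observer.error(err); } },
        complete: () => { if (!closed) { closed = true; observer.complete(); } }
      };
      subscribeFn(safeObserver);
    }
  };
}

const events = [];

const beerStream = createObservable(observer => {
  // The null element simulates a bad record coming from the data producer.
  const beers = [{ name: "Stella" }, null, { name: "Sapporo" }];
  for (const beer of beers) {
    if (beer === null) {
      observer.error(new Error("Invalid beer record"));
    } else {
      observer.next(beer);
    }
  }
  observer.complete();
});

beerStream.subscribe({
  next: beer => events.push("Subscriber got " + beer.name),
  error: err => events.push("Error: " + err.message),
  complete: () => events.push("The stream is over")
});

console.log(events);
```

The subscriber gets Stella and then the error. Sapporo and the end of stream message never arrive, because the stream was already terminated by error().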

It’s important to note that our data producer (the array of beers) is created inside the observable returned by getObservableBeer(), which makes it a cold observable. A WebSocket could be another example of a producer. Imagine you have a database of beers on the server, and you can request them over a WebSocket connection (you could use HTTP or any other protocol here):

Rx.Observable.create((observer) => {
  const socket = new WebSocket('ws://beers');
  // The payload of a WebSocket message is available in event.data
  socket.addEventListener('message', (event) => observer.next(event.data));
  return () => socket.close(); // is invoked on unsubscribe()
});
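
The function returned from the callback is the clean-up logic that runs on unsubscribe(). Here is a rough sketch of that contract that runs without a server. FakeSocket is a hypothetical in-memory replacement for a WebSocket, and createObservable() is my simplified stand-in for Rx.Observable.create(); neither is a part of RxJS:

```javascript
// FakeSocket is a hypothetical in-memory stand-in for a WebSocket connection.
class FakeSocket {
  constructor() { this.listener = null; this.closed = false; }
  addEventListener(type, listener) { if (type === "message") this.listener = listener; }
  emit(data) { if (!this.closed && this.listener) this.listener({ data }); }
  close() { this.closed = true; }
}

// Simplified stand-in for Rx.Observable.create(); not the RxJS implementation.
function createObservable(subscribeFn) {
  return {
    subscribe(observer) {
      const teardown = subscribeFn(observer); // keep the returned clean-up function
      return {
        unsubscribe: () => { if (typeof teardown === "function") teardown(); }
      };
    }
  };
}

let lastSocket; // exposed only so this demo can simulate server messages
const got = [];

const beerFeed = createObservable(observer => {
  const socket = new FakeSocket(); // the producer is created per subscription
  lastSocket = socket;
  socket.addEventListener("message", event => observer.next(event.data));
  return () => socket.close(); // is invoked on unsubscribe()
});

const subscription = beerFeed.subscribe({ next: beer => got.push(beer) });

lastSocket.emit("Stella");   // delivered to the subscriber
subscription.unsubscribe();  // runs the clean-up function and closes the socket
lastSocket.emit("Sapporo");  // ignored, the socket is already closed

console.log(got, lastSocket.closed);
```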

With cold observables, each subscriber will get the same beers regardless of the time of the subscription, provided that the query criteria (in our case, show all beers) are the same.
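
A small sketch can make this behavior visible. Again, createObservable() is my simplified stand-in for Rx.Observable.create(), and the counter producersCreated is added purely for the demonstration. Each subscription re-runs the producer code, so a late subscriber still gets the full list:

```javascript
// Simplified stand-in for Rx.Observable.create(); an assumption for illustration only.
function createObservable(subscribeFn) {
  return { subscribe: observer => subscribeFn(observer) };
}

let producersCreated = 0; // counts how many times the producer code has run

const coldBeers = createObservable(observer => {
  producersCreated++;
  const beers = ["Stella", "Sam Adams", "Sapporo"]; // the producer lives inside the observable
  beers.forEach(beer => observer.next(beer));
  observer.complete();
});

const firstSubscriber = [];
const secondSubscriber = [];

coldBeers.subscribe({
  next: beer => firstSubscriber.push(beer),
  complete: () => {}
});

// The second subscriber arrives after the first one has received everything.
coldBeers.subscribe({
  next: beer => secondSubscriber.push(beer),
  complete: () => {}
});

console.log(firstSubscriber, secondSubscriber, producersCreated);
```

Both subscribers end up with the same three beers, and the producer is created twice, once per subscription.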

Many RxJS tutorials explain concepts using only observables that emit numbers, which sometimes makes the concepts harder to understand. Things are different when we use examples from real life. Everyone understands what beer is for, so the concepts become clear, right? In the next article, you’ll get familiar with the RxJS Subject.

If you have an account at O’Reilly’s safaribooksonline.com, you can watch my video course “RxJS Essentials” there.
