In the previous article, I introduced the main players of RxJS, and now let’s start getting familiar with RxJS operators. In this article we’ll use the operators map(), filter(), and reduce().
As the data elements flow from the observable to the observer, you can apply one or more operators, transforming each element prior to supplying it to the observer. Each operator takes an observable as an input, performs its action, and returns a new observable as an output:

Since each operator takes an observable in and creates an observable as its output, operators can be chained so each observable element can go through several transformations prior to being handed to the observer.
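To make the "observable in, observable out" idea concrete, here is a minimal sketch in plain JavaScript. It does not use RxJS and is not how RxJS is implemented: the helpers fromArray, map, and filter below are hypothetical stand-ins in which an observable is just a function that accepts an observer.

```javascript
// Hypothetical mini-observable: a function that takes an observer.
// This is NOT the RxJS implementation; it only mirrors the idea.
const fromArray = items => observer => {
  items.forEach(item => observer.next(item));
  observer.complete();
};

// Each operator takes a source observable and returns a new observable.
const map = transform => source => observer =>
  source({
    next: value => observer.next(transform(value)),
    complete: () => observer.complete()
  });

const filter = predicate => source => observer =>
  source({
    next: value => { if (predicate(value)) observer.next(value); },
    complete: () => observer.complete()
  });

// Chain: source -> filter -> map -> observer
const source = fromArray([1, 2, 3, 4]);
const evens = filter(x => x % 2 === 0)(source);
const evensTimesTen = map(x => x * 10)(evens);

const received = [];
evensTimesTen({
  next: value => received.push(value),
  complete: () => received.push("done")
});
console.log(received); // [ 20, 40, 'done' ]
```

Because every operator returns the same kind of thing it receives, the output of filter can be fed straight into map, which is what makes chaining possible.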
RxJS offers about a hundred operators, and their documentation may not always be easy to understand. On the positive side, the documentation often illustrates operators with so-called marble diagrams. You can get familiar with the syntax of marble diagrams here.
The map() operator
The map() operator transforms one value to another. It takes a given value from the observable stream and applies the provided transforming function to it.
The marble diagram below is taken from the RxJS manual, and it illustrates the map() operator. This diagram shows the map() operator that takes a value of each incoming element and multiplies it by 10. The fat arrow function x => x*10 is the transforming function in this example.

At the top, a marble diagram shows a horizontal line with several shapes representing a stream of incoming observable elements. Below it is an illustration of what the particular operator does. At the bottom, you see another horizontal line depicting the outgoing observable stream after the operator has been applied. The vertical bar represents the end of the stream. When you look at the diagram, think of time as moving from left to right. First, 1 was emitted, the time went by… and 2 was emitted, the time went by… and 3 was emitted, and then the stream ended.
The filter() operator
Now let’s get familiar with the marble diagram of the filter() operator shown next.

The filter() operator takes a predicate function as an argument. The predicate returns true if the emitted value meets the criteria, or false otherwise. Only the values that meet the criteria will make it to the observer. This particular diagram uses a fat arrow function that checks if the current element is an odd number. The even numbers won’t make it further down the chain to the observer.
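JavaScript arrays have map() and filter() methods that accept the same kind of callbacks, so the two functions from the diagrams can be tried on a plain array without loading RxJS. This is only an analogy: an array processes all of its elements at once, while an observable emits them over time. The sample values and the names timesTen and isOdd are assumptions for illustration.

```javascript
const timesTen = x => x * 10;    // transforming function from the map() diagram
const isOdd = x => x % 2 === 1;  // predicate like the one in the filter() diagram

const incoming = [1, 2, 3];      // assumed sample values
const mapped = incoming.map(timesTen);
const odds = incoming.filter(isOdd);

console.log(mapped); // [ 10, 20, 30 ]
console.log(odds);   // [ 1, 3 ]
```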
The following code sample declares an array of beers and turns it into an Observable using the Observable.from() function. Then it applies the filter() operator to pass only the beers that cost less than eight dollars, and the chained map() operator converts each beer object into a string.
let beers = [
    {name: "Stella", country: "Belgium", price: 9.50},
    {name: "Sam Adams", country: "USA", price: 8.50},
    {name: "Bud Light", country: "USA", price: 6.50},
    {name: "Brooklyn Lager", country: "USA", price: 8.00},
    {name: "Sapporo", country: "Japan", price: 7.50}
];

Rx.Observable.from(beers)                            // <1>
    .filter(beer => beer.price < 8)                  // <2>
    .map(beer => beer.name + ": $" + beer.price)     // <3>
    .subscribe(                                      // <4>
        beer => console.log(beer),
        err => console.error(err),
        () => console.log("Streaming is over")
    );

console.log("This is the last line of the script");
1. Turn the beers array into an Observable
2. Keep only the beers that cost less than eight dollars
3. Turn the object into a string showing the beer name and the price
4. Subscribe to the Observable providing the Observer as three fat arrow functions
This program will print the following on the console:
Bud Light: $6.5
Sapporo: $7.5
Streaming is over
This is the last line of the script
To see it in action in CodePen, follow this link.
By default, the operator from() returns a synchronous observable, but if you want an asynchronous one, use a second argument specifying an async scheduler:
Rx.Observable.from(beers, Rx.Scheduler.async)
With this change, the above code sample will print “This is the last line of the script” first, and only then will the observable emit the beer info.
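The change in ordering can be imitated in plain JavaScript. The sketch below does not use RxJS: emitAll is a hypothetical helper that delivers items either immediately or via setTimeout, which only approximates the effect of an async scheduler.

```javascript
// Hypothetical helper, not the RxJS API: delivers items either right away
// or on a later turn of the event loop via setTimeout.
function emitAll(items, onNext, useAsync) {
  const deliver = () => items.forEach(onNext);
  if (useAsync) {
    setTimeout(deliver, 0);
  } else {
    deliver();
  }
}

const log = [];
emitAll(["Bud Light", "Sapporo"], name => log.push("sync: " + name), false);
emitAll(["Bud Light", "Sapporo"], name => log.push("async: " + name), true);
log.push("last line of the script");

// Right here the log holds only the synchronous emissions and the last line;
// the async emissions are appended after the current script has finished.
setTimeout(() => console.log(log), 10);
```

The synchronous values land in the log before the last line of the script, while the asynchronous ones arrive after it.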
Don’t include the entire RxJS library in your app
When you review the code in CodePen, note that the HTML document imports the entire RxJS library as follows:
<script
src="https://unpkg.com/@reactivex/rxjs/dist/global/Rx.js">
</script>
This loads the entire RxJS library even though we only use Observable, from(), filter(), and map(). Loading the entire library for demo purposes is OK, but in real-world apps, we want to minimize the app size and load only the code we actually use. For example, in JavaScript or TypeScript apps we’d install RxJS locally and write something like this:
import {Observable} from "rxjs/Observable";
import 'rxjs/add/observable/from';
import 'rxjs/add/operator/filter';
import 'rxjs/add/operator/map';
Including the entire RxJS library adds about 90KB (33KB gzipped) to your app. The size of the future version RxJS 6 will be substantially smaller, but for now we recommend including only the classes and functions that you use.
NOTE. Starting from RxJS 5.5 you can (and should) use so-called pipeable operators, which are tree-shaking-friendly and simplify import statements, e.g.
import { debounceTime, map } from 'rxjs/operators';
Using this syntax also simplifies the tree-shaking process performed during app bundling with tools like Webpack or Rollup.
The reduce() operator
Now we’d like to introduce the operator reduce(), which allows you to aggregate values emitted by an observable. The marble diagram of the reduce() operator is shown next. This diagram shows an observable that emitted 1, 3, and 5, and the reduce() operator added them up, producing the accumulated value of 9.

The operator reduce() has two arguments: an accumulator function where you specify how to aggregate the values, and the initial (seed) value to be used by the accumulator function. The above diagram uses zero as the initial value, but if we changed it to 10, the accumulated value would be 19.
As you see from the above diagram, the accumulator function also has two arguments:
* acc stores the currently accumulated value, which is available for each emitted element
* curr stores the currently emitted value.
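Array.prototype.reduce() takes the same two arguments, so the way acc and curr change on each step, and the effect of the seed value, can be traced on a plain array. This is an analogy rather than RxJS code, and the names tracedAdd and steps are made up for the illustration.

```javascript
const emitted = [1, 3, 5]; // the values from the diagram

// Accumulator function that also records each call for inspection
const steps = [];
const tracedAdd = (acc, curr) => {
  steps.push(`acc=${acc}, curr=${curr}`);
  return acc + curr;
};

const fromZero = emitted.reduce(tracedAdd, 0);
console.log(steps);    // [ 'acc=0, curr=1', 'acc=1, curr=3', 'acc=4, curr=5' ]
console.log(fromZero); // 9

// Same values, but with a seed of 10
const fromTen = emitted.reduce((acc, curr) => acc + curr, 10);
console.log(fromTen);  // 19
```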
The following code sample creates an observable for the beers array and applies two operators to each emitted element: map() and reduce(). The map() operator takes a beer object and extracts its price, and the reduce() operator adds the prices.
let beers = [
    {name: "Stella", country: "Belgium", price: 9.50},
    {name: "Sam Adams", country: "USA", price: 8.50},
    {name: "Bud Light", country: "USA", price: 6.50},
    {name: "Brooklyn Lager", country: "USA", price: 8.00},
    {name: "Sapporo", country: "Japan", price: 7.50}
];

Rx.Observable.from(beers)
    .map(beer => beer.price)                                       // <1>
    .reduce((total, price) => total + price, 0)                    // <2>
    .subscribe(
        totalPrice => console.log(`Total price: ${totalPrice}`)    // <3>
    );
1. Transform the beer object into its price
2. Sum up the prices
3. Print the total price of all beers
Running this script will produce the following output:
Total price: 40
In this script, we were adding all prices, but you could apply any other calculation to create a different aggregate value, e.g. to calculate an average or maximum price.
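As a sketch of such alternative aggregates, here are accumulator functions for the maximum and the average price, tried on a plain array of the same prices. The accumulator/seed pairs have the same shape as the arguments of reduce() shown above, but this snippet itself uses Array.prototype.reduce() rather than RxJS, and the variable names are chosen for the example.

```javascript
const prices = [9.50, 8.50, 6.50, 8.00, 7.50]; // prices from the beers array

// Maximum: keep the larger of the accumulated and the current value
const maxPrice = prices.reduce((max, price) => Math.max(max, price), -Infinity);

// Average: accumulate both the sum and the count, then divide at the end
const totals = prices.reduce(
  (acc, price) => ({ sum: acc.sum + price, count: acc.count + 1 }),
  { sum: 0, count: 0 }
);
const averagePrice = totals.sum / totals.count;

console.log(maxPrice);     // 9.5
console.log(averagePrice); // 8
```

Note that the average needs an object as the accumulated value, because a single number can’t carry both the running sum and the number of elements seen so far.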
The reduce() operator emits the aggregated result when the observable completes. In the above example, it happened naturally, because we created an observable from an array with a finite number of elements. In other scenarios, you’d need to invoke the method complete() on the observer explicitly, and you’ll see how to do it in the next article of this series.
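The dependence on completion can be shown with a small plain-JavaScript sketch. It is not the RxJS implementation: reduceLike and manualSource are hypothetical helpers in which an observable is just a function that accepts an observer.

```javascript
// Hypothetical sketch, not the RxJS implementation: a reduce-like operator
// that holds the accumulated value until the source signals completion.
const reduceLike = (accumulator, seed) => source => observer => {
  let acc = seed;
  source({
    next: value => { acc = accumulator(acc, value); }, // nothing emitted yet
    complete: () => { observer.next(acc); observer.complete(); }
  });
};

// A source that exposes its observer so values can be pushed by hand
let producer;
const manualSource = observer => { producer = observer; };

const output = [];
reduceLike((total, price) => total + price, 0)(manualSource)({
  next: value => output.push(value),
  complete: () => output.push("done")
});

producer.next(6.5);
producer.next(7.5);
const beforeComplete = output.slice(); // snapshot: still empty
producer.complete();

console.log(beforeComplete, output); // [] [ 14, 'done' ]
```

Until complete() is called, the subscriber receives nothing, no matter how many values the source has produced.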
To see this example running in CodePen follow this link.
Code samples from this article were turning the array into an observable and were magically pushing the array’s elements to the observer. In the next article, I’ll show you how to push elements by invoking the next() function on the observer.
If you have an account at O’Reilly’s safaribooksonline.com, you can watch my video course “RxJS Essentials” there.