Wrapping a RxJS observable stream into an Angular service

Angular’s dependency injection mechanism allows us to cleanly separate business logic (services) from UI (components). What if our app generates a stream of values and we want to implement it as an injectable service? In this blog, I’ll create an injectable service that emits a stream of values and a UI component subscribes to this stream displaying its values real time.

In one of my RxJS blogs  I showed you how to use the method Observable.create() providing an observer as an argument. Let’s create a service with a method that will take an observer as an argument and will emit the current time every second.

import {Observable} from 'rxjs/Observable';

export class ObservableService{

  createObservableService(): Observable<Date>{  // 1

      return new Observable(  // 2
          observer => {   // 3
              setInterval(() =>
                  observer.next(new Date())  // 4
              , 1000);

1. Return an observable stream of dates
2. Create an observable
2. Provide an observer
4. Emit the new date every second

In this service, we create an instance of the RxJS Observable object, assuming that the subscriber will provide an Observer that knows what to do with the emitted data. Whenever the observable invokes the method next(new Date()) on the observer, the subscriber will receive the current date and time. Our data stream never throws an error and never completes.

We’ll inject the ObservableService into the AppComponent, which invokes the method createObservableService() and subscribes to its stream of values, creating an observer that knows what to do with the data. The observer just assigns the received time to the variable currentTime which renders the time on UI.

import 'rxjs/add/operator/map';
import {Component} from "@angular/core";
import {ObservableService} from "./observable.service";

  selector: 'app-root',
  providers: [ ObservableService ],
  template: `<h1>Custom observable service</h1>
       Current time: {{currentTime | date: 'mediumTime'}}  // 1
export class AppComponent {

  currentTime: Date;

  constructor(private observableService: ObservableService) { // 2

    this.observableService.createObservableService()  // 3
      .subscribe( data => this.currentTime = data );  // 4

1. Display the time using the date pipe
2. Inject the service that wraps the observable
3. Create the observable and start emitting dates
4. Subscribe to the stream of dates

This app doesn’t use any servers, and you can see it in action in the great Stackblitz online IDE  here.

In the browser’s window, the current time will be updated every second. You use DatePipe here with the format ‘mediumTime’, which displays only hours, minutes, and seconds (all date formats are described in the DatePipe documentation).

This simple example demonstrates a basic technique for wrapping any application logic in an observable stream and subscribing to it. In this case, we use setInterval(), but you could replace it with any application-specific code that generates one or more values and sends them as a stream.

Don’t forget about error handling and completing the stream if need be. The following code snippet shows a sample observable that sends one element to the observer, may throw an error, and tells the observer that the streaming is complete:

return new Observable(
    observer => {
      try {
        observer.next('Hello from observable');

        //throw ("Got an error");

      } catch(err) {
      } finally{

If you uncomment the line with throw, observer.error() is invoked, which results in the invocation of the error handler on the subscriber if there is one.

The data producer for our observable stream was the time generator, but it could be any app code that generates values, e.g. a WebSocket server generating stock quotes, auction bids, actions of online game players, etc. Currently, I’m writing a chapter on using WebSockets in Angular apps for the second edition of our Angular book (use promo codefccfain for 37% off), and it comes with a sample online auction app that has a Node server emulating users’ bids on products. That server pushes new bids real-time using WebSockets.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s