Angular’s dependency injection mechanism allows us to cleanly separate business logic (services) from UI (components). What if our app generates a stream of values and we want to implement it as an injectable service? In this blog, I’ll create an injectable service that emits a stream of values and a UI component that subscribes to this stream and displays its values in real time.
In one of my RxJS blogs I showed you how to use the method Observable.create() providing an observer as an argument; calling the Observable constructor directly works the same way. Let’s create a service with a method that returns an observable. The function passed to the constructor takes an observer as an argument and emits the current time every second.
    import {Observable} from 'rxjs/Observable';

    export class ObservableService {

      createObservableService(): Observable<Date> {  // 1

        return new Observable(                        // 2
          observer => {                               // 3
            setInterval(() =>
              observer.next(new Date())               // 4
            , 1000);
          }
        );
      }
    }
1. Return an observable stream of dates
2. Create an observable
3. Provide an observer
4. Emit the new date every second
In this service, we create an instance of the RxJS Observable object, assuming that the subscriber will provide an Observer that knows what to do with the emitted data. Whenever the observable invokes the method next(new Date()) on the observer, the subscriber will receive the current date and time. Our data stream never throws an error and never completes.
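One thing the service above leaves out is cleanup: the timer keeps running even after the subscriber goes away. Here’s a minimal sketch of a variation that returns a teardown function, which RxJS calls when the subscriber unsubscribes. The function name createClock and the periodMs parameter are made up for illustration, and the import from 'rxjs' assumes a package layout where Observable is exported from the package root.

```typescript
import { Observable } from 'rxjs';

// Hypothetical helper (not part of the original service): emits the
// current time every periodMs milliseconds until the subscriber leaves.
export function createClock(periodMs: number = 1000): Observable<Date> {
  return new Observable<Date>(observer => {
    const timerId = setInterval(() => observer.next(new Date()), periodMs);

    // Teardown logic: invoked on unsubscribe, so the timer doesn't leak.
    return () => clearInterval(timerId);
  });
}
```

In a component, you could keep the Subscription returned by subscribe() and call its unsubscribe() method in ngOnDestroy().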
We’ll inject the ObservableService into the AppComponent, which invokes the method createObservableService() and subscribes to its stream of values, creating an observer that knows what to do with the data. The observer just assigns the received time to the variable currentTime, which the template renders on the UI.
    import 'rxjs/add/operator/map';
    import {Component} from "@angular/core";
    import {ObservableService} from "./observable.service";

    @Component({
      selector: 'app-root',
      providers: [ ObservableService ],
      template: `<h1>Custom observable service</h1>
           Current time: {{currentTime | date: 'mediumTime'}} <!-- 1 -->
      `})
    export class AppComponent {

      currentTime: Date;

      constructor(private observableService: ObservableService) { // 2

        this.observableService.createObservableService()  // 3
          .subscribe( data => this.currentTime = data );  // 4
      }
    }
1. Display the time using the date pipe
2. Inject the service that wraps the observable
3. Create the observable
4. Subscribe to the stream of dates, which starts the emission
This app doesn’t use any servers, and you can see it in action in the great Stackblitz online IDE here.
In the browser’s window, the current time will be updated every second. You use DatePipe here with the format ‘mediumTime’, which displays only hours, minutes, and seconds (all date formats are described in the DatePipe documentation).
This simple example demonstrates a basic technique for wrapping any application logic in an observable stream and subscribing to it. In this case, we use setInterval(), but you could replace it with any application-specific code that generates one or more values and sends them as a stream.
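To illustrate swapping setInterval() for app-specific code, here’s a sketch where a hypothetical function loadPrices() stands in for your business logic. Both loadPrices() and createPriceStream() are invented names, the prices are sample data, and the import from 'rxjs' assumes Observable is exported from the package root.

```typescript
import { Observable } from 'rxjs';

// Hypothetical app-specific producer; replace with real business logic.
function loadPrices(): number[] {
  return [101.5, 102.25, 99.75];
}

// Wraps the producer in an observable: each value is pushed to the
// observer, and the stream completes when there is nothing left to send.
export function createPriceStream(): Observable<number> {
  return new Observable<number>(observer => {
    for (const price of loadPrices()) {
      observer.next(price);
    }
    observer.complete();
  });
}
```

A subscriber receives the three sample prices in order, followed by the completion signal.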
Don’t forget about error handling and completing the stream if need be. The following code snippet shows a sample observable that sends one element to the observer, may throw an error and tells the observer that the streaming is complete:
    return new Observable(
      observer => {
        try {
          observer.next('Hello from observable');

          //throw ("Got an error");

        } catch(err) {
          observer.error(err);
        } finally {
          observer.complete();
        }
      }
    );
If you uncomment the line with a throw, observer.error() is invoked, which results in the invocation of the error handler on the subscriber if there is one.
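To see which handlers fire, here’s a sketch of the same observable with a subscriber that provides all three callbacks. The createGreeting() function and its shouldFail flag are hypothetical; the flag stands in for uncommenting the throw. The import from 'rxjs' assumes Observable is exported from the package root.

```typescript
import { Observable } from 'rxjs';

// Hypothetical wrapper around the snippet above; shouldFail plays the
// role of the commented-out throw.
export function createGreeting(shouldFail: boolean): Observable<string> {
  return new Observable<string>(observer => {
    try {
      observer.next('Hello from observable');
      if (shouldFail) {
        throw new Error('Got an error');
      }
    } catch (err) {
      observer.error(err);
    } finally {
      observer.complete();
    }
  });
}

// The subscriber passes an observer object with all three handlers.
const log: string[] = [];
createGreeting(true).subscribe({
  next: value => log.push(`next: ${value}`),
  error: err => log.push(`error: ${err.message}`),
  complete: () => log.push('complete')
});
```

With shouldFail set to true, the log holds only the next and error entries: once error() has been delivered, the later complete() call in the finally block is ignored.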
The data producer for our observable stream was generating date/time but it could be any app code that generates some useful values, e.g. a WebSocket server generating stock quotes, auction bids, actions of online game players, etc. During my workshops, I show a sample online auction app that has a Node server emulating users’ bids on products. That server uses a WebSocket connection to push new bids for products to all users that are interested in receiving them.
RxJS offers WebSocketSubject, a ready-to-use wrapper around the browser’s WebSocket. It accepts either a string with the WebSocket endpoint or a WebSocketSubjectConfig object.
On subscribe, it uses either an existing connection or creates a new one; on unsubscribe, it closes the connection if there are no other subscribers. In the second edition of our Angular book, we use WebSocketSubject in the sample app ngAuction.