Angular uses Observables as an interface to handle asynchronous operations. For instance: The HTTP module uses observables to handle AJAX requests and responses, the Router and Forms modules use observables to listen for and respond to user-input events etc.
What comes to your mind when you hear the word Angular Observable? Terms like data streams, Reactive programming, Observable, Observers, RxJS, etc. are associated with Angular Observable. This article will cover what observable is in Angular and how to use Observables in Angular applications.
What is a Data Stream?
Reactive programming(Rx) is programming with asynchronous data streams. Therefore it’s important to first understand what data stream is. A stream is a data that arrives over a period of time. You are able to create data streams of anything, not just from click and hover events. Streams are cheap and ubiquitous, anything can be a stream: variables, user inputs, properties, caches, data structures, etc.
A stream can be an input to another. Multiple streams can also be inputs to another stream. Two streams can be merged. You can filter a stream to get one that has only those events you are interested in. You can map data values from one stream to a new one.
Stream can emit three different things: a value (of some type), an error, or a “completed” signal. Consider that the “completed” takes place, for instance, when the current window or view containing that button is closed.
We capture these emitted events only asynchronously, by defining a function that will execute when a value is emitted, another function when an error is emitted, and another function when ‘completed’ is emitted. Sometimes these last two can be omitted and you can just focus on defining the function for values. The “listening” to the stream is called subscribing. The functions we are defining are observers. The stream is the subject (or “observable”) being observed.
As said earlier the stream of data can be anything. For Example:
- Mouse click or Mouse hover events with x & y positions
- Keyboard events like keyup, keydown, keypress, etc
- Form events like value changes etc
- Data which arrives after an HTTP request
- User Notifications
- Measurements from any sensor
Streams:
- emit zero, one or more values of any time.
- can also emit errors.
- must emit the complete signal, when completed (finite streams).
- can be infinite, meaning they never complete
Now that you know what data stream is, let’s proceed with Reactive Programming.
Reactive Programming
Event buses or your typical click events are really an asynchronous event stream, on which you can observe and do some effects. Reactive is that idea on steroids. Reactive programming is all about creating the stream, emitting value, error or complete signals, manipulate, transfer or do something useful with the data streams.
This is where RxJs comes in to the picture.
The introduction to Reactive Programming you’ve been missing gives you a very nice introduction to Reactive Programming.
What is RxJS
The RxJS (Reactive Extensions Library for JavaScript) is a javascript library, that allows us to work with asynchronous data streams.
Angular uses the RxJS library heavily in its framework to implement Reactive Programming. Some of the examples where reactive programming used are:
- Reacting to an HTTP request in Angular
- Value changes / Status Changes in Angular Forms
- The Router and Forms modules use observables to listen for and respond to user-input events.
- You can define custom events that send observable output data from a child to a parent component.
- The HTTP module uses observables to handle AJAX requests and responses.
The RxJs has two main players
- Observable
- Observers ( Subscribers)
What is an Observable in Angular?
An Observable is a lazily evaluated computation that can synchronously or asynchronously return zero to (potentially) infinite values from the time it’s invoked onwards. In other words, Observable is a function that converts the ordinary stream of data into an observable stream of data.
Observable emits the value from the stream asynchronously. It emits the complete signals when the stream completes or an error signal if the stream errors out.
Observables are declarative. You define an observable function just like any other variable. The observable starts to emit values only when someone subscribes to it. Subscribing to an Observable is analogous to calling a Function.
Who are observers (subscribers)?
The Observable on its own is useless unless someone consumes the value emitted by the observable. We call them observers or subscribers.
The observers communicate with the Observable using callbacks.
The observer must subscribe with the observable to receive the value from the observable. While subscribing, it optionally passes the three callbacks. next()
, error()
& complete().
The observable starts emitting the value as soon as the observer or consumer subscribes to it.
The observable invokes the next()
callback whenever the value arrives in the stream. It passes the value as the argument to the next callback. If the error occurs, then the error()
callback is invoked. It invokes the complete()
callback when the stream completes.
- Observers/subscribers subscribe to Observables
- Observer registers three callbacks with the observable at the time of subscribing. i .e
next()
,error()
&complete()
- All three callbacks are optional
- The observer receives the data from the observer via the
next()
callback - They also receive the errors and completion events from the Observable via the
error()
&complete()
callbacks.
Now, we have learned the basics of the RxJs Observable, let us now see how it works using an example.
I am going to use an online editor, Stackblitz.
Observables are created using new Observable or a creation operator, are subscribed to with an Observer, executed to deliver next
/ error
/ complete
notifications to the Observer, and their execution may be disposed. These four aspects are all encoded in an Observable instance, but some of these aspects are related to other types, like Observer and Subscription.
Core Observable concerns:
- Creating Observables
- Subscribing to Observables
- Executing the Observable
- Unsubscribing Observables
Creating Observables
There are few ways in which you can create observable in angular. Simplest is to use the Observable constructor. The observable constructor takes observer (or subscriber) as its argument. The subscriber will run when this observable’s subscribe()
method executes.
The following example creates an observable of a stream of numbers 1, 2, 3, 4, 5
obs = new Observable((observer) => {
console.log(“Observable starts”)
observer.next(“1”)
observer.next(“2”)
observer.next(“3”)
observer.next(“4”)
observer.next(“5”)
})
The variable obs
is now of the type of Observable.
The above example declares the obs
as the observable but does not instantiate it. To make the observable emit values, we need to subscribe to it.
In the above example, we used the Observable Constructor to create the Observable. There are many operators available with the RxJS library, which makes the task of creating the observable easy. These operators help us to create observable from an array, string, promise, any iterable, etc.
Here are list some of the commonly used operators:
- create
- defer
- empty
- from
- fromEvent
- interval
- of
- range
- throwError
- timer
Subscribing to an Observable
A subscribe
call is simply a way to start an "Observable execution" and deliver values or events to an Observer of that execution. Subscribing to an Observable is like calling a function, providing callbacks where the data will be delivered to.
We subscribe to the observable, by invoking the subscribe
method on it. We can optionally, include the three callbacks next()
, error()
& complete()
as shown below:
ngOnInit() {
this.obs.subscribe(
val => { console.log(val) }, //next callback
error => { console.log("error") }, //error callback
() => { console.log("Completed") } //complete callback
)
}
The complete app.component.ts
code is as shown below.
obs = new Observable((observer) => {
console.log("Observable starts")
observer.next("1")
observer.next("2")
observer.next("3")
observer.next("4")
observer.next("5")
})
ngOnInit() {
this.obs.subscribe(
val=> { console.log(val) },
error => { console.log("error")},
() => {console.log("Completed")}
)
}
Now, run the code and watch the debug window.
Add an Interval
We can add a timeout to insert a one second delay in each next()
callback.
obs = new Observable(observer => {
console.log('Observable starts');
setTimeout(() => {
observer.next('1');
}, 1000);
setTimeout(() => {
observer.next('2');
}, 2000);
setTimeout(() => {
observer.next('3');
}, 3000);
setTimeout(() => {
observer.next('4');
}, 4000);
setTimeout(() => {
observer.next('5');
}, 5000);
});
Executing Observables
There are three types of values an Observable Execution can deliver:
- “Next” notification: sends a value such as a Number, a String, an Object, etc.
- “Error” notification: sends a JavaScript Error or exception.
- “Complete” notification: does not send a value.
“Next” notifications are the most important and most common type: they represent actual data being delivered to a subscriber. “Error” and “Complete” notifications may happen only once during the Observable Execution, and there can only be either one of them.
In an Observable Execution, zero to infinite Next notifications may be delivered. If either an Error or Complete notification is delivered, then nothing else can be delivered afterwards.
As mentioned earlier, the observable can also emit an error. This is done by invoking the error()
callback and passing the error object. The observables stop after emitting the error signal. Hence values 4 & 5 are never emitted in the example below.
obs = new Observable((observer) => {
console.log("Observable starts")
setTimeout(() => { observer.next("1") }, 1000);
setTimeout(() => { observer.next("2") }, 2000);
setTimeout(() => { observer.next("3") }, 3000);
setTimeout(() => { observer.error("error emitted") }, 3500); //sending error event. observable stops here
setTimeout(() => { observer.next("4") }, 4000); //this code is never called
setTimeout(() => { observer.next("5") }, 5000);
})
You can send the error object as the argument to the error method.
It is a good idea to wrap any code in subscribe
with try
/catch
block that will deliver an Error notification if it catches an exception:
obs = new Observable((observer) => {
console.log("Observable starts")
try{
setTimeout(() => { observer.next("1") }, 1000);
setTimeout(() => { observer.next("2") }, 2000);
setTimeout(() => { observer.next("3") }, 3000);
setTimeout(() => { observer.complete();
} catch (err) {
observer.error(err); // delivers an error if it caught one
}
});
Similarly the complete event, the Observables stop after emitting the complete signal. Hence values 4 & 5 are never emitted.
obs = new Observable((observer) => {
console.log("Observable starts")
setTimeout(() => { observer.next("1") }, 1000);
setTimeout(() => { observer.next("2") }, 2000);
setTimeout(() => { observer.next("3") }, 3000);
setTimeout(() => { observer.complete() }, 3500); //sending complete event. observable stops here
setTimeout(() => { observer.next("4") }, 4000); //this code is never called
setTimeout(() => { observer.next("5") }, 5000);
})
Observable Operators
The Operators are functions that operate on an Observable and return a new Observable.
The power of observable comes from the operators. You can use them to manipulate the incoming observable, filter it, merge it with another observable, alter the values or subscribe to another observable.
You can also chain each operator one after the other using the pipe. Each operator in the chain gets the observable from the previous operator. It modifies it and creates a new observable, which becomes the input for the next observable.
The following example shows the filter & map operators chained inside a pipe. The filter operator removes all data which is less than or equal to 2 and the map operator multiplies the value by 2.
The input stream is [1,2,3,4,5] , while the output is [6, 8, 10].
obs.pipe(
obs = new Observable((observer) => {
observer.next(1)
observer.next(2)
observer.next(3)
observer.next(4)
observer.next(5)
observer.complete()
}).pipe(
filter(data => data > 2), //filter Operator
map((val) => {return val as number \* 2}), //map operator
)
The following table lists some of the commonly used operators.
Unsubscribing from an Observable
We need to unsubscribe to close the observable when we no longer require it. If not it may lead to memory leak & Performance degradation.
To Unsubscribe from an observable, we need to call the Unsubscribe()
method on the subscription. It will clean up all listeners and frees up the memory.
The Subscription represents the ongoing execution, and has a minimal API which allows you to cancel that execution. Read more about the Subscription type here. With subscription.unsubscribe()
you can cancel the ongoing execution.
When you subscribe, you get back a Subscription, which represents the ongoing execution. Just call unsubscribe()
to cancel the execution.
To do that, first, create a variable to store the Subscription.
obs: Subscription;
Assign the subscription to the `obs` variable.
this.obs = this.src.subscribe(value => {
console.log("Received " + this.id);
});
Call the unsubscribe() method in the ngOnDestroy method.
ngOnDestroy() {
this.obs.unsubscribe();
}
When we destroy the component, the observable is unsubscribed and cleaned up.
But, you do not have to unsubscribe from every subscription. For Example, the observables, which emits the complete signal, close the observable.
References
- observables
- RX-library
- observables in angular
- Practical observable usage
- Comparing observables
- Observable Design Pattern
Summary
Reactive programming is about programming the stream. The RxJS library brings Reactive Programming into Angular. Using RxJs, we can create an observable, which can emit the next value, error, and complete signals to the subscriber of the observable.
Follow me to get more updates on new articles.