What is RxJS and How is it Integrated in Angular?

What is RxJS?

RxJS (Reactive Extensions for JavaScript) is a library for reactive programming, based on data streams and observable values (Observables).

RxJS provides powerful tools for working with:

  • asynchronous events,
  • data streams (e.g., HTTP requests, WebSocket, DOM events),
  • timers, keyboard input, form changes, etc.

How Does RxJS Work?

At core of RxJS is Observable — object that emits data over time, which you can subscribe to (subscribe) and process using operators (pipe).

import { Observable } from 'rxjs';

const obs$ = new Observable(observer => {
  observer.next('Hello');
  observer.next('RxJS');
  observer.complete();
});

obs$.subscribe(value => console.log(value));

Will output: Hello, then RxJS.

How is RxJS Integrated in Angular?

In Angular RxJS is built-in by default, and used in almost every important framework element:

Where RxJS is used in AngularHow exactly
HttpClientReturns Observable instead of Promise
Reactive FormsReactive change support (valueChanges)
RouterNavigation listening (router.events)
Services & StateUsing Subject, BehaviorSubject and others
Async PipeAutomatic subscription and unsubscription from Observable

Example: HTTP Request with RxJS in Angular

import { HttpClient } from '@angular/common/http';
import { Component, OnInit } from '@angular/core';

@Component({
  selector: 'app-users',
  template: `<ul><li *ngFor="let user of users">{{ user.name }}</li></ul>`
})
export class UsersComponent implements OnInit {
  users: any[] = [];

  constructor(private http: HttpClient) {}

  ngOnInit() {
    this.http.get<any[]>('https://jsonplaceholder.typicode.com/users')
      .subscribe(data => this.users = data);
  }
}

Main RxJS Operators

Operators are used in .pipe() to process streams:

CategoryExamplesDescription
Filteringfilter, take, debounceTimeSkip/select needed values
Transformationmap, switchMap, mergeMapTransform or replace stream
CombiningcombineLatest, withLatestFromCombining multiple Observables
Error handlingcatchError, retryError processing

Example with pipe:

import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';

of(1, 2, 3, 4, 5)
  .pipe(
    filter(x => x % 2 === 0),
    map(x => x * 10)
  )
  .subscribe(console.log); // Will output: 20, 40

Subject vs BehaviorSubject

  • Subject — doesn't store previous value, subscribers get only new ones
  • BehaviorSubject — stores last value and gives it to new subscribers

Subscription Management Important to unsubscribe from subscriptions to avoid memory leaks:

import { Subscription } from 'rxjs';

export class MyComponent implements OnDestroy {
  private sub!: Subscription;

  ngOnInit() {
    this.sub = this.myService.getData().subscribe();
  }

  ngOnDestroy() {
    this.sub.unsubscribe(); // required!
  }
}

Or use async pipe:

<div *ngIf="data$ | async as data">
  {{ data }}
</div>

Angular manages subscription and unsubscription automatically.

RxJS in Angular:

RxJS is foundation of reactive approach in Angular. Ability to work with Observables, operators and Subject — key to creating performant and scalable applications.