RxJs

Note: For this lab, please use the flight-app in our starter kit.

Simple Lookahead

In this exercise, you'll implement the presented lookahead. For this, you can use the following API:

http://angular.at/api/flight?from=G

As you see in this URL, the API takes an parameter for filtering flights with respect to a specific airport name.

  1. Open the file app.module.ts and make sure, that the ReactiveFormsModule is imported into the AppModule.

    Show code

    [...]
    import { ReactiveFormsModule } from '@angular/forms';
    [...]
    
    @NgModule({
        imports: [
            BrowserModule,
            HttpClientModule,
            ReactiveFormsModule,
            [...]
       ],
       [...]
    })
    export class AppModule {}
    

  2. Switch to the folder app an create a FlightLookaheadComponent. Make sure, it is declared in the AppModule.

  3. Open the file app.routes.ts and create a route for your new component.

    Show code

    export const APP_ROUTES: Routes = [
        {
            path: '',
            redirectTo: 'home',
            pathMatch: 'full'
        },
        {
            path: 'home',
            component: HomeComponent
        },
        // Insert this line:
        {
            path: 'flight-lookahead',
            component: FlightLookaheadComponent
        },
        [...]
    
    ]
    

  4. Open the sidebar.component.html file and create a menu option for the new route.

    Show code

    <li>
      <a routerLink="flight-lookahead">
        <i class="ti-user"></i>
        <p>Lookahead</p>
      </a>
    </li>
    

  5. Switch back to your new component and add the following properties:

    control!: FormControl;
    flights$!: Observable<Flight[]>;
    loading = false;
    
  6. Inject the HttpClient into its constructor.

    Show code

    constructor(private http: HttpClient) {}
    

  7. Create a method load(from: string):Observable<Flight[]> { ... } . Implement this method, so that all flights starting at the passed airport are returned.

    Show code

    load(from: string): Observable<Flight[]> {
        const url = "http://www.angular.at/api/flight";
    
        const params = new HttpParams().set('from', from);
    
        // If you use json-server, use the parameter from_like:
        // const params = new HttpParams().set('from_like', from);
    
        const headers = new HttpHeaders().set('Accept', 'application/json');
    
        return this.http.get<Flight[]>(url, {params, headers});
    }
    

  8. Implement the Interface OnInit. Use the ngOnInit method to establish the needed dataflow between your input control (property control) and your result (flights$).

    Show code

    export class FlightLookaheadComponent implements OnInit {
    
        ngOnInit(): void {
            this.control = new FormControl();
    
            this.flights$ =
                this.control
                    .valueChanges
                    .pipe(
                        debounceTime(300),
                        tap(input => this.loading = true),
                        switchMap(input => this.load(input)),
                        tap(v => this.loading = false)
                    );
        }
    
        [...]
    }
    

  9. Open the file flight-lookahead.component.html and create an input element. Bind it to your control object. Also, display the value of your property loading.

    Show code

    <div class="card">
      <div class="header">
        <h1 class="title">Lookahead</h1>
      </div>
    
      <div class="content">
        <div class="control-group">
          <label>City</label>
          <input [formControl]="control" class="form-control" />
        </div>
    
        <div *ngIf="loading">Loading ...</div>
      </div>
    </div>
    

  10. Create a new table and bind it to your flights$ property using the async pipe.

    Show code

    <table class="table table-striped">
      <tr *ngFor="let f of flights$ | async">
        <td>{{f.id}}</td>
        <td>{{f.from}}</td>
        <td>{{f.to}}</td>
        <td>{{f.date | date:'dd.MM.yyyy HH:mm'}}</td>
      </tr>
    </table>
    

  11. Test your solution.

Combining Streams *

In this example, you'll introduce another observable that simulates the network state. Also, you will make sure, that your solution will only search for flights if the state is "connected".

  1. Add the following properties to your component:

    • online = false;
    • online$!: Observable<boolean>;
  2. Add the following lines to your ngOnInit method.

    ngOnInit(): void {
        this.control = new FormControl();
    
        this.online$ = interval(2000).pipe(
            startWith(0),
            map(_ => Math.random() < 0.5),
            distinctUntilChanged(),
            tap(value => this.online = value)
        );
        [...]
    }
    

    As you can see here, online$ can emit a new network state (true or false for connected and disconnected) every two seconds. As it is a cold observable, it will only start sending data after a a subscription has been setup. Hence, you'll combine it with the other observable in the next step.

  3. Have a look at http://rxmarbles.com to find out how combineLatest and filter work. Try to find out how to use them to combine the new online$ observable with the existing flights$ observable. The goal is to only search for flights when the machine is connected.

    Hint: combineLatest returns an array with the current values of the combined observables:

    combineLatest([observable1, observable2]).subscribe(
        (tuple) => {
            const latestFromObservable1 = tuple[0];
            const latestFromObservable2 = tuple[1];
            [...]
        }
    )
    
    [...]
    
    // The same, with a compacter syntax:
    combineLatest([observable1, observable2]).subscribe(
        ([latestFromObservable1, latestFromObservable2]) => {
            [...]
        }
    )
    
    // Since RxJS 7, we can also write it that way:
    combineLatest({o1: observable1, o2: observable2}).subscribe(
        (combined) => {
            const latestFromObservable1 = combined.o1;
            const latestFromObservable2 = combined.o2;
        }
    )
    

    Hint: Further information about combineLatest can be found at https://www.learnrxjs.io.

    Solution

    this.online$ = interval(2000).pipe(
      startWith(0),
      map((_) => Math.random() < 0.5),
      distinctUntilChanged(),
      tap((value) => (this.online = value))
    );
    
    const input$ = this.control.valueChanges.pipe(debounceTime(300));
    
    this.flights$ = combineLatest({ input: input$, online: this.online$ }).pipe(
      filter(combined) => combined.online),
      map((combined) => combined.input),
      switchMap((combined) => this.load(combined.input))
    );
    

  4. Display the value of online (not online$) via data binding:

    <div>Online: {{online}}</div>
    
  5. Test your solution.

Multicasting

In this example, we share the online$ observable to be able to use it multiple times -- with combineLatest and in the template.

  1. In your flight-lookahead.component.ts, remove the usage of the online property. Also, add shareReplay to online$:
// Remove this line:
// online = false;  

this.online$ = interval(2000).pipe(
    startWith(0),
    map(_ => Math.random() < 0.5),
    distinctUntilChanged(),
    
    // Remote this line:
    // tap(value => this.online = value)
    
    // Add this line:
    shareReplay({ bufferSize: 1, refCount: true }),

);
  1. Switch to flight-lookahead.component.html and use online$ with the async pipe instead of online:
<div [ngStyle]="{'background-color': (online$ | async)? 'green' : 'red'}">Online: {{online$ | async}}</div>
  1. Try out your solution

Subjects

In this exercise, you will switch out the loading property for a BehaviorSubject.

  1. In your flight-lookahead.component.ts, remove the declaration of loading and add a BehaviorSubject instead:
// Remove this line
// loading = false;

// Add these lines:
private loadingSubject = new BehaviorSubject(false);
readonly loading$ = this.loadingSubject.asObservable();

Remark: This pattern of combining a private subject with a public/ readonly Observable is especially important in services used in several parts of the application. It makes sure, only the Subject's "owner" can emit new values.

  1. Also, instead of setting loading, emit a new value via the subject:
this.flights$ = combineLatest({ input: input$, online: this.online$ }).pipe(
    filter((combined) => combined.online),

    // Update:
    tap(() => this.loadingSubject.next(true)),

    switchMap((combined) => this.load(combined.input)),

    // Update:
    tap(() => this.loadingSubject.next(true)),
);
  1. In your template, make sure to use loading$ with the async pipe:
<div *ngIf="loading$ | async">Loading ...</div>
  1. Try out your solution.

Bonus: combineLatest vs. withLatestFrom

Change your solution to use withLatestFrom instead of combineLatest. Please note that withLatestFrom isn't a creation operator (factory) but called within pipe:

this.flights$ = input$.pipe(
  withLatestFrom(this.online$),
  filter(([, online]) => online),
  map(([input]) => input),
  tap(() => (this.loading = true)),
  switchMap((name) => this.load(name)),
  tap(() => (this.loading = false))
);

Find out, how your application behaves differently now.

Bonus: Search by from and to **

Implement a second textbox for the airport of destination (field to). When ever the field from or to is modified, the result table shall be updated.

Make sure, no query is sent to the server, when both, from and to are empty.

Hint: combineLatest can take several parameters:

combineLatest([a$, b$, c$]).pipe(
    tap(([a,b,c]) => console.debug('abc', a, b, c) );
)

Bonus: Refresh Button ***

Now, let's try to introduce a button reloading the current result set. For this, add an observable and a click handler for the button:

private refreshClickSubject = new Subject<void>();
refreshClick$ = this.refreshClickSubject.asObservable();

refresh(): void {
    this.refreshClickSubject.next();
}
Solution

[...]

const debouncedFrom$ = this.controlFrom.valueChanges.pipe(debounceTime(300));
const debouncedTo$ = this.controlTo.valueChanges.pipe(debounceTime(300));

const combined$ = combineLatest([debouncedFrom$, debouncedTo$, this.online$])
                    .pipe(shareReplay({ bufferSize:1, refCount: true }));

this.flights$ = merge(
    combined$,
    this.refreshClick$.pipe(map(_ => [this.controlFrom.value, this.controlTo.value, this.online]))

    // This is an alternative without side effects (like this.online):
    // this.refreshClick$.pipe(switchMap(_ => combined$.pipe(take(1))))
).pipe(
    filter( ([_, __,  online]) => online),
    map(([value, valueTo, _]) => [value, valueTo]),
    // further remaining operators
);

The button in the template looks like this:

<button (click)="refresh()">Refresh</button>

To implement the logic, merge the result of your existing combineLatest call with refreshClick$. You find some information about merge here and here.

Make sure to emit the current search criteria via refreshClick$.

Error Handling *

Have a look at the description of catchError and retry and try to use these operators in your lookahead example.

Hint: Change your online$ observable so that it always returns true (map(_ => true)).

Hint: Use your network tab within the F12 tools to simulate an offline state.

Closing Observables

Extend your online$ observable as follows:

this.online$ = interval(1000).pipe(
  startWith(-1),
  tap(v => console.log('counter', v)),
  [...]
);

As you are just subscribing via async, the counter should stop when moving over to another route.

Try out the following experiments:

Experiment 1

  1. Set refCount for online$ to false (shareReplay({ bufferSize: 1, refCount: true})).
  2. Expected result: The counter does not stop.
  3. Undo this experiment.

Experiment 2

  1. Also subscribe to online$ by hand without unsubscribing:
this.online$.subscribe();
  1. Expected result: The counter does not stop either.

Experiment 3

  1. Unsubscribe your subscription from experiment 2 by hand:

    const subscription = this.online$.subscribe();
    setTimeout(() => subscription.unsubscribe(), 7000);
    
  2. Expected result: The counter should stop if you move to another route (at least 7 after loading the flight lookahead).

Experiment 4

  1. Introduce a close$ subject as a class member:

    export class FlightLookaheadComponent {
      close$ = new Subject<void>();
    
      [...]
    }
    
  2. Implement the OnDestroy interface to notify this Subject:

    export class FlightLookaheadComponent implements OnDestroy {
    
      [...]
      ngOnDestroy(): void {
          this.close$.next();
      }
      [...]
    }
    
  3. When subscribing to online$ by hand, use takeUntil:

    this.online$.pipe(takeUntil(this.close$)).subscribe();
    
  4. Expected result: Expected result: The counter should stop if you move to another route