Categories
Software

Learning by Implementing: Observables

Sometimes, the best way to learn a new concept is to try to implement it. With my journey with reactive programming, my attempts at implementing Observables were key to to my ability to intuit how to best use them. In this post, we’ll be trying various strategies of implementing an Observable and see if we can make get to working solution.

I’ll be using TypeScript and working to implement something similar to RxJS in these examples, but the intuition should be broadly applicable.

First thing’s first, though: what are we trying to implement? My favorite way or motivating Observables is by analogy. If you have some type, T, you might represent it in asynchronous programming as Future<T> or Promise<T>. Just as futures and promises are the asynchronous analog of a plain type, an Observable<T> is the asynchronous construct representing as collection of T.

The basic API for Observable is a subscribe method that takes as bunch of callbacks, each triggering on a certain event:

interface ObservableLike<T> {
  subscribe(
      onNext?: (item: T) => void,
      onError?: (error: unknown) => void,
      onDone?: () => void): Subscription;
}

interface Subscription {
  unsubscribe(): void;
}

With that, let’s get to work!

First Attempt: Mutable Observables

One way of implementing an Observable is to make sure it keeps tracks of it’s subscribers (in an array) and have the object send events to listeners as they happen.

For the purpose of this and other implementations, we’ll define an internal representation of a Subscription as follows:

Categories
Software

About those Side-effects in Observables, an Angular Use Case

When testing a codebase in Angular Ivy, I ran into a bunch of test failures I wasn’t seeing before. ExpressionChangedAfterItHasBeenCheckedErrors were being thrown around. In debugging these failures, I found that many of them are the results of side-effects in Observables and Observable pipe operations. I happened to describe these earlier in my piece on Observables, Side-effects, and Subscriptions.

Consider this minimal reproduction:

@Component({
  selector: 'widget-editor',
  templateUrl: 'widget_editor.html'
})
export class WidgetEditor {
  constructor(private readonly service: WidgetService) {}

  widgetName = '';
  widgetConfig$ = this.service.getWidget('my_widget').pipe(
    map(widgetDetails => {
      this.widgetName = widgetDetails.name;
      return widgetDetails.config;
    })
  );
}
Categories
Software

Observables, Side-effects, and Subscriptions

My previous articles on using AsyncPipe and data refresh patterns in Angular hint at some common anti-patterns dealing with Observables. If there‚Äôs any common thread in my advice, it is: delay unpacking an Observable into its scalar types when performing logic you can rewrite as side-effect-free, leaving code with side-effects for subscription callbacks and other downstream logic.

My two earlier articles focused on cases users can benefit from handling more of the object’s lifecycle in its Observable form. In other words, cases where the Observable was being subscribed to and unpacked too soon. Instead, I suggested transforming the Observable using operators like map, switchMap, filter, etc. and taking advantage of the power offered by this form. In the case of Angular, it provides AsyncPipe, which takes the care of the step with side-effects (actually rendering the page) in template code.

There are some exceptions to this line of thinking, namely do and tap are reactive operators exclusively there for functions with side effects. I’ll leave a discussion of right vs less right reasons to use do/tap for a later article. But I’ll mention logging, error reporting, and caching of otherwise pure functions as one valid use of side-effects.

This article uses RxJS in code examples, but applies to broader reactive concepts.

Let’s explore a few of these cases:

1. Displaying data represented by Observables

Say I have two Observables wrapping some object in a storage format (e.g. JSON), and I’d like to display it.

Unpacking an observable too soon

let customerName: string;
let customerBalance: number;

nameObservable.subscribe(name => {
  customerName = name;
  if (customerName && customerBalance) {
    processAndDraw();
  }
});
balanceObservable.subscribe(balance => {
  customerBalancer = balance;
  if (customerName && customerBalance) {
    processAndDraw();
  }
});
function processAndDraw() {
  alert(`${customerName}: $${customerBalance.toFixed(2) USD`);
}

If a caller unpacks an observable too soon, it means they’re dealing with scalars, passing things around by global state. Developers might have trouble handling changes, such as adding a third data source to show.

Unpacking an Observable too late

combineLatest(nameObservable, balanceObservable).pipe(
  map(([name, balance]) => {
    alert(`${name}: $${balance.toFixed(2) USD`);
  })
).subscribe();

On the one hand, this is much shorter and more expressive! This is effectively maps Observable<[string, number]> into an Observable<void> which happens to perform side effects when subscribed to. The subscriber, however, has no idea what action will take place from just looking at a type or signature. Even with the code snippet above used as-is, it is very easy to forget about that last .subscribe() call, which–given that Observables are lazy by default and only perform useful actions when subscribed to–renders this whole snippet a no-op.

One final reason side-effects are bad in operators: that these side-effects can be performed an arbitrary number of times per event based on how many distinct subscribers are listening to an Observable.

A better trade-off

combineLatest(nameObservable, balanceObservable).pipe(
  map(([name, balance]) =>
    `${name}: $${balance.toFixed(2) USD`
  )
).subscribe(text => alert('Text'));

2. Avoiding Unnecessary Indirection through Subjects

In some ReactiveX implementation, a Subject is a powerful concept that allows an event publisher to share events with subscribers, as an Observable. It is also quite overused. Dave Sexton wrote a great piece in 2013 about whether or not to use a Subject, and further quoted Eric Meijer’s reasoning for disliking them:

[Subjects] are the “mutable variables” of the Rx world and in most cases you do not need them.

Erik Meijer, via To Use Subject or Not To Use Subject?

In particular, I’ve come across many examples in the wild violating Sexton’s first piece of advice, “What is the source of the notifications?” Here’s an egregious anti-pattern:

Categories
Software

Data and Page Content Refresh patterns in Angular

Part of why I recommend using RxJS Observables all the way through in Angular TypeScript code, and only unpacking them at the closest point to where the UI is declared (often using the | async pipe), is because it makes other transformations on an Observable available and convenient. Two such examples include retry and refresh logic.

Two common reasons to reload/refresh data being displayed by a component include:

  1. A user action in the application causes the data to change (especially, if it does so in ways that might result in complex state changes, etc.), and/or
  2. The data being displayed can change over time (e.g. due to a progression/change of it’s state in the backend, or by another user, etc.)

Let’s start with a simple example:

@Component({
  selector: 'task',
  template: `<ng-container *ngIf="(task$ | async) as task">
    <h1>{{task.name}}</h1>
    <p>Status: {{task.status}}</p>
    <sub-task *ngFor="let subTask of task.subtasks" [subTask]="subTask"/>`
})
export class TaskComponent {
  constructor(private readonly http: HttpClient) {}
  readonly task$ = this.http.get('/api/tasks/foo');
}

Suppose the user adds a ‘Mark as Complete’ button, that mutates the server-side state of all sub-tasks. How do obtain the latest authoritative data from the server about the state of our side? Here’s an approach:

export class TaskComponent {
  constructor(private readonly http: HttpClient) {}
  
  private readonly refreshToken$ = new BehaviorSubject(undefined);
  private readonly task$ = this.refreshToken$.pipe(
    switchMap(() => this.http.get('/api/tasks/foo')));

  markAsComplete() {
    this.http.post('/api/tasks/foo', { state: State.Done })
      // N.B. contrary to my advice elsewhere, I'm happy to
      // directly subscribe here because this subscribe
      // callback has side effects.
      // Further, I don't worry about unsubscribing since
      // this returned Observable is a one-shot observable
      // that will complete after a single request.
      .subscribe(() => this.refreshToken$.next(undefined));
  }
}

Adding refresh logic this way will minimally affect our template code and looks relatively clean. Better yet, adding additional mutating functions simply need to call refreshToken$.next to make sure new data is loaded.

What about regularly polling for updates? This can be implemented simply as well:

export class TaskComponent {
  constructor(private readonly http: HttpClient) {}
  
  private readonly autoRefresh$ =
    interval(TASK_REFRESH_INTERVAL_MS)
      .pipe(startWith(0));

  private readonly refreshToken$ = new BehaviorSubject(undefined);

  private readonly task$ =
    // Notice that combineLatest will only trigger the first
    // time when an event triggers on all input Observables
    // you are combining.
    // BehaviorSubject always triggers its latest value when
    // you subscribe to it, so we're good there.
    // An interval() Observable will need a 'startWith' to
    // give you an initial event.
    combineLatest(this.autoRefresh$, this.refreshToken$)
    .pipe(
      switchMap(() => this.http.get('/api/tasks/foo'))
    );

  markAsComplete() {
    this.http.post('/api/tasks/foo', { state: State.Done })
      .subscribe(() => this.refreshToken$.next(undefined));
  }
}

What if we didn’t want to hardcode foo as the task we look up? Well, Angular’s ActivatedRoute already uses Observables. Rather than using route.snapshot.params['task_id'] or similar, we can use the actual Observable results and get our minds off manually refreshing that data: