Observables, Side-effects, and Subscriptions

My previous articles on using AsyncPipe and data refresh patterns in Angular hint at some common anti-patterns when dealing with Observables. If there’s a common thread in my advice, it is this: delay unpacking an Observable into its scalar values for as long as the logic you are performing can be written side-effect-free, and leave code with side-effects to subscription callbacks and other downstream logic.

My two earlier articles focused on cases where users can benefit from handling more of an object’s lifecycle in its Observable form; in other words, cases where the Observable was being subscribed to and unpacked too soon. Instead, I suggested transforming the Observable using operators like map, switchMap, and filter, and taking advantage of the power this form offers. Angular, for its part, provides AsyncPipe, which takes care of the step with side-effects (actually rendering the page) in template code.

There are some exceptions to this line of thinking, namely do and tap, reactive operators that exist specifically for functions with side effects. I’ll leave a discussion of better versus worse reasons to use do/tap for a later article, but I’ll mention logging, error reporting, and caching of otherwise pure functions as valid uses of side-effects.
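To make the logging case concrete, here is a minimal sketch of tap (the pipeable name for the older do operator) used purely for a side effect. The logLines array is a hypothetical stand-in for a real logger; the point is that tap passes values through unchanged, so the transformation itself stays pure.

```typescript
import { of } from "rxjs";
import { map, tap } from "rxjs/operators";

// Hypothetical stand-in for a real logger.
const logLines: string[] = [];

const doubled$ = of(1, 2, 3).pipe(
  // tap is for side-effects only: its return value is ignored and
  // the values flowing downstream are unchanged.
  tap(value => logLines.push(`saw ${value}`)),
  // The actual transformation stays side-effect-free.
  map(value => value * 2),
);

const results: number[] = [];
doubled$.subscribe(value => results.push(value));
```

After subscribing, results holds the doubled values and logLines holds one entry per event.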

This article uses RxJS in code examples, but applies to broader reactive concepts.

Let’s explore a few of these cases:

1. Displaying data represented by Observables

Say I have two Observables wrapping fields of some object in a storage format (e.g. JSON), a customer’s name and balance, and I’d like to display them together.

Unpacking an observable too soon

let customerName: string;
let customerBalance: number;

nameObservable.subscribe(name => {
  customerName = name;
  // Compare against undefined so a balance of 0 still draws.
  if (customerName !== undefined && customerBalance !== undefined) {
    processAndDraw();
  }
});
balanceObservable.subscribe(balance => {
  customerBalance = balance;
  if (customerName !== undefined && customerBalance !== undefined) {
    processAndDraw();
  }
});
function processAndDraw() {
  alert(`${customerName}: $${customerBalance.toFixed(2)} USD`);
}

If a caller unpacks an Observable too soon, they end up dealing with scalars and passing things around through shared mutable state. Developers might then have trouble handling changes, such as adding a third data source to show.

Unpacking an Observable too late

combineLatest([nameObservable, balanceObservable]).pipe(
  map(([name, balance]) => {
    alert(`${name}: $${balance.toFixed(2)} USD`);
  })
).subscribe();

On the one hand, this is much shorter and more expressive! It effectively maps an Observable<[string, number]> into an Observable<void> that happens to perform side effects when subscribed to. The subscriber, however, has no idea what action will take place from looking at a type or signature alone. Even with the snippet above used as-is, it is very easy to forget the final .subscribe() call, which, given that Observables are lazy by default and only perform useful actions when subscribed to, would render the whole snippet a no-op.

One final reason side-effects are bad in operators: for a cold Observable, they can be performed an arbitrary number of times per event, once for each distinct subscriber listening to that Observable.
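A small sketch of that multiplication, assuming a Subject as the event source and counters as stand-ins for alert(). The unshared pipeline runs its side effect once per subscriber; adding share() collapses it to once per event, although keeping side effects out of operators entirely sidesteps the question.

```typescript
import { Subject } from "rxjs";
import { map, share } from "rxjs/operators";

// Count how often the "side effect" inside map runs.
let unsharedRuns = 0;
let sharedRuns = 0;

const source = new Subject<string>();

// Each subscriber gets its own copy of the map step, so the side effect repeats.
const unshared$ = source.pipe(
  map(text => { unsharedRuns++; return text; }),
);

// share() multicasts one upstream subscription, so map runs once per event.
const shared$ = source.pipe(
  map(text => { sharedRuns++; return text; }),
  share(),
);

unshared$.subscribe();
unshared$.subscribe();
shared$.subscribe();
shared$.subscribe();

// A single logical event.
source.next("Ada: $10.00 USD");
```

After the single event, unsharedRuns is 2 and sharedRuns is 1.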

A better trade-off

combineLatest([nameObservable, balanceObservable]).pipe(
  map(([name, balance]) =>
    `${name}: $${balance.toFixed(2)} USD`
  )
).subscribe(text => alert(text));
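Returning to the earlier worry about adding a third data source: with this shape, it is one more entry in combineLatest. A sketch, assuming of() stand-ins for the real sources, a hypothetical currencyObservable as the third source, and an array push in place of alert() so it runs outside a browser:

```typescript
import { combineLatest, of } from "rxjs";
import { map } from "rxjs/operators";

// Hypothetical sources standing in for the real Observables.
const nameObservable = of("Ada");
const balanceObservable = of(1234.5);
const currencyObservable = of("USD"); // the new, third data source

const rendered: string[] = [];

combineLatest([nameObservable, balanceObservable, currencyObservable]).pipe(
  // Pure formatting: no side effects inside the operator.
  map(([name, balance, currency]) =>
    `${name}: $${balance.toFixed(2)} ${currency}`),
).subscribe(text => rendered.push(text)); // the side effect lives in subscribe
```

Adding the source changed one array and one template string; no shared mutable variables or duplicated "are we ready yet?" checks were needed.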

2. Avoiding Unnecessary Indirection through Subjects

In ReactiveX implementations, a Subject is a powerful concept that allows an event publisher to share events with subscribers as an Observable. It is also quite overused. Dave Sexton wrote a great piece in 2013 about whether or not to use a Subject, and quoted Erik Meijer’s reasoning for disliking them:

[Subjects] are the “mutable variables” of the Rx world and in most cases you do not need them.

Erik Meijer, via To Use Subject or Not To Use Subject?

In particular, I’ve come across many examples in the wild that violate Sexton’s first piece of advice, which asks “What is the source of the notifications?” Here’s an egregious anti-pattern:

class DogNewsProvider {
  constructor(news: Observable<News>) {
    news.subscribe(newsItem => {
      if (newsItem.category === "Dog") {
        this._dogNews.next(new DogNews(newsItem));
      }
    });
  }
  private readonly _dogNews = new ReplaySubject<DogNews>(1);

  get(): Observable<DogNews> {
    return this._dogNews.asObservable();
  }
}

Here, we’re providing an Observable<DogNews> based on source data from another Observable. In between, however, we’re routing information from that Observable into a ReplaySubject, which we manually trigger on each event from the source.

This has a few problems:

  1. The code above is flawed in that the news Observable provided to DogNewsProvider is never unsubscribed from. Modifying the class to support unsubscribing is easy, but not ergonomic, and easy to miss.
  2. A bunch of indirection is happening between the source and the output, making the flow of data less clear.
  3. The advantage of a ReplaySubject (namely that someone subscribing late will still get some number of events they missed, 1 in this example) can be replicated by applying the shareReplay operator to the source Observable.
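A minimal sketch of point 3, assuming a Subject of strings as the news source: shareReplay with a buffer of 1 gives a late subscriber the most recent item, which is the behavior the ReplaySubject(1) was providing.

```typescript
import { Subject } from "rxjs";
import { shareReplay } from "rxjs/operators";

// Hypothetical news source.
const news = new Subject<string>();
const latest$ = news.pipe(shareReplay({ bufferSize: 1, refCount: true }));

const early: string[] = [];
const late: string[] = [];

// The early subscriber keeps the shared subscription open.
latest$.subscribe(item => early.push(item));
news.next("first");
news.next("second");

// A late subscriber still receives the most recent item, like ReplaySubject(1).
latest$.subscribe(item => late.push(item));
```

Here early sees both items, while late sees only "second".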

A better approach

class DogNewsProvider {
  constructor(news: Observable<News>) {
    this.dogNews = news.pipe(
      filter(newsItem => newsItem.category === "Dog"),
      map(newsItem => new DogNews(newsItem)),
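      // refCount: true drops the upstream subscription once no one is listening.
      shareReplay({ bufferSize: 1, refCount: true }),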
    );
  }
  readonly dogNews: Observable<DogNews>;
}

The subscription to news is not leaked in this case, and events only flow while there are active listeners to dogNews. Further, the flow from news all the way to dogNews can be read directly from the code, without jumping between callbacks.
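To check the leak claim, here is a sketch with a hypothetical news source that counts its open subscriptions. It assumes the config form of shareReplay ({ bufferSize, refCount }) available in current RxJS; with refCount: true the upstream subscription is dropped once the last listener leaves, whereas plain shareReplay(1) keeps it open.

```typescript
import { Observable } from "rxjs";
import { filter, map, shareReplay } from "rxjs/operators";

let activeSourceSubscriptions = 0;

// Hypothetical news source that never completes and tracks open subscriptions.
const news = new Observable<{ category: string; title: string }>(subscriber => {
  activeSourceSubscriptions++;
  subscriber.next({ category: "Dog", title: "Good boy found" });
  subscriber.next({ category: "Cat", title: "Cat ignores everyone" });
  // Teardown runs when the shared upstream subscription is dropped.
  return () => { activeSourceSubscriptions--; };
});

const dogNews = news.pipe(
  filter(item => item.category === "Dog"),
  map(item => item.title),
  shareReplay({ bufferSize: 1, refCount: true }),
);

const seen: string[] = [];
const a = dogNews.subscribe(title => seen.push(title));
const b = dogNews.subscribe(title => seen.push(title)); // gets the replayed title
const whileListening = activeSourceSubscriptions; // both share one upstream subscription

a.unsubscribe();
b.unsubscribe();
const afterUnsubscribing = activeSourceSubscriptions; // source torn down
```

While both listeners are attached there is exactly one subscription to the source, and it goes away with the last listener.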

One way to think about this indirection is the way Sexton described it: there’s unnecessary work in unpacking an Observable just to pack it into an Observable again and send it off.

Another way to think about this indirection is to look at the subscribe() callback. Does it have side-effects? Yes: it calls a method with side effects on a variable outside its scope. Can it be rewritten so that it doesn’t? It can. There’s no incremental state we’re deliberately maintaining (other than the replay semantics); there’s no I/O in the subscription; and the only side-effect outside our scope exists to pipe data from one place to another. These are clues that the callback can be rewritten as a series of monadic operators on an Observable.

3. Subscribing when switchMap or flatMap would do

Seeing nested subscribe callbacks is a good sign some logic can be reworked. Let’s take a simplified example:

function getBalance(
    name: string,
    showBalance: (balance: number) => void) {
  backend.getAccountIdByName(name).subscribe(id => {
    backend.getBalanceById(id).subscribe(balance => {
      showBalance(balance);
    });
  });
}

Beyond the same unsubscription issues as above, the nested callbacks also obscure what is a relatively simple data flow:

function getBalance(
    name: string,
    showBalance: (balance: number) => void) {
  backend.getAccountIdByName(name).pipe(
    switchMap(id => backend.getBalanceById(id))
  ).subscribe(balance => {
    showBalance(balance);
  });
}

Keep showBalance in a subscription, and transform an account holder name all the way to an account balance through operators on Observables.
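Why switchMap rather than flatMap (an alias for mergeMap)? A sketch of the difference, assuming a hypothetical slow getBalanceById whose responses we resolve by hand: when a new id arrives, switchMap drops the still-pending request for the old one, while mergeMap lets the stale response through.

```typescript
import { Observable, Subject } from "rxjs";
import { mergeMap, switchMap } from "rxjs/operators";

// Hypothetical slow backend: each call returns a Subject we resolve manually.
const pending: Subject<number>[] = [];
function getBalanceById(id: number): Observable<number> {
  const response = new Subject<number>();
  pending.push(response);
  return response;
}

const ids = new Subject<number>();
const viaSwitch: number[] = [];
const viaMerge: number[] = [];

ids.pipe(switchMap(id => getBalanceById(id))).subscribe(b => viaSwitch.push(b));
ids.pipe(mergeMap(id => getBalanceById(id))).subscribe(b => viaMerge.push(b));

ids.next(1); // creates pending[0] (switch) and pending[1] (merge)
ids.next(2); // creates pending[2] (switch) and pending[3] (merge); switchMap drops pending[0]

// The responses for id 1 arrive after id 2 was already requested.
pending[0].next(100);
pending[1].next(100);
pending[2].next(200);
pending[3].next(200);
```

viaSwitch ends up with only the fresh balance, while viaMerge also contains the stale one.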

The bigger advantage of organizing code like this is that it opens up opportunities to refactor:

Use opportunities to factor out Observable-returning functions

We can factor out the part of getBalance that returns an Observable<number>. This allows it to be reused, combined, and multiplexed with other Observables and Observable operators as needed. Observables are also a uniform, ubiquitous API that is cancelable, retryable, etc.

function getBalance(name: string): Observable<number> {
  return backend.getAccountIdByName(name).pipe(
    switchMap(id => backend.getBalanceById(id))
  );
}
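A sketch of that reuse, assuming a hypothetical backend object built from of() so it runs synchronously: because getBalance now returns an Observable<number>, a caller can combine two lookups without any nested callbacks.

```typescript
import { Observable, combineLatest, of } from "rxjs";
import { map, switchMap } from "rxjs/operators";

// Hypothetical stand-in for the article's backend object.
const backend = {
  getAccountIdByName(name: string): Observable<number> {
    return of(name.length); // pretend the id is derived from the name
  },
  getBalanceById(id: number): Observable<number> {
    return of(id * 100);
  },
};

function getBalance(name: string): Observable<number> {
  return backend.getAccountIdByName(name).pipe(
    switchMap(id => backend.getBalanceById(id)),
  );
}

// Composing the factored-out function: sum two account holders' balances.
function getCombinedBalance(a: string, b: string): Observable<number> {
  return combineLatest([getBalance(a), getBalance(b)]).pipe(
    map(([balanceA, balanceB]) => balanceA + balanceB),
  );
}

const totals: number[] = [];
getCombinedBalance("Ada", "Grace").subscribe(total => totals.push(total));
```

With these stand-ins, "Ada" maps to 300 and "Grace" to 500, so totals holds 800.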


Summary

An Observable going through a series of transformation operators from source to final result is:

  1. Cancelable through-and-through; cancelling a subscription to a resultant Observable will cancel any underlying subscriptions opened to that end.
  2. Composable in its own right; and
  3. A ubiquitous immutable API that gives callers flexibility in manipulating return values.
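Point 1 can be seen in a small sketch, assuming a hypothetical tracked() helper that builds a never-completing Observable and logs its teardown: unsubscribing from the final Observable tears down both the outer source and the inner one opened by switchMap.

```typescript
import { Observable } from "rxjs";
import { map, switchMap } from "rxjs/operators";

const log: string[] = [];

// Hypothetical helper: an Observable that never completes and logs its lifecycle.
function tracked<T>(label: string, value: T): Observable<T> {
  return new Observable<T>(subscriber => {
    log.push(`${label} subscribed`);
    subscriber.next(value);
    return () => { log.push(`${label} torn down`); };
  });
}

const result$ = tracked("outer", 1).pipe(
  switchMap(id => tracked("inner", id * 10)),
  map(n => n + 1),
);

const subscription = result$.subscribe();
const tornDownBeforeUnsubscribe =
  log.filter(entry => entry.endsWith("torn down")).length;

// One call cancels the whole chain.
subscription.unsubscribe();
```

Nothing is torn down while the subscription is live; after unsubscribe() both sources have run their teardown.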

I propose side-effects as a good first-order heuristic for deciding what can reasonably be kept within a composed Observable: side-effect-free logic belongs in operators, and side-effects belong in subscriptions. When needed, operators like do and tap will sometimes make sense.

Data and Page Content Refresh patterns in Angular

Part of why I recommend using RxJS Observables all the way through Angular TypeScript code, and only unpacking them at the point closest to where the UI is declared (often using the | async pipe), is that it makes other transformations on an Observable available and convenient. Two such examples are retry and refresh logic.

Two common reasons to reload or refresh the data displayed by a component are:

  1. A user action in the application causes the data to change (especially if it does so in ways that might result in complex state changes), and/or
  2. The data being displayed can change over time (e.g. due to a progression or change of its state in the backend, or edits by another user).

Let’s start with a simple example:

@Component({
  selector: 'task',
  template: `<ng-container *ngIf="(task$ | async) as task">
    <h1>{{task.name}}</h1>
    <p>Status: {{task.status}}</p>
    <sub-task *ngFor="let subTask of task.subtasks" [subTask]="subTask"></sub-task>
  </ng-container>`
})
export class TaskComponent {
  constructor(private readonly http: HttpClient) {}
  readonly task$ = this.http.get('/api/tasks/foo');
}

Suppose we add a ‘Mark as Complete’ button that mutates the server-side state of all sub-tasks. How do we obtain the latest authoritative data from the server about the state of our task? Here’s an approach:

export class TaskComponent {
  constructor(private readonly http: HttpClient) {}
  
  private readonly refreshToken$ = new BehaviorSubject(undefined);
  // Not private: the template reads task$.
  readonly task$ = this.refreshToken$.pipe(
    switchMap(() => this.http.get('/api/tasks/foo')));

  markAsComplete() {
    this.http.post('/api/tasks/foo', { state: State.Done })
      // N.B. contrary to my advice elsewhere, I'm happy to
      // directly subscribe here because this subscribe
      // callback has side effects.
      // Further, I don't worry about unsubscribing since
      // this returned Observable is a one-shot observable
      // that will complete after a single request.
      .subscribe(() => this.refreshToken$.next(undefined));
  }
}

Adding refresh logic this way minimally affects our template code and looks relatively clean. Better yet, any additional mutating methods simply need to call refreshToken$.next(undefined) to make sure new data is loaded.
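The refresh-token pattern can be exercised outside Angular with a sketch like this one, where a hypothetical fetchTask() counts calls in place of http.get and an array plays the role of AsyncPipe:

```typescript
import { BehaviorSubject, Observable, of } from "rxjs";
import { switchMap } from "rxjs/operators";

// Hypothetical stand-in for http.get('/api/tasks/foo'): counts requests.
let requestCount = 0;
function fetchTask(): Observable<{ name: string; status: string }> {
  requestCount++;
  return of({ name: "foo", status: requestCount === 1 ? "Open" : "Done" });
}

const refreshToken$ = new BehaviorSubject<void>(undefined);
const task$ = refreshToken$.pipe(switchMap(() => fetchTask()));

// AsyncPipe would do this subscribing for us in a component.
const statuses: string[] = [];
task$.subscribe(task => statuses.push(task.status));

// After a mutation succeeds, a single next() reloads the data.
refreshToken$.next(undefined);
```

The BehaviorSubject triggers the initial load on subscription, and each next() triggers one more request.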

What about regularly polling for updates? This can be implemented simply as well:

export class TaskComponent {
  constructor(private readonly http: HttpClient) {}
  
  private readonly autoRefresh$ =
    interval(TASK_REFRESH_INTERVAL_MS)
      .pipe(startWith(0));

  private readonly refreshToken$ = new BehaviorSubject(undefined);

  readonly task$ =
    // Notice that combineLatest only emits for the first time
    // once every input Observable has emitted at least once.
    // BehaviorSubject always emits its latest value when
    // you subscribe to it, so we're good there.
    // An interval() Observable needs a 'startWith' to
    // give you an initial event.
    combineLatest([this.autoRefresh$, this.refreshToken$])
    .pipe(
      switchMap(() => this.http.get('/api/tasks/foo'))
    );

  markAsComplete() {
    this.http.post('/api/tasks/foo', { state: State.Done })
      .subscribe(() => this.refreshToken$.next(undefined));
  }
}
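The combineLatest comment above is easy to verify. In this sketch a Subject stands in for interval() (an assumption, so the timing is deterministic): without startWith nothing is emitted until the first tick, while the startWith version emits immediately.

```typescript
import { BehaviorSubject, Subject, combineLatest } from "rxjs";
import { startWith } from "rxjs/operators";

// Stand-in for interval(): emits only when we call next().
const tick$ = new Subject<number>();
const refreshToken$ = new BehaviorSubject<void>(undefined);

const withoutStart: unknown[] = [];
const withStart: unknown[] = [];

combineLatest([tick$, refreshToken$]).subscribe(v => withoutStart.push(v));
combineLatest([tick$.pipe(startWith(0)), refreshToken$]).subscribe(v => withStart.push(v));

// Nothing has ticked yet: only the startWith version has emitted.
const initialWithout = withoutStart.length;
const initialWith = withStart.length;

// A timer tick now reaches both pipelines.
tick$.next(1);
```

initialWithout is 0 and initialWith is 1; after the tick, the two arrays hold 1 and 2 entries respectively.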

What if we didn’t want to hardcode foo as the task we look up? Angular’s ActivatedRoute already exposes Observables. Rather than using route.snapshot.params['task_id'] or similar, we can use the params Observable itself and stop worrying about manually refreshing that data:

export class TaskComponent {
  constructor(private readonly http: HttpClient,
              private readonly route: ActivatedRoute) {}
  
  private readonly autoRefresh$ =
    interval(TASK_REFRESH_INTERVAL_MS)
      .pipe(startWith(0));

  private readonly refreshToken$ = new BehaviorSubject(undefined);

  readonly task$ =
    combineLatest([this.route.params,
                   this.autoRefresh$,
                   this.refreshToken$])
    .pipe(
      switchMap(([params]) =>
        this.http.get(`/api/tasks/${params['task_id']}`))
    );

  markAsComplete() {
    this.route.params.pipe(
      // route.params never completes; take only the current value so a
      // later navigation doesn't send another POST.
      take(1),
      map(params => params['task_id']),
      switchMap(taskId =>
        this.http.post(`/api/tasks/${taskId}`, { state: State.Done })
      ))
    .subscribe(() => this.refreshToken$.next(undefined));
  }
}
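A sketch of the same wiring with plain BehaviorSubjects standing in for route.params and the refresh token, and hypothetical fakeGet/fakePost helpers in place of HttpClient. It also shows why markAsComplete takes only the current params: route.params does not complete, so without take(1) every later navigation would send another POST.

```typescript
import { BehaviorSubject, combineLatest, of } from "rxjs";
import { map, switchMap, take } from "rxjs/operators";

type Params = { [key: string]: string };

// Stand-ins for ActivatedRoute.params and the component's refresh token.
const params$ = new BehaviorSubject<Params>({ task_id: "foo" });
const refreshToken$ = new BehaviorSubject<void>(undefined);

const fetched: string[] = [];
const posted: string[] = [];

// Hypothetical replacements for http.get and http.post.
const fakeGet = (id: string) => { fetched.push(id); return of(id); };
const fakePost = (id: string) => { posted.push(id); return of(null); };

// task$: refetch whenever the route or the refresh token changes.
combineLatest([params$, refreshToken$]).pipe(
  switchMap(([params]) => fakeGet(params["task_id"])),
).subscribe();

function markAsComplete() {
  params$.pipe(
    take(1), // only the task currently shown
    map(params => params["task_id"]),
    switchMap(taskId => fakePost(taskId)),
  ).subscribe(() => refreshToken$.next(undefined));
}

markAsComplete();                 // POST foo, then refetch foo
params$.next({ task_id: "bar" }); // navigating refetches, but does not POST
```

fetched ends up as foo, foo, bar, and posted contains only foo.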

As a monad, an Observable is a neat and tidy functional construct that you can transform using a rich set of operators. In RxJS, those also include catchError and retry for error handling and retrying, timed events, and combining multiple monads into a monad of multiple items. Seen as just another monad, Observables make reactive programming a simple extension on top of functional programming.
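As one example of those operators, here is a sketch of retry and catchError around a hypothetical flaky request that fails twice before succeeding:

```typescript
import { Observable, of } from "rxjs";
import { catchError, retry } from "rxjs/operators";

let attempts = 0;

// Hypothetical flaky request: fails on the first two subscriptions, then succeeds.
const flaky$ = new Observable<string>(subscriber => {
  attempts++;
  if (attempts < 3) {
    subscriber.error(new Error(`attempt ${attempts} failed`));
  } else {
    subscriber.next("task loaded");
    subscriber.complete();
  }
});

const outcomes: string[] = [];

flaky$.pipe(
  retry(2),                         // resubscribe up to 2 more times on error
  catchError(() => of("fallback")), // only reached if every retry fails
).subscribe(value => outcomes.push(value));
```

Here three attempts are made and the third succeeds, so the fallback is never used.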

Dealing with these Observables for as much of the data lifecycle as possible means that you can take advantage of these constructs to transform immutable data using neat operators, rather than dealing with unpacking this data into mutable scalars.

Use AsyncPipe When Possible

I review a fair amount of Angular code at work. One thing I often encourage is using plain Observables in an Angular Component, and using AsyncPipe (foo | async) from the template HTML to handle subscription, rather than directly subscribing to an Observable in a component TS file.

Subscribing in Components

Unless you know a subscription you’re starting in a component is very finite (e.g. an HTTP request with no retry logic), subscriptions you make in a Component must:

  1. Be closed, stopped, or cancelled when exiting a component (e.g. when navigating away from a page),
  2. Only be opened (subscribed) when a component is actually loaded/visible (i.e. in ngOnInit rather than in a constructor).

Consider:

@Component()
export class Foo implements OnInit, OnDestroy {
  someStringToDisplay = '';
  private readonly onDestroy = new ReplaySubject<void>(1);

  constructor(private readonly ref: ChangeDetectorRef) {}

  ngOnInit() {
    someObservable.pipe(
      map( ... ),
      // takeUntil goes last so no later operator can keep a
      // subscription alive past destruction.
      takeUntil(this.onDestroy),
    ).subscribe(next => {
      this.someStringToDisplay = next;
      this.ref.markForCheck();
    });
  }

  ngOnDestroy() {
    this.onDestroy.next(undefined);
    this.onDestroy.complete();
  }
}

Or, equivalently, by holding on to the Subscription:

@Component()
export class Foo implements OnInit, OnDestroy {
  someStringToDisplay = '';
  private subscription = Subscription.EMPTY;

  constructor(private readonly ref: ChangeDetectorRef) {}

  ngOnInit() {
    this.subscription = someObservable.pipe(
      map( ... ),
    ).subscribe(next => {
      this.someStringToDisplay = next;
      this.ref.markForCheck();
    });
  }

  ngOnDestroy() {
    this.subscription.unsubscribe();
  }
}

Either way, with the template:

<span>{{someStringToDisplay}}</span>

AsyncPipe can take care of that for you

@Component() export class Foo {
   someStringToDisplay = someObservable.pipe(
     map(...),
   );
 }
<span>{{someStringToDisplay | async}}</span>

Much better! No need to remember to unsubscribe, no need to call markForCheck(), and no need to implement OnDestroy; AsyncPipe unsubscribes on its own when the component is destroyed. If you only implemented OnInit to make a new subscription, you can forgo that too.

Best Practice: Use publishReplay and refCount if accessing the same Observable from multiple places

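If a template reads the same Observable in multiple places, each | async creates its own subscription (and, for an HTTP-backed Observable, its own request). To share one subscription and its latest value, consider using the publishReplay and refCount RxJS operators: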

readonly pageTitle = this.route.params.pipe(
   map(params => params['id']),
   switchMap(id => this.http.get(
     `api/pages/${id}/title`,
     {'responseType': 'text'})),
   publishReplay(1),
   refCount()
);
<h1>{{pageTitle | async}}</h1> 
<p>You are viewing {{pageTitle | async}}.</p>

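This will cause the template rendering to make a single request for pageTitle, and cache the result between both uses. (In RxJS 7, publishReplay and refCount are deprecated; shareReplay({ bufferSize: 1, refCount: true }) achieves a similar effect.)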
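A sketch of the sharing behavior, assuming a hypothetical title$ that counts how many times it is subscribed in place of http.get: two consumers (standing in for the two | async bindings) trigger a single request.

```typescript
import { Observable } from "rxjs";
import { publishReplay, refCount } from "rxjs/operators";

let requests = 0;

// Hypothetical stand-in for http.get(...): counts how often it is subscribed.
const title$ = new Observable<string>(subscriber => {
  requests++;
  subscriber.next("Getting Started");
});

const pageTitle = title$.pipe(publishReplay(1), refCount());

// Two consumers, like the two `| async` bindings in the template.
const heading: string[] = [];
const paragraph: string[] = [];
pageTitle.subscribe(t => heading.push(t));
pageTitle.subscribe(t => paragraph.push(`You are viewing ${t}.`));
```

requests stays at 1; the second consumer is served from the replay buffer.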

Best Practice: Combine *ngIf, as, and else with AsyncPipe

If you need to handle the loading state, and need to display nested properties of an object returned from an observable, you can do something like:

<ng-container *ngIf="(pageObservable | async) as page; else loading">
   <!-- can refer to 'page' here -->
   <h1>{{page.title}}</h1>
   <p>{{page.paragraph}}</p>
 </ng-container>
 <ng-template #loading>Loading…</ng-template>

Note that this doesn’t distinguish between the case where pageObservable is still loading, and the case where pageObservable resolved to a falsey value.
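One way to tell the two cases apart, sketched under the assumption of a hypothetical LoadState wrapper (not an Angular or RxJS type): map values into an object that always exists and says whether loading has finished.

```typescript
import { Observable, of } from "rxjs";
import { map, startWith } from "rxjs/operators";

// Hypothetical wrapper: makes "still loading" explicit instead of relying on truthiness.
type LoadState<T> = { loading: true } | { loading: false; value: T };

function withLoadState<T>(source: Observable<T>): Observable<LoadState<T>> {
  const initial: LoadState<T> = { loading: true };
  return source.pipe(
    map((value): LoadState<T> => ({ loading: false, value })),
    startWith(initial),
  );
}

// A page that legitimately resolved to null (a falsy value).
const page$: Observable<string | null> = of(null);
const states: LoadState<string | null>[] = [];
withLoadState(page$).subscribe(state => states.push(state));
```

Because every emitted LoadState is an object, the template can branch on state.loading rather than on whether the page value is truthy.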

What, other than hypocrisy?

Close to half a million Syrians have died in the Syrian Civil war since 2011. In Aleppo alone, since 2012, over 100,000 Syrians have been killed. As of 2015, the UN puts the estimate of civilians killed by the Syrian regime at 250,000. Other estimates range from 150,960 to 470,000.[1]

More Arabs have been killed by Bashar Al-Assad since 2011 than by Israel since 1948.

More Arabs have been killed by Bashar Al-Assad since 2011 than by the US Iraqi Invasion (by most estimates).

More Arabs have been killed by Bashar Al-Assad since 2011 than by ISIS.[2]

War is ugly. We learn daily about atrocities committed by all players in the region: many of them by secular rebels like the Free Syrian Army, many of them by the U.S.-led coalition.

We all need to be ashamed. No one is on the “good” side here.

The regime deserves a special place, though.

What, other than hypocrisy, allows one to protest Israel’s occupation yet excuse the regime? What, other than hypocrisy, allows one to protest US imperialism, yet excuse Russian and Iranian imperialism?

Perhaps it is denialism. We live in a world where basic facts are contested. Where there is no common truth. Perhaps it is not hypocrisy. Perhaps we each have our own realities, with their own numbers, and each reality continues to fuel our fight against our neighbors.


[1] Totals for war via SCPR and SOHR. Estimate for those killed by Syrian Regime via Quora answer, see answer for cited UNHCR reports.
[2] Estimates are in the mid-tens of thousands

A fifteen-year-old lesson from New York

Reading Gloria Steinem’s My Life on the Road gave me many treasured lessons. Many are relevant in election years. One is especially relevant on the heels of the 15th anniversary of the 9/11 terrorist attacks, particularly during an election year where fear and Islamophobia are on the ballot.
As a large portion of Americans react to fear of terrorism and religious-inspired radicalism with Islamophobia and an anti-immigrant mentality, I remember this.

Steinem recalls a conversation with a cab driver in New York City “only ten days after the 9/11 terrorist attacks”. “Downtown streets were covered with surrealistic gray ash and debris,” she says, “and gutters were filled with the bodies of birds that had been incinerated in flight.”
But then she rides in this man’s cab.

My driver was a quiet young white guy with a gravity that I sensed as soon as I got into his cab. We drove past construction fences covered with photos and notices posted by people who were still searching for missing relatives or friends or coworkers. There were also anonymous graffiti that had appeared as if by contagion all over New York with the same message: Our grief is not a cry of war.

“That’s how New Yorkers feel,” the driver said. “They know what bombing looks like, and they know the hell it is. But outside New York, people will feel guilty because they weren’t here. They’ll be yelling for revenge out of guilt and ignorance. Sure, we all want to catch the criminals, but only people who weren’t in New York will want to bomb another country and repeat what happened here.”

“He was right,” she says, “Even before it was clear that Iraq and Saddam Hussein had nothing to do with 9/11 […] 75 percent of New Yorkers opposed the U.S. bombing of Iraq. But a national majority supported it.”

Former New York Mayor Rudy Giuliani made a (widely misreported) claim that no major terror attack struck the U.S. after the passage of the PATRIOT Act. Giuliani implies that the empowerment of the surveillance state, especially over Muslims, is why no major terror attacks took place on U.S. soil after 9/11. Perhaps it was related (there is mixed evidence on whether the PATRIOT Act worked). Also related, perhaps, is George W. Bush’s refusal to empower Islamophobic rhetoric, instead stating, less than a week after 9/11, “Islam is Peace.” Perhaps, I would add, a compassionate, understanding state is better equipped to handle terror threats, to empower communities to self-monitor and self-report, and to empower communities to teach, affirm, and re-affirm peace, when it refuses to paint these communities with a broad brush.

Excerpts in this post are taken from Gloria Steinem’s My Life on the Road, pp. 72–3, Chapter III. The chapter is entitled “Why I Don’t Drive”. It describes insightful and perspective-changing experiences Steinem had by talking to cab drivers and public transit passengers over years of traveling around the world.