Learning by Implementing: Observables

Sometimes, the best way to learn a new concept is to try to implement it. In my journey with reactive programming, my attempts at implementing Observables were key to my ability to intuit how best to use them. In this post, we’ll try various strategies for implementing an Observable and see if we can get to a working solution.

I’ll be using TypeScript and working to implement something similar to RxJS in these examples, but the intuition should be broadly applicable.

First things first, though: what are we trying to implement? My favorite way of motivating Observables is by analogy. If you have some type, T, you might represent it in asynchronous programming as Future<T> or Promise<T>. Just as futures and promises are the asynchronous analog of a plain type, an Observable<T> is the asynchronous construct representing a collection of T.

The basic API for Observable is a subscribe method that takes a bunch of callbacks, each triggering on a certain event:

interface ObservableLike<T> {
  subscribe(
      onNext?: (item: T) => void,
      onError?: (error: unknown) => void,
      onDone?: () => void): Subscription;
}

interface Subscription {
  unsubscribe(): void;
}

With that, let’s get to work!

First Attempt: Mutable Observables

One way of implementing an Observable is to make sure it keeps track of its subscribers (in an array) and have the object send events to listeners as they happen.

For the purpose of this and other implementations, we’ll define an internal representation of a Subscription as follows:

interface SubscriptionInternal<T> {
  onNext?: (item: T) => void;
  onError?: (error: unknown) => void;
  onDone?: () => void;
}

Therefore, we could define an Observable as such:

class Observable<T> implements ObservableLike<T> {
  private readonly subscribers: Array<SubscriptionInternal<T>> = [];

  triggerNext(item: T) {
    this.subscribers.forEach(sub => sub.onNext && sub.onNext(item));
  }

  triggerError(err: unknown) {
    this.subscribers.forEach(sub => sub.onError && sub.onError(err));
  }

  triggerDone() {
    this.subscribers.forEach(sub => sub.onDone && sub.onDone());
    this.subscribers.splice(0, this.subscribers.length);
  }

  subscribe(
    onNext?: (item: T) => void,
    onError?: (error: unknown) => void,
    onDone?: () => void
  ): Subscription {
    const subInternal: SubscriptionInternal<T> = {
      onNext,
      onError,
      onDone
    };

    this.subscribers.push(subInternal);
    return {
      unsubscribe: () => {
        const index = this.subscribers.indexOf(subInternal);
        if (index !== -1) {
          onDone && onDone(); // Maybe???
          this.subscribers.splice(index, 1);
        }
      }
    };
  }
}

This would be used as follows:

// Someone creates an observable:
const obs = new Observable<number>();
obs.triggerNext(5);
obs.triggerDone();

// Someone uses an observable
obs.subscribe(next => alert(`I got ${next}`), undefined, () => alert("done"));

There are a few fundamental problems going on here:

  1. The implementer doesn’t know when subscribers will start listening, and thus won’t know if triggering an event will be heard by no one,
  2. Related to the above, this implementation always creates hot observables; the Observable can start triggering events immediately after creation, depending on the creator, and
  3. Mutable: Anyone who receives the Observable can call triggerNext, triggerError, and triggerDone on it, which would interfere with everyone else.

There are also some limitations in the current implementation: it can error multiple times, a “done” Observable can trigger again, and an Observable can move back and forth between “done”, triggering, and “errored” states. Adding state tracking to prevent this wouldn’t be fundamentally complicated, though. We also need to think more about errors thrown in a callback, and what effect they should have on other subscribers.
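As a rough illustration of that state tracking, here is a minimal sketch of a guard that stops anything from firing after the first error or done event. The names Triggers and guardTerminalState are hypothetical, made up for this sketch, and it ignores errors thrown inside callbacks.

```typescript
// Hypothetical shape: the three trigger functions an Observable exposes.
interface Triggers<T> {
  next: (item: T) => void;
  error: (err: unknown) => void;
  done: () => void;
}

// Hypothetical helper: wraps the triggers so that nothing fires after
// the first error or done event.
function guardTerminalState<T>(inner: Triggers<T>): Triggers<T> {
  let closed = false;
  return {
    next: item => {
      if (!closed) inner.next(item);
    },
    error: err => {
      if (closed) return;
      closed = true;
      inner.error(err);
    },
    done: () => {
      if (closed) return;
      closed = true;
      inner.done();
    }
  };
}

// Usage: record what actually reaches the underlying triggers.
const seen: string[] = [];
const guarded = guardTerminalState<number>({
  next: item => seen.push(`next:${item}`),
  error: () => seen.push("error"),
  done: () => seen.push("done")
});

guarded.next(1);
guarded.done();
guarded.next(2); // Ignored: already done.
guarded.error(new Error("late")); // Ignored: already done.
// seen is now ["next:1", "done"].
```

The same closed flag could live as a private field on the Observable class itself; wrapping the triggers just keeps the sketch independent of any one implementation.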

Second Attempt: Hot Immutable Observables

Let’s first solve the mutability problem. One approach is to pass a ReadonlyObservable interface around which hides the mutating methods. But any downstream user casting it back to the full Observable could wreak havoc, never mind plain JS users who just see these methods on an object.

A cleaner approach in JavaScript is to borrow from the Promise constructor’s executor pattern, where the constructor must be passed a user-defined function that defines when an Observable triggers:

class Observable<T> implements ObservableLike<T> {
  private readonly subscribers: Array<SubscriptionInternal<T>> = [];

  constructor(
    executor: (
      next: (item: T) => void,
      error: (err: unknown) => void,
      done: () => void
    ) => void
  ) {
    const next = (item: T) => {
      this.subscribers.forEach(sub => sub.onNext && sub.onNext(item));
    };

    const error = (err: unknown) => {
      this.subscribers.forEach(sub => sub.onError && sub.onError(err));
    };

    const done = () => {
      this.subscribers.forEach(sub => sub.onDone && sub.onDone());
      this.subscribers.splice(0, this.subscribers.length);
    };

    executor(next, error, done);
  }

  subscribe(
    onNext?: (item: T) => void,
    onError?: (error: unknown) => void,
    onDone?: () => void
  ): Subscription {
    const subInternal: SubscriptionInternal<T> = {
      onNext,
      onError,
      onDone
    };

    this.subscribers.push(subInternal);
    return {
      unsubscribe: () => {
        const index = this.subscribers.indexOf(subInternal);
        if (index !== -1) {
          onDone && onDone(); // Maybe???
          this.subscribers.splice(index, 1);
        }
      }
    };
  }
}

Much better! We can use this as such:

// Someone creates an observable:
const obs = new Observable<number>((next, error, done) => {
  next(5);
  done();
});

// Someone uses an observable
obs.subscribe(next => alert(`I got ${next}`), undefined, () => alert("done"));

This cleans up the API quite a bit. But in this example, the executor runs inside the constructor, before anyone has had a chance to subscribe, so the subscriber still sees no events.
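To make the timing problem concrete, here is a small sketch using a stripped-down stand-in for the class above. HotObservable is a hypothetical name, and it only supports onNext to keep things short.

```typescript
// A stripped-down stand-in for the executor-based Observable
// (hypothetical; only onNext is supported).
class HotObservable<T> {
  private readonly listeners: Array<(item: T) => void> = [];

  constructor(executor: (next: (item: T) => void) => void) {
    // The executor runs right here, before anyone can call subscribe().
    executor(item => this.listeners.forEach(listener => listener(item)));
  }

  subscribe(onNext: (item: T) => void): void {
    this.listeners.push(onNext);
  }
}

const received: number[] = [];

// Fires synchronously inside the constructor: there are no listeners yet.
const eager = new HotObservable<number>(next => next(5));
eager.subscribe(item => received.push(item));

// Keeping a handle on `next` lets us fire after subscription instead.
let fireLater: (item: number) => void = () => {};
const deferred = new HotObservable<number>(next => {
  fireLater = next;
});
deferred.subscribe(item => received.push(item));
fireLater(7);

// received is now [7]: the 5 was lost, the 7 was delivered.
```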

Good Examples

We can already use this type of code to create helpful Observables:

// Create an Observable of a specific event in the DOM.
function fromEvent<K extends keyof HTMLElementEventMap>(
  element: HTMLElement,
  event: K
): Observable<HTMLElementEventMap[K]> {
  return new Observable<HTMLElementEventMap[K]>((next, error, done) => {
    element.addEventListener(event, next);
    // Never Done.
  });
}

const clicks: Observable<MouseEvent> = fromEvent(document.body, "click");

Or an event stream from a timed counter:

function timer(millis: number): Observable<number> {
  return new Observable<number>((next, error, done) => {
    let count = 0;
    setInterval(() => {
      next(count);
      ++count;
    }, millis);
  });
}

Even these examples have some issues: they keep running even when no one is listening. That’s sometimes fine, if we know we’ll only have one Observable, or we’re sure callers are listening and so tracking that state is unnecessary overhead, but it’s starting to point to certain smells.

Bad Examples

One common Observable factory is of, which creates an Observable that emits one item. The assumption is that:

const obs: Observable<number> = of(42);
obs.subscribe(next => alert(`The answer is ${next}`)); 

… would work, and result in “The answer is 42” being alerted. But a naive implementation, such as:

function of<T>(item: T): Observable<T> {
  return new Observable<T>((next, error, done) => {
    next(item);
    done();
  });
}

… would result in the event happening before anyone has the chance to subscribe. Tricks like setTimeout work for code that subscribes immediately afterward, but are fundamentally broken if we want to generalize this to someone who subscribes at a later point.

The case for Cold Observables

We can try to make our Observables lazy, meaning they only start acting on the world once subscribed to. Note that by lazy I don’t just mean that a shared Observable will only start triggering once someone subscribes to it — I mean something stronger: an Observable will trigger for each subscriber.

For example, we’d like this to work properly:

const obs: Observable<number> = of(42);
obs.subscribe(next => alert(`The answer is ${next}`));
obs.subscribe(next => alert(`The second answer is ${next}`)); 
setTimeout(() => {
  obs.subscribe(next => alert(`The third answer is ${next}`)); 
}, 1000);

Here, we should get 3 alert messages, each with the contents of the event.

Third Attempt: Cold Observables (v1)

type UnsubscribeCallback = (() => void) | void;

class Observable<T> implements ObservableLike<T> {
  constructor(
    private readonly executor: (
      next: (item: T) => void,
      error: (err: unknown) => void,
      done: () => void
    ) => UnsubscribeCallback
  ) {}

  subscribe(
    onNext?: (item: T) => void,
    onError?: (error: unknown) => void,
    onDone?: () => void
  ): Subscription {
    const noop = () => {};
    const unsubscribe = this.executor(
      onNext || noop,
      onError || noop,
      onDone || noop
    );

    return {
      unsubscribe: unsubscribe || noop
    };
  }
}

In this attempt, each Subscription will run the executor separately, triggering onNext, onError, and onDone for each subscriber as needed. This is pretty cool! The naive implementation of of works just fine. I also snuck in a pretty simple method to allow us to add cleanup logic to our executors.

fromEvent would benefit from that, for example:

// Create an Observable of a specific event in the DOM.
function fromEvent<K extends keyof HTMLElementEventMap>(
  element: HTMLElement,
  event: K
): Observable<HTMLElementEventMap[K]> {
  return new Observable<HTMLElementEventMap[K]>((next, error, done) => {
    element.addEventListener(event, next);
    // Never Done.

    return () => {
      element.removeEventListener(event, next);
    };
  });
}

The nice thing about this is that we remove our listeners when a particular subscriber unsubscribes. Except now, we open as many listeners as subscribers. That might be okay for this one case, but we’ll want to figure out how to let users “multicast” (reuse underlying events, etc.) when they want to.
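One possible shape for such multicasting is sketched below. ColdObservable is a simplified stand-in (onNext only), and share is a hypothetical helper, not something defined elsewhere in this post. It subscribes to the source when the first listener arrives and tears the source down when the last one leaves.

```typescript
type Teardown = () => void;

// Minimal cold observable (hypothetical, onNext only).
class ColdObservable<T> {
  constructor(
    private readonly executor: (next: (item: T) => void) => Teardown
  ) {}

  subscribe(onNext: (item: T) => void): Teardown {
    return this.executor(onNext);
  }
}

// Hypothetical helper: one upstream subscription, many listeners.
// Assumes the source does not emit synchronously while subscribing.
function share<T>(source: ColdObservable<T>): ColdObservable<T> {
  const listeners = new Set<(item: T) => void>();
  let teardown: Teardown | undefined;

  return new ColdObservable<T>(next => {
    // A fresh closure per subscription, so duplicates stay distinct.
    const listener = (item: T) => next(item);
    listeners.add(listener);
    if (listeners.size === 1) {
      teardown = source.subscribe(item => listeners.forEach(l => l(item)));
    }
    return () => {
      listeners.delete(listener);
      if (listeners.size === 0 && teardown) {
        teardown();
        teardown = undefined;
      }
    };
  });
}

// Usage: count executor runs and emit manually.
let executorRuns = 0;
let emit: (item: string) => void = () => {};
const source = new ColdObservable<string>(next => {
  executorRuns++;
  emit = next;
  return () => {
    emit = () => {};
  };
});

const shared = share(source);
const log: string[] = [];
const stopA = shared.subscribe(item => log.push(`A:${item}`));
const stopB = shared.subscribe(item => log.push(`B:${item}`));
emit("click");
stopA();
stopB();
emit("ignored"); // No one is listening; the source was torn down.
// executorRuns is 1 and log is ["A:click", "B:click"].
```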

We still haven’t figured out proper cleanup and error handling. For example:

  1. It is generally regarded that a subscription that errors is closed (just like how throwing an error while iterating over a for loop will terminate that loop).
  2. When a subscriber unsubscribes, we should probably get that “onDone” event.
  3. When there’s an error, we should probably do some cleanup.

Better Cold Observables

Here’s a re-implementation of subscribe that might satisfy these conditions:

class Observable<T> implements ObservableLike<T> {
  constructor(
    private readonly executor: (
      next: (item: T) => void,
      error: (err: unknown) => void,
      done: () => void
    ) => UnsubscribeCallback
  ) {}

  subscribe(
    onNext?: (item: T) => void,
    onError?: (error: unknown) => void,
    onDone?: () => void
  ): Subscription {
    let dispose: UnsubscribeCallback;
    let running = true;
    const unsubscribe = () => {
      // Do not allow retriggering:
      onNext = onError = undefined;

      onDone && onDone();
      // Don't notify someone of "done" again if we unsubscribe.
      onDone = undefined;

      if (dispose) {
        dispose();
        // Don't dispose twice if we unsubscribe.
        dispose = undefined;
      }

      running = false;
    };

    const error = (err: unknown) => {
      onError && onError(err);
      unsubscribe();
    };

    const done = () => {
      unsubscribe();
    };

    const next = (item: T) => {
      try {
        onNext && onNext(item);
      } catch (e) {
        // error() already notifies onError, so don't call onError here too.
        error(e);
      }
    };

    dispose = this.executor(next, error, done);

    // We just assigned dispose. If the executor itself already
    // triggered done() or error(), then unsubscribe() was called
    // before dispose was assigned, and the cleanup never ran.
    // To guard against that case, call dispose now.
    if (!running) {
      dispose && dispose();
    }

    return {
      unsubscribe: () => unsubscribe()
    };
  }
}

Using Observables

Taking the “Better Cold Observables” example, let’s see how we can use Observables:

Useful Factories

We already discussed fromEvent and of, which work with the new form of Observable. A few others we can create:

// Throws an error immediately
function throwError(err: unknown): Observable<never> {
  return new Observable((next, error) => {
    error(err);
  });
}

// Combines two Observables into one.
function zip<T1, T2>(
  o1: Observable<T1>,
  o2: Observable<T2>
): Observable<[T1, T2]> {
  return new Observable<[T1, T2]>((next, error, done) => {
    const last1: T1[] = [];
    const last2: T2[] = [];

    const sub1 = o1.subscribe(
      item => {
        last1.push(item);
        if (last1.length > 0 && last2.length > 0) {
          // Both queues are non-empty, so shift() cannot return undefined.
          next([last1.shift() as T1, last2.shift() as T2]);
        }
      },
      err => error(err),
      () => done()
    );

    const sub2 = o2.subscribe(
      item => {
        last2.push(item);
        if (last2.length > 0 && last1.length > 0) {
          // Both queues are non-empty, so shift() cannot return undefined.
          next([last1.shift() as T1, last2.shift() as T2]);
        }
      },
      err => error(err),
      () => done()
    );

    return () => {
      sub1.unsubscribe();
      sub2.unsubscribe();
    };
  });
}

Useful Operators

Another nice thing about Observables is that they’re nicely composable. Take map for instance:

function map<T, R>(observable: Observable<T>, mapper: (item: T) => R) {
  return new Observable<R>((next, fail, done) => {
    const sub = observable.subscribe(item => next(mapper(item)), fail, done);
    return () => {
      sub.unsubscribe();
    }
  });
}

This allows us to do things like:

function doubled(input: Observable<number>): Observable<number> {
  return map(input, n => n * 2);
}

Or we could define filter:

function filter<T>(observable: Observable<T>, predicate: (item: T) => boolean) {
  return new Observable<T>((next, fail, done) => {
    const sub = observable.subscribe(
      item => {
        if (predicate(item)) next(item);
      },
      fail,
      done
    );
    return () => {
      sub.unsubscribe();
    };
  });
}

Which allows us to do:

function primeOnly(input: Observable<number>): Observable<number> {
  return filter(input, isPrime);
}

Conclusion

I didn’t really try to sell you, dear reader, on why you should use Observables as helpful tools in your repertoire. Some of my other writing showing their use cases (here, here, and here) might be helpful. But really, what I wanted to demonstrate is some of the intuition on how Observables work. The implementation I shared isn’t a complete one; for that, you’re better off consulting Observable.ts in the RxJS implementation. This implementation is notably missing a few things:

  • We could still do much better on error handling (especially in my operators)
  • RxJS observables include the pipe() method, which makes applying one or more of those operators to transform an Observable much more ergonomic
  • Lots of other things here and there
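To give a feel for the pipe() idea, here is a minimal sketch of a standalone pipe function with curried operators. This is not how RxJS implements it. Stream is a hypothetical, simplified stand-in for an Observable (a function that takes an onNext callback), and the curried map and filter below are illustrative variants of the operators defined earlier.

```typescript
// A unary operator: takes a source and returns a transformed one.
type Operator<A, B> = (source: A) => B;

// Hypothetical standalone pipe(); overloads keep the types flowing.
function pipe<A>(source: A): A;
function pipe<A, B>(source: A, op1: Operator<A, B>): B;
function pipe<A, B, C>(
  source: A,
  op1: Operator<A, B>,
  op2: Operator<B, C>
): C;
function pipe(source: unknown, ...ops: Array<Operator<any, any>>): unknown {
  return ops.reduce((acc, op) => op(acc), source);
}

// Simplified stand-in for an Observable: just "call me with each item".
type Stream<T> = (onNext: (item: T) => void) => void;

// Curried operators: configure first, apply to a source later.
const map = <T, R>(mapper: (item: T) => R): Operator<Stream<T>, Stream<R>> =>
  source => onNext => source(item => onNext(mapper(item)));

const filter = <T>(
  predicate: (item: T) => boolean
): Operator<Stream<T>, Stream<T>> =>
  source => onNext =>
    source(item => {
      if (predicate(item)) onNext(item);
    });

const numbers: Stream<number> = onNext => [1, 2, 3, 4].forEach(n => onNext(n));

const evensDoubled = pipe(
  numbers,
  filter<number>(n => n % 2 === 0),
  map<number, number>(n => n * 2)
);

const results: number[] = [];
evensDoubled(n => results.push(n));
// results is now [4, 8].
```

The operators read top to bottom in the order they apply, instead of nesting as map(filter(input, ...), ...).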

Schema.org Classes in TypeScript: Properties and Special Cases

In our quest to model Schema.org classes in TypeScript, we’ve so far managed to model the type hierarchy, scalar DataType values, and enums. The big piece that remains, however, is representing what’s actually inside of the class: its properties.

After all, what it means for a JSON-LD literal to have "@type" equal to "Person" is that certain properties — e.g. "birthPlace" or "birthDate", among others — can be expected to be present on the literal. More than their potential presence, Schema.org defines a meaning for these properties, and the range of types their values could hold.

The easy case: Simple Properties

You can download the entire vocabulary specification of Schema.org, most of which describes properties on these classes. For each property, Schema.org will tell us its domain (what classes have this property) and range (what types its values can be). For example, the name property specification shows that it is available on the class Thing, and has type Text. One might represent this knowledge as follows:

interface ThingBase {
  "name": Text;
}

Linked Data, it turns out, is a bit richer than that, allowing us to express situations where a property has multiple values. In JSON-LD, this is represented by an array as the value of the property. Therefore:

interface ThingBase {
  "name": Text | Text[];
}

Multiple Property Types

Often, however, the range of a particular property is any one of a number of types. For example, the property image on Thing can be an ImageObject or URL. Note, also, that nothing in JSON-LD necessitates that all potential values of image have the same type.

In other words, if we want to represent image on ThingBase, we have:

interface ThingBase {
  "name": Text | Text[];
  "image": ImageObject | URL | (ImageObject | URL)[];
}

Properties are Optional

In JSON-LD, all properties are optional. In practice, Schema.org cares about "@type" being defined for all classes, but does not otherwise define any other properties as being required. This is sometimes complicated by the fact that specific search engines require some set of properties on a class.

interface ThingBase {
  "name"?: Text | Text[];
  "image"?: ImageObject | URL | (ImageObject | URL)[];
}
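The “one value or an array of values” pattern repeats for every property, so it can be factored into a helper type. The sketch below is one way to do that. OneOrMany, asArray, ThingProps, TextValue, and ImageObjectLike are all hypothetical names for this illustration (TextValue stands in for Text to avoid clashing with the DOM’s built-in Text type).

```typescript
// Hypothetical helper: a property holds one value or an array of them.
type OneOrMany<T> = T | T[];

// Stand-in for Schema.org's Text (an assumption for this sketch).
type TextValue = string;

// Simplified stand-in for an ImageObject node.
interface ImageObjectLike {
  "@type": "ImageObject";
  contentUrl?: TextValue;
}

interface ThingProps {
  name?: OneOrMany<TextValue>;
  image?: OneOrMany<ImageObjectLike | TextValue>;
}

// Normalizes a property to an array so consumers handle a single shape.
function asArray<T>(value: T | T[] | undefined): T[] {
  if (value === undefined) return [];
  return Array.isArray(value) ? value : [value as T];
}

const thing: ThingProps = {
  name: "Ada Lovelace",
  image: ["https://example.com/ada.png", { "@type": "ImageObject" }]
};

const names = asArray(thing.name); // One element.
const images = asArray(thing.image); // Two elements.
```

A consumer reading JSON-LD can then loop over asArray(thing.name) without caring which of the two forms the author used.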

Properties Can Supersede Others in the Vocabulary

As Schema.org matures, its vocabulary changes. Not all of these changes will be additive (adding a new type, or a new type on an existing property). Some will involve adding a new type or property intended to replace another.

For example, area was a property on BroadcastService describing a Place the service applies to. Turns out, a lot of other businesses also apply to a specific area. serviceArea replaced area, and instead of applying to BroadcastService, it applied to its parent, Service. In addition, serviceArea can also apply to Organization and ContactPoint (something area never did). In addition to being just a Place, serviceArea can be an AdministrativeArea or an arbitrary GeoShape.

Later on, serviceArea was replaced by areaServed, which also included a freeform Text as a possible value, and applied to a few more objects.

When a property replaces another, it supersedes it (inversely, the other property is superseded by the new one). These changes keep existing Schema.org JSON-LD backwards-compatible. A property p2 superseding p1 will generally imply:

  1. p2 is available on all types p1 was available on (p2’s domain is strictly wider). This includes (a) additional types in the domain, or (b) the domain changing to a parent class, for example.
  2. p2 includes all possible types of p1 (p2’s range is strictly wider).

Typically, new data will be written with p2, but the intention is that any old data written using p1 continues to be valid.

In TypeScript, we can use the @deprecated JSDoc annotation to recommend using a new property instead. We can go further and simply skip all deprecated properties (properties that are superseded by one or more properties) if we wanted to.

The story of area, serviceArea, and areaServed can be partially summarized as follows:

interface BroadcastServiceBase extends OrganizationBase {
  /** @deprecated Superseded by serviceArea */
  "area"?: Place | Place[];
}

interface OrganizationBase {
  /** @deprecated Superseded by areaServed */
  "serviceArea"?: AdministrativeArea | GeoShape | Place |
                  (AdministrativeArea | GeoShape | Place)[];

  "areaServed"?: AdministrativeArea | GeoShape | Place | Text |
                 (AdministrativeArea | GeoShape | Place | Text)[];
}

Things Fall Apart

Multiple Types

"@type" is just another property (albeit one with special meaning).

JSON-LD permits a node to have multiple "@type"s as well, and search engines are happy to accept multiple types (at least for some nodes). In practice, a node having two types means that it can have properties on both types. For example, this is valid:

{
  "@type": ["Organization", "Person"],
  "birthDate": "1980-01-01",
  "foundingDate": "2000-01-01"
}

In TypeScript, discriminating a union on an array seems to be hard, and it becomes a bit clunky to define. For now, our TypeScript definitions will not allow multiple @type values.

Sub-Properties

Schema.org takes advantage of the RDF concept of a sub-property:

If a property P is a subproperty of property P’, then all pairs of resources which are related by P are also related by P’

RDF Schema 1.1

Simply put, a sub-property is a more specific version of a property.

For example, image exists on Thing, but has two sub-properties: logo, which exists on Brand, Organization, and a few other types, and photo, which exists on a Place.

One thing I expected was that you wouldn’t be able to specify a super-property on a node whose type has the sub-property available. That is, if I’m describing a Brand, its logo would sufficiently describe image, leaving no reason to specify image.

That’s not quite true, though: a sub-property implying a property still leaves room for the property itself to be available (an Organization can have multiple images, one of which is its logo).

And while that should be true (by the RDF specification), it turns out even that isn’t true in Schema.org. Some sub-properties have more general types than their super-properties; e.g., photo can be a Photograph, but its super-property, image, cannot.

So here, we simply punt.

Special Cases

Reading Schema.org documentation, you might expect, as I did, that there are two distinct hierarchies of data: Thing (aka classes/node types) and DataType (aka values/scalars/primitives). That’s definitely not true in JSON-LD in general, where many values are untyped to begin with, specified using an "@id" reference, or a string. Schema.org implies it imposes a tighter requirement, and describes these hierarchies disjointly, but that turns out not to be true.

It turns out that some types, like Distance, are in the Thing hierarchy but expect string values (in the case of Distance, those would take the form "5 in" or "2.3 cm", etc.).

We might consider having our typings include string (or Text?) for all of our classes. To encourage semantically specifying properties, however, I decided to only allow string on a subset of our nodes.

type Distance = DistanceLeaf | string;
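Here is a small sketch of what that union allows in practice. DistanceLeafSketch is a simplified, hypothetical stand-in for the generated node type, and describeDistance is an illustrative helper.

```typescript
// Hypothetical, simplified node form of Distance.
interface DistanceLeafSketch {
  "@type": "Distance";
  name?: string;
}

// Either the node form or the plain string form is accepted.
type DistanceSketch = DistanceLeafSketch | string;

const asString: DistanceSketch = "2.3 cm";
const asNode: DistanceSketch = { "@type": "Distance", name: "2.3 cm" };

// Consumers have to handle both forms.
function describeDistance(distance: DistanceSketch): string {
  if (typeof distance === "string") return distance;
  return distance.name ?? "(unnamed distance)";
}
```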

Conclusion

Schema.org is a vocabulary designed in an inherently human way. This sometimes results in thoughtful design. Yet, just as often, it means that the semantics have evolved in a way that is inconsistent. The result is often dissatisfying: relations that are defined but don’t hold in practice, objects that are described with textual comments but have no formal relations specifying them, distances that are described as nodes, and many others. These inconsistencies often lead to hacks when trying to represent the vocabulary in TypeScript.

Yet, it’s important not to lose track of why we’re modeling Schema.org in TypeScript to begin with. The lack of tooling around Schema.org (specifically in IDEs, when writing out a specific piece of data) is precisely the need we’re filling. But ultimately, adding structure to an ontology that is largely decided by a loose set of guidelines will be lossy.

The question remains: is the trade-off worth it?

For my purposes, schema-dts has helped me tremendously over the past several months.

Schema.org DataType in TypeScript: Structural Typing Doesn’t Cut It

Schema.org has a concept of a DataType, things like Text, Number, Date, etc. In JSON-LD, we represent these as strings or numbers, rather than array or object literals. This data could describe the name of a Person, a check-in date and time for a LodgingReservation, a URL of a Corporation, publication date of an Article, etc. As we’ll see, the Schema.org DataType hierarchy is far richer than TypeScript’s type system can accommodate. In this article, we’ll go over the DataType hierarchy and explore how much type checking we can provide.


We saw in the first installment how TypeScript’s type system makes expressing JSON-LD describing Schema.org class structure very elegant. The story got slightly more clouded when we introduced Schema.org Enumerations.

Schema.org Data Types

Let’s take a look at the full DataType tree according to Schema.org:

Booleans look quite similar to enums, with http://schema.org/True and http://schema.org/False as their two possible IRI values (depending on @context, those can of course be represented as relative IRIs instead), or their HTTPS equivalents.

Number and its descendants are just JSON / JavaScript numbers. Float indicates the JSON number will have floating point precision, whereas Integer tells us to expect a whole number. On its own, JavaScript does not distinguish floats and integers as separate types, and neither does TypeScript. While TypeScript supports the idea of literal types, specifying a type as all possible integers or all possible floating point numbers isn’t expressible.
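A short sketch of what that limitation looks like in practice: the aliases below (illustrative names, not taken from any library) are interchangeable as far as the type checker is concerned, so the distinction can only be enforced at runtime.

```typescript
// Both aliases are just `number` to the type checker.
type IntegerValue = number;
type FloatValue = number;

// Compiles without complaint: the alias adds no checking.
const notReallyAnInteger: IntegerValue = 2.5;
const alsoFine: FloatValue = notReallyAnInteger;

// A runtime check is the only place the distinction can be enforced.
function isInteger(value: number): value is IntegerValue {
  return Number.isInteger(value);
}
```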

Continue reading “Schema.org DataType in TypeScript: Structural Typing Doesn’t Cut It”

Schema.org Enumerations in TypeScript

Last time, we talked about modeling the Schema.org class hierarchy in TypeScript. We ended up with an elegant, recursive solution that treats any type Thing as a "@type"-discriminated union of ThingLeaf and all the direct sub-classes of the type. The next challenge in the journey of building TypeScript typings for the Schema.org vocabulary is modeling Enumerations.
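As a reminder of the general shape, here is a simplified sketch of a "@type"-discriminated union. The names are illustrative, with only two sub-classes and a handful of properties; it is not a copy of the actual generated typings.

```typescript
// Properties shared by everything in the hierarchy (simplified).
interface ThingBaseSketch {
  name?: string;
}

// Each "leaf" pins "@type" to one specific string literal.
interface ThingLeafSketch extends ThingBaseSketch {
  "@type": "Thing";
}
interface PersonLeafSketch extends ThingBaseSketch {
  "@type": "Person";
  birthDate?: string;
}
interface OrganizationLeafSketch extends ThingBaseSketch {
  "@type": "Organization";
  logo?: string;
}

// A type is its own leaf plus all of its direct sub-classes.
type ThingUnion = ThingLeafSketch | PersonLeafSketch | OrganizationLeafSketch;

// Checking "@type" narrows the union to the matching leaf.
function summarize(thing: ThingUnion): string {
  if (thing["@type"] === "Person") {
    return `person born ${thing.birthDate ?? "unknown"}`;
  }
  if (thing["@type"] === "Organization") {
    return `organization with logo ${thing.logo ?? "none"}`;
  }
  return `thing named ${thing.name ?? "unnamed"}`;
}
```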

Learning from Examples

Let’s look at a few examples from the Schema.org website to get a better sense of what Enumerations look like.

First up, I looked at PaymentStatusType, which can take any one of these values: PaymentAutomaticallyApplied, PaymentComplete, PaymentDeclined, PaymentDue, or PaymentPastDue. PaymentStatusType is used in the paymentStatus property on the Invoice class.

Here’s an excerpt from an example of an invoice:

{
    "@context": "http://schema.org/",
    "@type": "Invoice",
    // ...
    "paymentStatus": "http://schema.org/PaymentComplete",
    "referencesOrder": [
      // ...
    ]
}

Here, the value of an Enumeration appears as an absolute IRI.
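A first, naive way to type this form is a union of string literals, one per absolute IRI. The sketch below assumes the http://schema.org/ prefix seen in the example; PaymentStatusIri, InvoiceSketch, and isPaymentStatusIri are hypothetical names for this illustration.

```typescript
// Hypothetical typing: each member is the absolute IRI of an enum value.
type PaymentStatusIri =
  | "http://schema.org/PaymentAutomaticallyApplied"
  | "http://schema.org/PaymentComplete"
  | "http://schema.org/PaymentDeclined"
  | "http://schema.org/PaymentDue"
  | "http://schema.org/PaymentPastDue";

// Simplified invoice with only the fields relevant here.
interface InvoiceSketch {
  "@context": "http://schema.org/";
  "@type": "Invoice";
  paymentStatus?: PaymentStatusIri;
}

const invoice: InvoiceSketch = {
  "@context": "http://schema.org/",
  "@type": "Invoice",
  paymentStatus: "http://schema.org/PaymentComplete"
};

// Runtime counterpart, for values that arrive as plain strings.
const PAYMENT_STATUS_IRIS: readonly string[] = [
  "http://schema.org/PaymentAutomaticallyApplied",
  "http://schema.org/PaymentComplete",
  "http://schema.org/PaymentDeclined",
  "http://schema.org/PaymentDue",
  "http://schema.org/PaymentPastDue"
];

function isPaymentStatusIri(value: string): value is PaymentStatusIri {
  return PAYMENT_STATUS_IRIS.indexOf(value) !== -1;
}
```

With this typing, a misspelled status such as "http://schema.org/PaymentCompleted" is rejected at compile time.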

Other examples, however, such as GamePlayMode, which appears in playMode on VideoGame, show up differently:

Continue reading “Schema.org Enumerations in TypeScript”

Modeling Schema.org Schema with TypeScript: The Power and Limitations of the TypeScript Type System

Recently, I published schema-dts (npm, GitHub), an open source library that models JSON-LD Schema.org in TypeScript. A big reason I wanted to do this project is that I knew some TypeScript type system features, such as discriminated type unions, powerful type inference, nullability checking, and type intersections, present an opportunity to both model what Schema.org-conformant JSON-LD looks like and provide ergonomic completions to the developer.

In a series of posts, I’ll go over some of the Structured Data concepts that lent themselves well to TypeScript’s type system, and those concepts that didn’t. First up: the type hierarchy of JSON-LD Schema.org Schema, and how it can be represented in TypeScript.

Note: I’ll be describing JSON-LD in general in very broad strokes and will spend more time discussing what Schema.org JSON-LD looks like in particular. For those who are familiar with the JSON-LD spec, you’ll see I took a few liberties. This is because schema-dts makes a few assumptions, such as the @context being a known constant, etc. schema-dts also forgoes some features, such as specifying multiple types of a node object, etc.

Modeling the Schema.org class structure with the TypeScript Type System

Schema.org JSON-LD node objects are always typed (that is, they have a @type property that points to some IRI, a string, describing it). Given a @type, you know all the properties that are defined on a particular object. Object types inherit from each other. For example, Thing in Schema.org has a property called name, and Person is a subclass of Thing that defines additional properties such as birthDate, and inherits all the properties of Thing such as name. Thing has other sub-classes, like Organization, with its own properties, like logo.

Let’s use this minimal example to try a few approaches:

1. Modeling each with inheritance

interface Thing {
  "name": string;
}
interface Person extends Thing {
  "@type": "Person";
  "birthDate": string;
}
interface Organization extends
    Thing {
  "@type": "Organization";
  "logo": string;
}

If we had a const something: Thing, then we could assign a Thing, Person, or Organization to it. So that’s a start! But there are a few problems:

Continue reading “Modeling Schema.org Schema with TypeScript: The Power and Limitations of the TypeScript Type System”