On-Demand Shared GraphQL Subscriptions with RxJS

Ghislain Thau

GraphQL has native support for subscriptions as schema operations. GraphQL subscriptions allow to receive continuous updates, usually implemented using a persistent WebSocket connection or Server-Sent Events.

Ideally, we want subscriptions to be stateless and independent for every subscriber: when a subscriber subscribes to an entity, this subscription is independent of any other subscription and can be handled by any instance of our GraphQL server. This is the case when the entity subscribed to is “simple” and processing updates is trivial: for example, we receive events from a pubsub provider/message queue (e.g. Redis, Nats.io, etc.) in the form { type: update, entityId: xyz }. In this case, our subscription handler can simply subscribe to these events, filter on the requested entityId, and forward those events (maybe a bit processed) to the subscriber.

However, this ideal scenario is not always possible. We might have “complex” entities we want to subscribe to, e.g. views that are aggregates of data over several data sources. The view might be expensive to fetch and subscribing to the view’s content updates might involve subscribing to multiple events and performing expensive processing and/or additional fetching in order to compute the updates and send them to the subscriber.

In this case, if multiple subscribers subscribe to the same view, the events’ processing, additional fetching, and computing would have to be done for each subscriber, which will lead to performance issues as the number of subscribers increases.

In this article, we’ll show how we can improve this use case by sharing the subscriptions’ sources and also perform their side-effect processing only when there are subscribers.

The solution uses RxJS, a library for Reactive Functional Programming, it is based on the Observer pattern and provides a wide range of functions to deal with streams of events in a declarative way.

Mutualizing the Subscription’s Work

If the subscription is independent of the subscriber (e.g. the logged-in user) and only dependent upon the view, then we can mutualize the events’ listening, buffering, processing, additional fetching, and computing required to update the view. This work can be done once and the result delivered to each subscriber.

A Solution

A Simple Example to Illustrate the Solution

In the following lines, we’ll use a very simplified example in order to present the solution: a GraphQL subscription that produces sequential integers. Then we’ll apply the solution to a more complex real-life example.

The solution to the simple example is implemented in this repository.

Using RxJS Multicasting Capabilities

By default observables are lazy and cold, meaning their side-effect are run only when a subscriber subscribes to the observable. Furthermore, for every subscriber, the observable pipeline is executed.

In the following example, the http query will be executed twice: query$ is a cold observable, no side-effect happens until a subscriber subscribes to it. When a second subscriber subscribes, the observable side-effect is re-executed.

const query$ = httpClient.get('/path/to/some/resource')
query$.subscribe(console.log)
query$.subscribe(console.log)

This is in opposition to Promises which are eager and hot: in this example the http query is executed only once.

const queryPromise = fetch('/path/to/some/resource')
queryPromise.then(console.log)
queryPromise.then(console.log)

However, RxJS has multicasting capabilities to turn cold observables into hot observables. Using RxJS’s multicasting operators, we can create observable pipelines which are shared: their side-effect is triggered when the first subscriber subscribes to it and when more than one subscriber subscribes, the pipeline is executed only once and the emitted value is passed to all subscribers.

Let’s see an example:

import { interval } from 'rxjs'
 
const obs$ = interval(1000) // observable emitting 0, 1, 2, ... every second
obs$.subscribe(console.log) // prints 0, 1, 2, ...
 
setTimeout(() => obs$.subscribe(console.log), 3000) // after 3 seconds, prints 0, 1, 2, ...

However, using the share operator, the observable pipeline is executed once and the second subscriber (even subscribing later), gets the same (shared) result as the first subscriber.

const obs$ = interval(1000).pipe(share()) // observable emitting 0, 1, 2, ... every second
obs$.subscribe(console.log) // prints 0, 1, 2, ...
 
setTimeout(() => obs$.subscribe(console.log), 3000) // after 3 seconds, prints 3, 4, 5 ...

Sharing the Observables

In the simple example above, we have multiple subscribers subscribing to a shared observable, in the same context of execution. However, in GraphQL subscriptions, the 2nd (or nth) subscriber will have its own context of execution. If we want to reuse a shared observable, we have to store it.

Let’s create a module to manage the subscriptions’ shared observables:

// util function to get or create a publisher
// takes a "cache" of publisher and a factory function and returns a higher-order function to get or create from this cache
export const getOrCreatePublisher = (publishersMap, makePublisherFn) => entityId => {
  if (publishersMap.has(entityId)) {
    return publishersMap.get(entityId)!
  }
 
  const newPublisher = makePublisherFn(entityId)
  publishersMap.set(entityId, newPublisher)
  return newPublisher
}
// assuming that the entityId is a number, and the shared observable emits numbers
const publishersMap = new Map<number, Observable<number>>()
 
const makePublisher = (entityId: number): Observable<number> => {
  return interval(1000).pipe(share())
}
 
export const getPublisher = getOrCreatePublisher(publishersMap, makePublisher)

Now from our GraphQL’s subscription resolver, we can use it like the following:

import { createSchema } from 'graphql-yoga'
import { eachValueFrom } from 'rxjs-for-await'
 
export const schema = createSchema({
  typeDefs: /* GraphQL */ `
    type Subscription {
      giveMeInts: Int!
    }
  `,
  resolvers: {
    Subscription: {
      giveMeInts: {
        subscribe(root, args, context) {
          return eachValueFrom(getPublisher(args.entityId))
        },
        resolve(root, args, context) {
          return root
        }
      }
    }
  }
})

With this code, when a subscriber subscribes, it will create the shared observable (publisher), store it in a cache so that it can be reused by subsequent subscribers, and subscribe to it. When a second subscriber subscribes, it will get the existing publisher from the cache and subscribe to it, and it will get the same value as the first subscriber gets. When all subscribers have unsubscribed, the source observable is unsubscribed. If later, a new subscriber subscribes, the source observable will be resubscribed and its pipeline reexecuted (in our case, it will start emitting from 0 again).

The graphql-js engine expects the subscribe handler to return an AsyncIterable, so we use the rxjs-for-await package from Ben Lesh which contains different functions from transforming an Observable into an AsyncIterable (there are different functions to deal with backpressure and lossiness/losslessness).

So far, so good. But we have a memory leak: after all subscribers have unsubscribed, we keep the publishers in the cache. We can clean those up using the finalize operator: this operator allows running a callback on observable termination (either the observable is complete or has no more subscribers).

Let’s modify our publisher factory function:

import { interval } from 'rxjs'
import { finalize, share } from 'rxjs/operators'
 
const makePublisher = (entityId: number): Observable<number> => {
  return interval(1000).pipe(
    // clean up publisher cache when no more subscribers to this publisher
    finalize(() => publishersMap.delete(entityId)),
    share()
  )
}

Delay the Shared Observable Cleanup

Because the view might be expensive to fetch and update, we only do so when users are subscribing to it. In the previous example, as soon as all users have unsubscribed, the shared observable is unsubscribed and deleted from the cache. This might not be the desired behavior, e.g.:

  • we can have one user subscribing to the view, this user refreshes his browser’s page, effectively ending the subscription, and triggering a cache cleanup, but then the view is resubscribed almost immediately, triggering a new fresh fetch
  • we can have one user stopping the subscription (e.g. navigating away from this view to another part of our app), but another user subscribing soon (a few milliseconds, seconds or minutes) after

To optimize for those use cases, we can delay the unsubscription of the shared observable when no more subscribers are subscribing to it, using the resetOnRefCountZero config of the share operator. This is available in RxJS v7.

const makePublisher = (entityId: number): Observable<number> => {
  return interval(1000).pipe(
    // clean up publisher cache when no more subscribers to this publisher
    finalize(() => publishersMap.delete(entityId)),
    share({
      // when no more subscribers, wait 10 seconds to unsubscribe from the source and reset the underlying subject
      resetOnRefCountZero: () => timer(10_000)
    })
  )
}

In the example above, after all subscribers have unsubscribed (refcount is 0) the connector (the subject that subscribes to the source observable) now waits for 10 seconds (10000 ms) before it resets the shared observable. During this period of time, the observable pipeline continues to execute. If a new subscriber subscribes before the timer has emitted, the refcount is again greater than 0 and the connector will keep connecting to the source observable.

⚠️

We use timer in this example but we can use any observable (e.g. receiving a specific event from our message queue) that emits at some point. Be careful though to make sure that the observable you use will actually emit otherwise you’ll have a memory leak. To remedy this issue, you might want to add safety, e.g. using raceWith with timer to make sure you don’t wait indefinitely:

share({
  // when no more subscribers, wait for 'my_event' up to 1 minute then unsubscribe from the source and reset the underlying subject
  resetOnRefCountZero: () => pubsub.events('my_event').pipe(
    raceWith(timer(60_000)),
  ),
}),

Reuse Shared Observable Sources across Multiple Subscriptions

With this solution, we can even go further: if a subscription depends on the processing of common events or on the source of another subscription, we can reuse the shared observable created for those.

Let’s make a new subscription that returns a sentence with the current number emitted by the giveMeInts subscription’s source:

export const schema = createSchema({
  typeDefs: /* GraphQL */ `
    type Subscription {
      giveMeInts: Int!
      giveMeStringfiedInts: String!
    }
  `,
  resolvers: {
    Subscription: {
      /* giveMeInts subscription's resolver unchanged */
      giveMeStringfiedInts: {
        subscribe(root, args, context) {
          return eachValueFrom(getStringifiedPublisher(args.entityId))
        },
        resolve(root, args, context) {
          return root
        }
      }
    }
  }
})

And the new publisher reusing the other subscription’s publisher:

import { map } from 'rxjs/operators'
 
// create a new cache for those publishers
const stringifiedPublishersMap = new Map<number, Observable<string>>()
 
export const getStringifiedPublisher = getOrCreatePublisher(
  stringifiedPublishersMap,
  makeStringifiedPublisher
)
 
function makeStringifiedPublisher(entityId: number): Observable<string> {
  // this new publisher gets or creates the source publisher from the other subscription
  return getPublisher(id).pipe(
    map(val => `The current number is ${val}`),
    // clean up publisher cache when no more subscribers to this publisher
    finalize(() => stringifiedPublishersMap.delete(entityId)),
    share({
      // when no more subscribers, wait 10 seconds to unsubscribe from the source and reset the underlying subject
      resetOnRefCountZero: () => timer(10_000)
    })
  )
}

Now, whether a subscription to giveMeStringfiedInts or giveMeInts starts first, both will be shared and in sync. This further mutualizes the processing of expensive computations. In the simple example showcased above, the advantage might be small, but in a real-world scenario, it could make a big difference: imagine one subscription sending the full structure of a expensive-to-compute view while a second subscription sends only a diff of the current structure with the previous one, the second subscription’s source depends on the source of the first. Mutualizing the view’s computations makes a difference regardless of which subscription has been triggered first.

Run Server-Side Logic When a User’s Subscription Is Closed

In GraphQL we implement subscriptions by writing two functions:

  • a subscribe function which is run one time only and must return an AsyncIterable
  • a resolve function which is run every time a value is emitted by the AsyncIterable returned by the subscribe function and transforms this value into the Subscription output type.

In order to execute logic, we have 2 places: in subscribe when the subscription starts and in resolve on each emitted value. What if we want to execute logic when the subscription ends (whether it was completed or was stopped by the client)?

The answer is: it is not contemplated in the spec. But it doesn’t mean we can’t do it. In fact, it is even very simple! If we use the solution described above, we already have all the pieces in place. We only need to add one line: in the subscribe function where we set up the AsyncIterable (by transforming the Observable into an AsyncIterable), we only have to use RxJS’s finalize operator.

import { createSchema } from 'graphql-yoga'
import { eachValueFrom } from 'rxjs-for-await'
import { finalize } from 'rxjs/operators'
 
export const schema = createSchema({
  typeDefs: /* GraphQL */ `
    type Subscription {
      giveMeInts: Int!
    }
  `,
  resolvers: {
    Subscription: {
      giveMeInts: {
        subscribe(root, args, context) {
          return eachValueFrom(
            getPublisher(args.entityId).pipe(
              finalize(() => {
                // run logic here when subscription ends
                console.log(`Subscription giveMeInts has ended for user ${context.userId}`)
              })
            )
          )
        },
        resolve(root, args, context) {
          return root
        }
      }
    }
  }
})

In the previous examples, we used the finalize operator on the shared observable pipeline in order to clean up after all subscriptions to a specific entity had ended. But we can also use it per subscription by piping it onto the shared observable: this way it is only run for when this specific subscription ends. And of course, like we just did, you can combine both!

Apply to a Real-Life Example

Let’s consider a view such as a complex “playlist”: this playlist is a list of items organized in blocks, the items themselves are a complex aggregate of several multimedia (video, audio, graphics) objects, those multimedia objects themselves might be aggregate (bundles) or have of some children objects (e.g. sub-clips, graphics on a video, subtitles). In effect, this complex playlist is organized in the form of a tree of objects.

The system emits different events:

  • some are related directly to the playlist itself (e.g. new item in the playlist, item removed from the playlist, item changed position inside playlist)
  • others are related to the parent multimedia objects (object deleted, object’s metadata changed, object has new children objects)
  • others are related to the children objects of the top level objects (object deleted, object’s metadata changed)

This playlist’s data is expensive to fetch, can be large, and is cached only when users subscribe to it: because it is expensive to compute updates, we want to ensure we don’t process events for all playlists in the system, only the ones some user is currently interested in.

Let’s reuse the techniques explained above.

Buffer Events

First, we’ll buffer the events, dedupe them and share the observable because those events will be checked against every playlist subscribed to, so we can share the buffering and deduplication:

// utils
export const isNotEmpty = <T>({ length }: ArrayLike<T>) => length > 0
import { from } from 'rxjs'
import { buffer, filter, share } from 'rxjs/operators'
 
const playlistEventNames = ['playlist_event_1', 'playlist_event_2', 'playlist_event_n'] as const
const objectsEventNames = ['objects_event_1', 'objects_event_2', 'objects_event_n'] as const
 
function bufferEvents(eventNames, bufferTime = 500) {
  return from(pubsub.events(eventNames)).pipe(buffer(bufferTime), filter(isNotEmpty), share())
}
 
const allPlaylistEvents$ = bufferEvents(playlistEventNames)
const allObjectsEvents$ = bufferEvents(objectsEventNames)

Create Shared Publishers for the Playlists’ Structure Changes

In this example, we’ll assume that the datasource is the source of truth, we’ll listen to events to detect potential changes in the playlist and we’ll force refetch (invalidate cache and fetch latest version) from the datasource (assuming calls are deduped using a Dataloader backed by a cache).

import { merge } from 'rxjs'
import { bufferTime, filter, finalize, map, share, switchMap } from 'rxjs/operators'
 
const playlistStructureMap = new Map<number, Observable<PlaylistStructure>>()
const playlistStructureDiffMap = new Map<number, Observable<PlaylistStructureDiff>>()
 
export const getPlaylistStructurePublisher = getOrCreatePublisher(
  playlistStructureMap,
  makePlaylistStructurePublisher
)
 
export const getPlaylistStructureDiffPublisher = getOrCreatePublisher(
  playlistStructureDiffMap,
  makePlaylistStructureDiffPublisher
)
 
function makePlaylistStructurePublisher(playlistId: number): Observable<PlaylistStructure> {
  const playlistEvents$ = allPlaylistEvents$.pipe(
    map(events => events.filter(ev => ev.playlistId === playlistId)),
    filter(isNotEmpty)
  )
 
  const playlistObjectsEvents$ = allObjectsEvents.pipe(
    // this function filter the events that can have an impact on the structure of the playlist
    // e.g. "object has a new child object"
    // and discard other events that have no impact on the structure
    // e.g. "object name has changed"
    map(events => filterObjectEventsThatAffectPlaylistStructure(events)),
    // only keeps events where the objectId refers to an object that belongs to this playlist's structure
    map(events => filterObjectsThatBelongToPlaylist(events, playlistId)),
    filter(isNotEmpty)
  )
 
  return merge(playlistEvents$, playlistObjectsEvents$).pipe(
    // buffer because both source observables might emit shortly one after another
    bufferTime(500),
    filter(isNotEmpty),
    // fetchPlaylistStructure is a function backed by a dataloader with cache
    // passing `true` as 2nd param to force fetch from datasource instead of cache
    switchMap(_ => fetchPlaylistStructure(playlistId, true)),
    // remove publisher from cache after it's been unsubscribed
    finalize(() => {
      console.debug(
        `Shared playlistStructurePublisher[playlist=${playlistId}] has finalized, removing it from cache`
      )
      playlistStructureMap.delete(playlistId)
    }),
    share({
      // keep subscribing for 1 minute after last subscriber unsubscribes
      resetOnRefCountZero: () => timer(60_000)
    })
  )
}
 
function makePlaylistStructureDiffPublisher(playlistId: number): Observable<PlaylistStructureDiff> {
  // reuse the playlist shared observable
  return getPlaylistStructurePublisher(playlistId).pipe(
    startWith([]), // pairwise needs 2 values to compare, initialize observable with a default value
    pairwise(), // groups emission into a tuple [previousValue, currentValue]
    map(([previousPlaylist, currentPlaylist]) => makeDiff(previousPlaylist, currentPlaylist)),
    filter(isNotEmpty),
    // remove publisher from cache after it's been unsubscribed
    finalize(() => {
      console.debug(
        `Shared playlistStructureDiffPublisher[playlist=${playlistId}] has finalized, removing it from cache`
      )
      playlistStructureDiffMap.delete(playlistId)
    }),
    share({
      // keep subscribing for 1 minute after last subscriber unsubscribes
      resetOnRefCountZero: () => timer(60_000)
    })
  )
}

Writing the Subscription’s Resolver

Now that we have all the subscription’s required logic encapsulated and data processing shared, let’s use it from the subscription’s resolver:

import { createSchema } from 'graphql-yoga'
import { eachValueFrom } from 'rxjs-for-await'
 
export const schema = createSchema({
  typeDefs: /* GraphQL */ `
    type Subscription {
      playlistStructureUpdates(playlistId: Int): PlaylistStructure!
      playlistStructureDiffs(playlistId: Int): PlaylistStructureDiff!
    }
  `,
  resolvers: {
    Subscription: {
      playlistStructureUpdates: {
        subscribe(root, args, context) {
          return eachValueFrom(getPlaylistStructurePublisher(args.playlistId))
        },
        resolve(root, args, context) {
          return root
        }
      },
      playlistStructureDiffs: {
        subscribe(root, args, context) {
          return eachValueFrom(getPlaylistStructureDiffPublisher(args.playlistId))
        },
        resolve(root, args, context) {
          return root
        }
      }
    }
  }
})

Conclusion

In this article, we have seen how to use RxJS (or the reactive programming library of your choice) in order to mutualize the generation of expensive-to-produce values for a GraphQL subscription but also to trigger this processing on-demand whenever a consumer is interested in subscribing to a specific entity updates and stop it (with or without delay) when there are no more subscribers. Note that this technique is not exclusive to GraphQL subscriptions and can be used for any kind of subscription.

Join our newsletter

Want to hear from us when there's something new? Sign up and stay up to date!

By subscribing, you agree with Beehiiv’s Terms of Service and Privacy Policy.

Recent issues of our newsletter

Similar articles