Subscriptions and Live Queries - Real Time with GraphQL - The Guild Blog

Subscriptions are the go-to solution for adding real-time capabilities to a GraphQL-powered application. At the same time, the term GraphQL Live Query floats around and is often found in the context of subscriptions.

While GraphQL Subscriptions have been part of the GraphQL specification for some time, GraphQL Live Queries are not part of the specification, and there is currently no RFC in progress.

However, discussions about GraphQL Live Queries started way back when GraphQL Subscriptions were designed.

So let's recap GraphQL Subscriptions, take a look at the Live Query solutions that exist today, and compare the differences between the two approaches to real-time.

Subscription Recap

The GraphQL Subscription RFC was merged back in March 2017. The first major and widely adopted transport implementation was (and probably still is) subscriptions-transport-ws. It was developed by Apollo, but unfortunately they seem to have abandoned it since then. Fortunately, we now have a successor: graphql-ws.

A subscription operation looks similar to this.

subscription onPostAddedSubscription {
  onPostAdded {
    post {
      id
      author {
        id
        login
      }
    }
  }
}

In contrast to a GraphQL query operation, a subscription operation is only allowed to select a single field on the GraphQL Subscription root type.

Furthermore, executing a subscription operation represents a stream of results, whereas executing a query operation yields only a single result.

Let's take a quick look at the graphql-js reference implementation!

Since promises cannot represent multiple values over time, the graphql-js reference implementation uses AsyncIterator, a native structure similar to Observables (which one might already know from digging a bit deeper into the most widely adopted GraphQL clients).

Each subscription root field must provide a subscribe function that returns an AsyncIterator and optionally has a resolve function for mapping the published events.

When a subscription operation is executed, the subscribe function of that field resolver is called and the AsyncIterator returned by it will be used as the source for the stream of events returned to the client.

Once the AsyncIterator publishes a value (the payload or event), the optional resolve function on the selected subscription root field is called with the value. All subsequently executed resolvers in the resolver/type tree behave like normal query resolvers.

The most basic implementation for a counter would look similar to this:

const sleep = (t = 1000) => new Promise((res) => setTimeout(res, t));

const resolvers = {
  Subscription: {
    countToNumber: {
      subscribe: async function* (_, args) {
        for (let counter = 1; counter <= args.toNumber; counter++) {
          yield { countToNumber: counter };
          await sleep();
        }
      },
    },
  },
};

The above subscription will count up to the number provided via the toNumber argument (while having a delay of one second between each message) and then complete.
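To illustrate the optional `resolve` function mentioned earlier, here is a minimal sketch (not taken from the original example) in which the generator publishes raw numbers and `resolve` maps each published event to the selected subscription root field:

```typescript
const resolvers = {
  Subscription: {
    countToNumber: {
      // publishes the raw counter values as events
      subscribe: async function* (_: unknown, args: { toNumber: number }) {
        for (let counter = 1; counter <= args.toNumber; counter++) {
          yield counter;
        }
      },
      // maps each published event to the shape of the selected root field
      resolve: (eventValue: number) => eventValue,
    },
  },
};
```

In the earlier example the generator already yielded objects of the shape `{ countToNumber: counter }`, so no `resolve` function was needed; here the mapping step is made explicit instead.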

Of course, in a real-world application we would like to subscribe to other event sources instead of some static, pre-defined events.

The most commonly used (but not best maintained) library for such a PubSub engine in the GraphQL context is graphql-subscriptions. There are also adapters available for more distributed systems (where all GraphQL API replicas must be notified about the event), e.g. over Redis.

If there is no need for scaling horizontally, the graphql-subscriptions package can be omitted and replaced with the Node.js native events module:

import { EventEmitter, on } from 'events';

export const createPubSub = <TTopicPayload extends { [key: string]: unknown }>(
  emitter: EventEmitter
) => {
  return {
    publish: <TTopic extends Extract<keyof TTopicPayload, string>>(
      topic: TTopic,
      payload: TTopicPayload[TTopic]
    ) => void emitter.emit(topic as string, payload),
    subscribe: async function* <
      TTopic extends Extract<keyof TTopicPayload, string>
    >(topic: TTopic): AsyncIterableIterator<TTopicPayload[TTopic]> {
      const asyncIterator = on(emitter, topic);
      for await (const [value] of asyncIterator) {
        yield value;
      }
    },
  };
};

const pubSub = createPubSub(new EventEmitter());

A (type-safe) PubSub implementation in 21 lines of code. We will use this for the example below.
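To show how the typed topic map works in practice, here is a small self-contained usage sketch following the same pattern, with a hypothetical `POST_ADDED` topic (names are illustrative, not from the original example):

```typescript
import { EventEmitter, on } from 'events';

// concrete topic map: each key is a topic, each value its payload type
type TopicPayloads = { POST_ADDED: { id: string } };

const emitter = new EventEmitter();

const pubSub = {
  publish: <T extends keyof TopicPayloads>(topic: T, payload: TopicPayloads[T]) =>
    void emitter.emit(topic as string, payload),
  subscribe: async function* <T extends keyof TopicPayloads>(
    topic: T
  ): AsyncIterableIterator<TopicPayloads[T]> {
    for await (const [value] of on(emitter, topic as string)) {
      yield value;
    }
  },
};

// consume a single event, then stop
(async () => {
  for await (const event of pubSub.subscribe('POST_ADDED')) {
    console.log(event.id); // logs "1"
    break;
  }
})();

pubSub.publish('POST_ADDED', { id: '1' });
```

Note that the subscriber must be set up before the event is published; `events.on` attaches its listener as soon as the generator starts iterating.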

In my opinion, the different PubSub implementations should rather be based on EventEmitter instead of graphql-subscriptions. A PubSub can, but doesn't have to, be used together with GraphQL. The name graphql-subscriptions gives the impression that the logic is specific to GraphQL, which discourages contributions from people that need a similar event abstraction outside of GraphQL.

Therefore, I hope the next generation/iteration of Node.js PubSub implementations is less specific.

Having said that, let's take a look at a more "real-world" like example of using subscriptions with PubSub:

const resolvers = {
  Subscription: {
    onPostAdded: {
      subscribe: async function* (_, _args, context) {
        for await (const { id } of context.pubSub.subscribe('POST_ADDED')) {
          const post = await context.postStore.load(id, context.viewer);
          if (post === null) {
            continue;
          }
          yield { onPostAdded: post };
        }
      },
    },
  },
  Mutation: {
    postAdd: (_, args, context) => {
      const post = context.postStore.create(args);
      // we don't want to publish the whole object via our event emitter, the id should be sufficient
      context.pubSub.publish('POST_ADDED', { id: post.id });
      return post;
    },
  },
};

Until now we only took a look at the resolvers. Let's also quickly check the subscribe function exported from graphql-js, which can be used for executing subscriptions.

import { subscribe } from 'graphql';

// graphql-js does not export such a helper, so we define our own check
const isAsyncIterable = (value) =>
  value != null && typeof value[Symbol.asyncIterator] === 'function';

const subscribeResult = await subscribe({
  schema,
  document,
});

if (isAsyncIterable(subscribeResult)) {
  for await (const executionResult of subscribeResult) {
    sendToClient(executionResult);
  }
} else {
  sendToClient(subscribeResult);
}

The subscribe function returns either an AsyncIterable that publishes multiple ExecutionResults, or a single ExecutionResult in case the setup of the subscription fails.

The interesting thing is that we can use any transport for delivering the results to the client. The most popular implementation (as mentioned before) is subscriptions-transport-ws. Unfortunately, since it is poorly maintained, the GraphQL Working Group came up with a new implementation over WebSockets, graphql-ws.

But we are not forced to use WebSockets at all. Server-Sent Events might be a more lightweight solution for both our server and client.

It is actually a shame that express-graphql, the reference HTTP transport implementation, does not come with a built-in subscription solution.

Fortunately, we now have libraries like GraphQL Helix, which, in my humble opinion, should replace express-graphql as the reference HTTP implementation, since GraphQL Helix is also not tied to any web server framework.

For public GraphQL APIs, I am convinced that Server-Sent Events are the future, as there is less work required for implementing the protocol.

I also built my own transport over Socket.io, which uses WebSockets by default and HTTP polling as a fallback.

As you can see, with GraphQL subscriptions, we are free to choose the best transport for our application requirements!


Now that we have taken a look at how GraphQL Subscription resolvers are implemented on the server-side, let's also check out how we can consume the GraphQL API on the client-side!

Usually we will have a network interface that is called by our favorite GraphQL client. Every single client has a different name and implementation. Apollo calls them links, Relay calls them fetcher functions, and urql calls them exchanges.

All of them have one thing in common: they work with observable-like data structures. For consuming subscriptions, all major GraphQL client libraries decided to use Observables (which are still not part of the ECMAScript specification as of October 2020) instead of AsyncIterators.

Like an AsyncIterator, an Observable can represent multiple values. As already mentioned, each client library and transport have slightly different interfaces. I will use graphql-ws with relay-runtime as an example.

The example is taken straight from the graphql-ws documentation.

import {
  Environment,
  Network,
  Observable,
  RecordSource,
  RequestParameters,
  Store,
  Variables,
} from 'relay-runtime';
import { createClient } from 'graphql-ws';

const subscriptionsClient = createClient({
  url: 'wss://i.love/graphql',
  connectionParams: () => {
    const session = getSession();
    if (!session) {
      return {};
    }
    return {
      Authorization: `Bearer ${session.token}`,
    };
  },
});

// yes, both fetch AND subscribe handled in one implementation
function fetchOrSubscribe(operation: RequestParameters, variables: Variables) {
  return Observable.create((sink) => {
    if (!operation.text) {
      return sink.error(new Error('Operation text cannot be empty'));
    }
    return subscriptionsClient.subscribe(
      {
        operationName: operation.name,
        query: operation.text,
        variables,
      },
      {
        ...sink,
        error: (err) => {
          if (err instanceof Error) {
            sink.error(err);
          } else if (err instanceof CloseEvent) {
            sink.error(
              new Error(
                `Socket closed with event ${err.code}` +
                  (err.reason ? `: ${err.reason}` : '') // reason will be available on clean closes
              )
            );
          } else {
            // GraphQLError[]
            sink.error(new Error(err.map(({ message }) => message).join(', ')));
          }
        },
      }
    );
  });
}

const network = Network.create(fetchOrSubscribe, fetchOrSubscribe);
const store = new Store(new RecordSource());
export const environment = new Environment({ network, store });

With this configuration, Relay can now execute subscription operations. Because the graphql-ws protocol is way more complex than the GraphQL over HTTP protocol, we use the client exported from the graphql-ws package instead, which adds some bundle size. As mentioned before, SSE might be a better, lightweight alternative.

That aside, let's start with a basic subscription that should update one of our components.

Our PostRender already shows some content.

const PostQuery = graphql`
  query PostQuery($postId: ID!) {
    post(id: $postId) {
      id
      title
      content {
        text
      }
      totalLikeCount
    }
  }
`;

const PostRenderer = ({ postId }: { postId: string }) => {
  const { props } = useQuery(PostQuery, /* variables */ { postId });

  return <Post {...props} />;
};

As a new feature requirement, the like count of the post should get updated once someone hits the like button.

We could choose different ways of implementing such a subscription.

  1. General Subscription for changed post
subscription PostChanged($postId: ID!) {
  onPostChanged(postId: $postId) {
    post {
      id
      totalLikeCount
    }
  }
}
  2. Specific Subscription
subscription PostLikeCountChanged($postId: ID!) {
  onPostLikeCountChanged(postId: $postId) {
    totalLikeCount
  }
}

Both solutions have different implications.

  1. General Subscription for changed post

This approach is not limited to notifying whether the totalLikeCount of the post has changed; in the future we could adjust the selection set on the post field, as it also returns a Post type, similar to our already existing Query.post field. It will automatically update the post record already in the cache, as the Relay store (similar to other clients) can identify the post object via the id field. The drawback is that we could potentially send too much data over the wire. E.g. if we also wanted to subscribe to title changes, all additionally selected fields are sent to the client each time the underlying event is emitted, even if only the totalLikeCount value has changed.

const PostChangedSubscription = graphql`
  subscription PostChanged($postId: ID!) {
    onPostChanged(postId: $postId) {
      post {
        id
        totalLikeCount
      }
    }
  }
`;

const PostRenderer = ({ postId }: { postId: string }) => {
  const { props } = useQuery(PostQuery, /* variables */ { postId });
  // that's all we have to do
  useSubscription(PostChangedSubscription, /* variables */ { postId });

  return <Post {...props} />;
};
  2. Specific Subscription

This subscription is specifically designed for only updating the totalLikeCount. However, the subscription result returns no Post type. Therefore, we cannot make use of the automatic cache updates via the id. We have to additionally define a handler for updating the post in the cache.

const PostLikeCountChangedSubscription = graphql`
  subscription PostLikeCountChanged($postId: ID!) {
    onPostLikeCountChanged(postId: $postId) {
      totalLikeCount
    }
  }
`;

const PostRenderer = ({ postId }: { postId: string }) => {
  const { props } = useQuery(PostQuery, /* variables */ { postId });
  useSubscription(
    PostLikeCountChangedSubscription,
    /* variables */ { postId },
    {
      // we need to manually update the cache :(
      updater: (store, payload) => {
        const record = store.get(postId);
        if (record) {
          record.setValue(
            'totalLikeCount',
            payload.onPostLikeCountChanged.totalLikeCount
          );
        }
      },
    }
  );

  return <Post {...props} />;
};

Obviously, for this example, no sane person would actually want to choose the second solution over the first one.

But as our business requirements get more complex, we might need to do manual cache updates.

A very good example of this is lists. Imagine having a set of data in which a single item changes. The "easy to implement" solution would be to just refetch the complete list every time a single item is added/removed/changed. However, for a list containing hundreds of items, only sending the changed item to the client might be the smarter and faster solution...

This can be implemented via a union type.

type OnUserAdded {
  user: User!
}
type OnUserRemoved {
  removedUserId: ID!
}

union OnUserListChange = OnUserAdded | OnUserRemoved

type Subscription {
  onUserListChange: OnUserListChange!
}

The corresponding code, including handling the cache updates:

const UserListQuery = graphql`
  query UserListQuery {
    users {
      id
      name
    }
  }
`;

const UserListChangeSubscription = graphql`
  subscription UserListChangeSubscription {
    onUserListChange {
      __typename
      ... on OnUserAdded {
        user {
          id
          name
        }
      }
      ... on OnUserRemoved {
        removedUserId
      }
    }
  }
`;

const UserListRenderer = () => {
  const { props } = useQuery(UserListQuery);
  useSubscription(
    UserListChangeSubscription,
    /* variables */ {},
    {
      // we need to manually update the cache :(
      updater: (store, payload) => {
        const root = store.getRoot();
        const users = root.getLinkedRecords('users');
        if (users) {
          switch (payload.onUserListChange.__typename) {
            case 'OnUserAdded': {
              const newUser = store.create(
                payload.onUserListChange.user.id,
                'User'
              );
              newUser.setValue('name', payload.onUserListChange.user.name);
              root.setLinkedRecords([...users, newUser], 'users');
              break;
            }
            case 'OnUserRemoved': {
              root.setLinkedRecords(
                users.filter(
                  (user) =>
                    user.getDataID() !== payload.onUserListChange.removedUserId
                ),
                'users'
              );
              break;
            }
          }
        }
      },
    }
  );

  return <UserList {...props} />;
};

As our application grows, the manual cache update code can become so complex and confusing that in some applications I have considered switching back to simply refetching the queries.

Fortunately, Relay contributors have worked on some nice query directives that reduce the need for such cache update code. They won't cover all cases though.

In all of the above examples we responded (more or less implicitly) to data change events.

Subscriptions can be used to apply data changes on the client, but they are probably not the best tool for that.

Before taking a look at what could be a better tool, let's look at another usage example for subscriptions.

const OnPlaySound = graphql`
  subscription OnPlayChatSound {
    onPlayChatSound {
      soundUrl
    }
  }
`;

const ChatRenderer = () => {
  const chat = useQuery(ChatQuery);
  useSubscription(
    OnPlaySound,
    useMemo(
      () => ({
        onNext: (payload) => {
          playSound(payload.onPlayChatSound.soundUrl);
        },
      }),
      []
    )
  );

  return <Chat chat={chat} />;
};

The difference here is that we are not manipulating our existing data but rather executing a side effect.

Subscriptions can also be used for side-effects that should not alter or touch any data in the cache.

Live Queries

What is a live query? Well, there is no specification for it, so the term is ambiguous. Today, there are several solutions one could describe as live queries.

All those solutions have one thing in common: trying to keep the client state in sync with the server.

This can be paraphrased as observing data.

Before we take a look at all of those implementations, let's break down what we should or can expect from a live query implementation.

Automatically update the clients

This one is pretty obvious. The live query implementation should keep the client state as close to the server state as possible.

Efficiently update clients

If only a part of our data has changed, we don't necessarily need to send the complete execution result over the wire. Only sending the changed parts might be more efficient. In general, the network overhead should be minimal.

Flexibility

Ideally, a solution should not be tied to a specific database or coupled to some SaaS service that won't be around next year.

Adoptability

In case we already have some kind of GraphQL schema or server implementation, the live query solution should be adoptable without changing everything and starting from scratch.

Polling a Query

The easiest solution for implementing live queries would be to poll a query at an interval. Most GraphQL clients already have such a feature implemented.

const PostQuery = graphql`
  query PostQuery($postId: ID!) {
    post(id: $postId) {
      id
      title
      content {
        text
      }
      likes {
        totalCount
      }
    }
  }
`;

const PostRenderer = ({ postId }: { postId: string }) => {
  const { props } = useQuery(
    PostQuery,
    /* variables */ { postId },
    {
      pollInterval: 5000,
    }
  );

  return <Post {...props} />;
};
  1. Automatically updates clients. No.

Depending upon the use-case, this could be a valid solution, but for true real-time applications that require instant feedback, this is not precise enough due to the delay caused by the poll interval.

  2. Efficiently updates clients. No.

The whole execution result is sent over to the client every single time the operation is re-executed. More data than necessary is transported over the wire, even if nothing has changed since the last poll interval.

  3. Flexibility. High.

Straightforward, as this requires no changes on the server and only slight changes to our frontend.

  4. Adoptability. High.

Straightforward; again, almost no changes are required.

Live Queries over Subscriptions

We already had a "live query over subscription"-like example above.

const PostChangedSubscription = graphql`
  subscription PostChanged($postId: ID!) {
    onPostChanged(postId: $postId) {
      post {
        id
        title
        likes {
          totalCount
        }
      }
    }
  }
`;

const PostRenderer = ({ postId }: { postId: string }) => {
  const { props } = useQuery(PostQuery, /* variables */ { postId });
  // that's all we have to do
  useSubscription(PostChangedSubscription, /* variables */ { postId });

  return <Post {...props} />;
};

Let's ditch the PostQuery completely and instead use a PostSubscription that always emits an initial event.

const PostSubscription = graphql`
  subscription PostSubscription($postId: ID!) {
    post(id: $postId) {
      id
      title
      likes {
        totalCount
      }
    }
  }
`;

const PostRenderer = ({ postId }: { postId: string }) => {
  const { props } = useSubscription(
    PostSubscription,
    /* variables */ { postId }
  );

  return <Post {...props} />;
};

A server resolver implementation could look similar to this:

const resolvers = {
  Subscription: {
    post: {
      subscribe: async function* (_, { id }, context) {
        const loadPost = () => context.postStore.load(id, context.viewer);
        // publish the first post immediately
        yield { post: await loadPost() };
        // publish the post again once a change event is emitted
        for await (const _ of context.pubSub.subscribe(`Post:${id}`)) {
          yield { post: await loadPost() };
        }
      },
    },
  },
};

We replace two operations with a single one!

A similar approach is used by Hasura and also PostGraphile.

The obvious drawback of both platforms is the lock-in to a specific database. Of course, that might not be a problem for most people, but having a general solution that works with any data source would be nice, as more complex schemas could fetch from different database types or other third-party APIs.

Those implementations keep track of all the resources in the execution result and re-execute the subscription operation once any of those resources changes.

The resolver implementation above only responds to emitted post changes. In order to keep track of all the resources defined in an operation's selection set, we will have to come up with a smart abstraction.

Another drawback of subscriptions for live queries is the limitation of only being able to select one root subscription field, which is defined by the GraphQL specification. Furthermore, we must also redeclare our query type fields on the subscription type.

There is a workaround we can apply for re-exporting the whole Query type via a sub-field in the subscription type.

type Query {
  user(id: ID!): User
  post(id: ID!): Post
}

type Subscription {
  live: Query!
}

This approach would allow us to query everything on the query object type via the live field on the subscription object type, without having the limit of only being able to query one resource or having to redeclare every resource field resolver on the subscription type. Neat!

const PostSubscription = graphql`
  subscription PostSubscription($postId: ID!, $userId: ID!) {
    live {
      post(id: $postId) {
        id
        title
        totalLikeCount
      }
      user(id: $userId) {
        id
        name
      }
    }
  }
`;

Okay, now we can select everything we could have selected with our query operation!
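On the server side, such a live field could be implemented with a resolver that re-emits an (empty) root value whenever something changes, letting the Query field resolvers below it do the actual work. A minimal sketch, assuming a pubSub helper like the one from earlier and a hypothetical INVALIDATE topic:

```typescript
type Context = {
  pubSub: { subscribe: (topic: string) => AsyncIterableIterator<unknown> };
};

const resolvers = {
  Subscription: {
    live: {
      subscribe: async function* (_: unknown, _args: unknown, context: Context) {
        // emit once immediately so the client receives an initial result
        yield { live: {} };
        // re-emit on every invalidation event; the nested Query field
        // resolvers re-execute for each emitted root value
        for await (const _event of context.pubSub.subscribe('INVALIDATE')) {
          yield { live: {} };
        }
      },
    },
  },
};
```

Figuring out when to publish INVALIDATE (i.e. which resources the operation actually selected) is exactly the tracking problem discussed above.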

  1. Automatically updates clients. Yes.

When using services like PostGraphile and Hasura, that is the case. However, for any additional resolvers added on top of the service schema, the service cannot provide an invalidation mechanism; in user-land we will have to come up with an implementation of resource tracking ourselves.

  2. Efficiently updates clients. No.

The whole execution result is sent over to the client every single time a live query is invalidated.

  3. Flexibility. Low.

Both Hasura and PostGraphile are tightly coupled to a SQL database. For any custom resolvers, we have to come up with the mechanism for resource tracking and invalidation ourselves.

  4. Adoptability. Low.

Switching to a server powered by PostGraphile or Hasura with an already existing GraphQL schema is no easy task.

GraphQL Live Queries over Subscriptions with JSON patch

Ideally, we only want to send the client a patch that provides instructions on how to get from the previous execution result to the next. The lack of such instructions is a big flaw in the previous two implementations.

The RFCs and implementations for the @defer and @stream directives introduced ways of sending (delayed) partial results to clients. However, those "patch" operations are currently limited to "replace at path" and "append to list".

A format such as JSON Patch might be a better alternative for live queries.

graphql-live-subscriptions tries to solve this with a Subscription.live field that exposes both a Query field and a JSON patch field.

Schema Types for graphql-live-subscriptions

scalar JSON

type RFC6902Operation {
  op: String!
  path: String!
  from: String
  value: JSON
}

type LiveQuery {
  patch: [RFC6902Operation]
  query: Query
}

type Subscription {
  live: LiveQuery!
}

A live query operation can be declared similar to our PostSubscription document above.

const PostSubscription = graphql`
  subscription PostSubscription($postId: ID!, $userId: ID!) {
    live {
      query {
        post(id: $postId) {
          id
          title
          totalLikeCount
        }
        user(id: $userId) {
          id
          name
        }
      }
      patch {
        op
        path
        from
        value
      }
    }
  }
`;

The difference is that the type returned by the live field has two fields instead of a single one: the query field, which exposes the selection set of the Query type, and a patch field, which contains JSON Patch operations. When executing the given operation against the server, the initial result includes the data selected by the query field selection set. All following values will have no query value (null), but instead an array of patch operations that describe how to update the last result to the next one.

Initial result

{
  "data": {
    "live": {
      "query": {
        "post": {
          "id": "1",
          "title": "foo",
          "totalLikeCount": 10
        },
        "user": {
          "id": "10",
          "name": "Laurin"
        }
      },
      "patch": null
    }
  }
}

Patch result (increase totalLikeCount)

{
  "data": {
    "live": {
      "query": null,
      "patch": [
        {
          "op": "replace",
          "path": "/post/totalLikeCount",
          "from": null,
          "value": 11
        }
      ]
    }
  }
}

The clients must then implement the logic for applying the patch operation to their client cache or for applying the patches on the initial result in the network/fetcher/link layer.
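For illustration, applying such a patch on the client could look like this minimal sketch, which only handles the "replace" operation (a real application would rather use a full RFC 6902 implementation such as the fast-json-patch package; the helper name is hypothetical):

```typescript
type PatchOperation = {
  op: string;
  path: string;
  from: string | null;
  value: unknown;
};

// applies "replace" operations from a JSON Patch to the previous result
const applyReplacePatches = (
  previous: Record<string, any>,
  patches: PatchOperation[]
) => {
  // deep-copy so the previous result stays untouched
  const next = JSON.parse(JSON.stringify(previous));
  for (const patch of patches) {
    if (patch.op !== 'replace') continue; // other RFC 6902 ops omitted
    // JSON Pointer paths look like "/post/totalLikeCount"
    const segments = patch.path.split('/').filter(Boolean);
    const lastSegment = segments.pop()!;
    let target = next;
    for (const segment of segments) target = target[segment];
    target[lastSegment] = patch.value;
  }
  return next;
};

const previousResult = { post: { id: '1', totalLikeCount: 10 } };
const nextResult = applyReplacePatches(previousResult, [
  { op: 'replace', path: '/post/totalLikeCount', from: null, value: 11 },
]);
// nextResult.post.totalLikeCount is now 11
```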

The server implementation uses an event emitter and an immutable state tree for detecting changes that must be sent to clients. The patch is automatically generated by comparing the next immutable state against the last one emitted via an EventEmitter.

While the idea is quite nice, the implementation is clearly meant for backends that already use reactive or immutable data structures. Having to rewrite our existing GraphQL layer to support live queries is a big trade-off. Furthermore, the library is not maintained that well. I've made PRs to make the library compatible with recent GraphQL versions, but they have yet to be merged. Using unions and interfaces is not possible. Having to patch a library with patch-package before it is even usable is generally a bad sign.

  1. Automatically updates clients. Yes.

When implementing our schema in a way that conforms to the library's requirements, it delivers precise results once the immutable state has changed.

  2. Efficiently updates clients. Yes.

Initially a result tree is sent to the client. Afterwards, only JSON patches that must be applied to the initial result are sent to the client.

  3. Flexibility. Kind of. ✳️

We don't rely on any third-party services; however, we are forced into immutability to some extent.

  4. Adoptability. It depends. ✳️

Adding an immutable layer to our existing schema might be a pretty big change. Furthermore, the library lacks support for some GraphQL features, such as interfaces and unions.

GraphQL Live Queries via the @live directive

There are companies out there, like Facebook, that are already using this approach. There is also a GraphQL framework available in Go that supports live queries out of the box: check out thunder!

The idea behind the @live directive is that the client uses it to mark that it is interested in keeping the query's execution result as up to date as possible. The implementation, however, is up to user-land.

query ($id: ID!) @live {
  post(id: $id) {
    id
    title
    totalLikeCount
  }
}

The idea of making any query a live query, without additional overhead, is very appealing from the view of a frontend developer. From a backend perspective, however, this raises new questions. Just adding a directive to the operation on the frontend won't make the whole backend reactive.

After having built an example app with graphql-live-subscriptions from scratch, studying the flaws of that library, and being uncomfortable with the vendor lock-in of services such as PostGraphile and Hasura, I decided to approach the problem of live queries in a more pluggable way, using the @live directive.

@n1ru4l/graphql-live-query - A common definition and set of utilities for determining whether an operation is a live query

This module provides two things.

  1. GraphQLLiveDirective that can be added to any schema.
import { GraphQLSchema, specifiedDirectives } from 'graphql';
import { GraphQLLiveDirective } from '@n1ru4l/graphql-live-query';
import { query, mutation, subscription } from './schema';

const schema = new GraphQLSchema({
  query,
  mutation,
  subscription,
  directives: [
    GraphQLLiveDirective,
    /* Keep @include/@skip and the other built-in directives */ ...specifiedDirectives,
  ],
});
  2. isLiveQueryOperationDefinitionNode

This is a simple function that takes an OperationDefinitionNode and returns true if it is a live query operation.

These utility functions can be found on GitHub.

Those functions might not seem that helpful on their own, but they are a contract that live query execution engines can build on, such as the package we are talking about next 😇.

@n1ru4l/in-memory-live-query-store - Keeps track of and invalidates resources selected by a live query in an efficient, but generic way

The InMemoryLiveQueryStore.execute function is a drop-in replacement for the execute function provided by the graphql package.

When it encounters a query operation marked with the @live directive, it returns an AsyncIterator instead of a Promise, which can be used for sending multiple results to the client, similar to how subscribe (or defer/stream) works.

Internally, the store keeps track of the resources selected in the live query operation's selection set: all root query field coordinates (e.g. Query.post) and global resource identifiers (e.g. Post:1). The store can then be notified to re-execute live query operations that select a given root query field or resource identifier via the InMemoryLiveQueryStore.invalidate method. A resource identifier is composed of the type name and the resolved id value, separated by a colon, but this behavior can be customized. To ensure that the store keeps track of all our query resources, we should always select the id field on our object types. The store will only keep track of fields with the name id and the type ID! (GraphQLNonNull(GraphQLID)).
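The tracking scheme described above boils down to two string formats, sketched here with hypothetical helper names (these are not part of the library's API):

```typescript
// root query field coordinate, e.g. "Query.post"
const buildFieldCoordinate = (parentTypeName: string, fieldName: string) =>
  `${parentTypeName}.${fieldName}`;

// default resource identifier, e.g. "Post:1" (customizable in the store)
const buildResourceIdentifier = (typeName: string, id: string) =>
  `${typeName}:${id}`;
```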

import { InMemoryLiveQueryStore } from '@n1ru4l/in-memory-live-query-store';
import { parse } from 'graphql';
import { schema } from './schema';

const inMemoryLiveQueryStore = new InMemoryLiveQueryStore();

const rootValue = {
  todos: [
    {
      id: '1',
      content: 'Foo',
      isComplete: false,
    },
  ],
};

// Minimal AsyncIterable check; @n1ru4l/push-pull-async-iterable-iterator
// also exports an isAsyncIterable helper.
const isAsyncIterable = (value) =>
  value != null && typeof value[Symbol.asyncIterator] === 'function';

inMemoryLiveQueryStore
  .execute({
    schema,
    document: parse(/* GraphQL */ `
      query todosQuery @live {
        todos {
          id
          content
          isComplete
        }
      }
    `),
    rootValue: rootValue,
    contextValue: {},
    variableValues: null,
    operationName: 'todosQuery',
  })
  .then(async (result) => {
    if (isAsyncIterable(result)) {
      for await (const value of result) {
        // Send to client in real-world app :)
        console.log(value);
      }
    }
  });

// Invalidate by resource identifier
rootValue.todos[0].isComplete = true;
inMemoryLiveQueryStore.invalidate(`Todo:1`);

// Invalidate by root query field coordinate
rootValue.todos.push({ id: '2', content: 'Baz', isComplete: false });
inMemoryLiveQueryStore.invalidate(`Query.todos`);

When using an ORM such as Prisma, we can simply add a middleware for automatically invalidating resources.

Use Prisma middleware for resource invalidation

// Register Middleware for automatic model invalidation
prisma.$use(async (params, next) => {
  const resultPromise = next(params);

  if (params.action === 'update') {
    resultPromise.then((res) => {
      if (res?.id) {
        // invalidate `Post:1` on update
        liveQueryStore.invalidate(`${params.model}:${res.id}`);
      }
    });
  }

  return resultPromise;
});

In case we have multiple server replicas, a PubSub implementation can be used for distributing the invalidation events.

PubSub with Redis

import Redis from 'ioredis';

const redis = new Redis();
redis.subscribe('invalidations');
redis.on('message', (channel, message) => {
  if (channel === 'invalidations') {
    // message is "Post:1"
    liveQueryStore.invalidate(message);
  }
});

The transports graphql-ws (GraphQL over WebSocket), graphql-helix (GraphQL over SSE) and @n1ru4l/socket-io-graphql-server (GraphQL over Socket.io) support providing a custom execute function that is allowed to return AsyncIterables (thanks to the recent changes required for @defer and @stream). All we have to do is pass InMemoryLiveQueryStore.execute to our server factory!

Example with graphql-ws

import http from 'http';
import ws from 'ws'; // yarn add ws
import { useServer } from 'graphql-ws/lib/use/ws';
import { subscribe } from 'graphql';
import { InMemoryLiveQueryStore } from '@n1ru4l/in-memory-live-query-store';
import { schema } from './schema';

const inMemoryLiveQueryStore = new InMemoryLiveQueryStore();

const server = http.createServer(function weServeSocketsOnly(_, res) {
  res.writeHead(404);
  res.end();
});

const wsServer = new ws.Server({
  server,
  path: '/graphql',
});

useServer(
  {
    schema,
    execute: inMemoryLiveQueryStore.execute,
    subscribe,
  },
  wsServer
);

server.listen(443);

The best thing is you can start playing around with it today! The fully functional implementation is available as @n1ru4l/in-memory-live-query-store on GitHub. Feel free to create issues regarding missing features or documentation. Let's shape the future of GraphQL live queries together!

@n1ru4l/graphql-live-query-patch: Optional JSON patch middleware for smaller payloads over the wire.

GraphQL execution result payloads can become quite huge, and sending them over the wire can get expensive, especially when they are sent frequently for fast-updating state. JSON Patch is a handy standard for sending only change instructions over the wire, which can drastically shrink such payloads.

Instead of having JSON patches enabled by default, this is a totally optional module that can be applied on the server for deflating (creating patches) and on the client for inflating (applying patches) the execution results. Smaller projects might even be better off not using JSON Patch at all, as the patch payload might be bigger than the whole query result.

The patches are created by comparing the latest execution result with the previous execution result. That means the server will always have to store the latest execution result as long as the live query is active.

Here are some example execution results after applying the patch generator:

Initial result

{
  "data": {
    "post": {
      "id": "1",
      "title": "foo",
      "totalLikeCount": 10
    },
    "user": {
      "id": "10",
      "name": "Laurin"
    }
  },
  "revision": 1
}

Patch result (increase totalLikeCount)

{
  "patch": [
    {
      "op": "replace",
      "path": "post/totalLikeCount",
      "value": 11
    }
  ],
  "revision": 2
}

On the server adding the patch generation middleware is easy function composition:

import { flow } from 'fp-ts/function';
import { createApplyLiveQueryPatchGenerator } from '@n1ru4l/graphql-live-query-patch';

const applyLiveQueryPatchGenerator = createApplyLiveQueryPatchGenerator();

const execute = flow(liveQueryStore.execute, applyLiveQueryPatchGenerator);
// same as
const execute0 = (...args) =>
  applyLiveQueryPatchGenerator(liveQueryStore.execute(...args));

On the client, we now need to build an execution result out of the initial result and the patch results, because our clients do not understand the GraphQL live query JSON patch protocol!
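Conceptually, that middleware is an async generator: it remembers the last full result and materializes each patch payload into a new one. The sketch below assumes the payload shape shown above ({ data, revision } for the initial result, { patch, revision } for updates) and only handles flat replace paths; the real package applies full RFC 6902 patches and validates revision continuity.

```javascript
// Minimal sketch of the client-side inflating middleware. Assumes flat
// "replace" paths like "/totalLikeCount"; a hypothetical simplification
// of what the real package does.
async function* applyFlatLiveQueryPatch(source) {
  let current = null;
  for await (const payload of source) {
    if (payload.patch === undefined) {
      current = payload.data; // initial (full) result
    } else {
      current = { ...current };
      for (const op of payload.patch) {
        if (op.op === 'replace') current[op.path.slice(1)] = op.value;
      }
    }
    yield { data: current, revision: payload.revision };
  }
}

// Simulated transport emitting an initial result and one patch.
async function* transport() {
  yield { data: { totalLikeCount: 10 }, revision: 1 };
  yield {
    patch: [{ op: 'replace', path: '/totalLikeCount', value: 11 }],
    revision: 2,
  };
}

(async () => {
  for await (const result of applyFlatLiveQueryPatch(transport())) {
    console.log(result);
  }
})();
```

Because both the transport and the middleware are AsyncIterables, they compose with plain function application, which is exactly what the `flow` examples below express.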

Applying the middleware is pretty easy as well!

import { flow } from 'fp-ts/function';
import { createApplyLiveQueryPatch } from '@n1ru4l/graphql-live-query-patch';

const applyLiveQueryPatch = createApplyLiveQueryPatch();

const execute = flow(networkInterface.execute, applyLiveQueryPatch);
// same as
const execute0 = (...args) =>
  applyLiveQueryPatch(networkInterface.execute(...args));

The library is optimized for network interfaces that return AsyncIterables. We can easily wrap our favorite network interface (one that uses observable-style sinks) in an AsyncIterable with @n1ru4l/push-pull-async-iterable-iterator!

Example with graphql-ws

import { pipe } from 'fp-ts/function';
import { makeAsyncIterableIteratorFromSink } from '@n1ru4l/push-pull-async-iterable-iterator';
import { createApplyLiveQueryPatch } from '@n1ru4l/graphql-live-query-patch';
import { createClient } from 'graphql-ws';

const client = createClient({
  url: 'ws://localhost:3000/graphql',
});

const applyLiveQueryPatch = createApplyLiveQueryPatch();

const asyncIterator = pipe(
  makeAsyncIterableIteratorFromSink((sink) => {
    const dispose = client.subscribe(
      {
        query: 'query @live { hello }',
      },
      {
        next: sink.next,
        error: sink.error,
        complete: sink.complete,
      }
    );
    return () => dispose();
  }),
  applyLiveQueryPatch
);

// stop live query after 5 seconds.
setTimeout(() => {
  asyncIterator.return();
}, 5000);

for await (const executionResult of asyncIterator) {
  console.log(executionResult);
}

The whole package can be found as @n1ru4l/graphql-live-query-patch on GitHub. It is already usable and feedback is highly appreciated. It currently has its flaws with list diffing, so further thoughts and ideas are very welcome.

Note: We have since separated the JSON patch logic into a separate @n1ru4l/graphql-live-query-patch-json-patch package; @n1ru4l/graphql-live-query-patch now only includes common logic shared between patch payload implementations. The reason for this is that there are more efficient patch formats available, such as jsondiffpatch, which generates smaller patches for list changes. The latter is now available as the separate package @n1ru4l/graphql-live-query-patch-jsondiffpatch.

So let's evaluate this modular implementation approach against the criteria we used before.

  1. Automatically updates the clients. Yes.

The approach of pushing the invalidation responsibility to the server might seem like a drawback at first, but a smart abstraction (such as an ORM middleware) can result in pretty responsive applications.

  2. Efficiently updates the clients. Yes.

In case our execution result payloads are getting too big, we can easily enable JSON patches by adding a middleware. Furthermore, the middleware is totally independent of the live query implementation! That means if our project requires a different live query engine, the middleware can still be applied as long as the execution result is compatible with the live query execution result format.

  3. Flexibility. High.

Any database or third-party API can be used as long as we can somehow invalidate the resource (via some PubSub system etc.). As all incorporated libraries are pretty pluggable, we can pick only the ones we need and replace the others with whatever makes more sense for a specific project.

  4. Adoptability. High.

This library can be added to any existing GraphQL.js schema without any hassle, provided you use a transport that allows full control over the schema creation and execution phase, such as graphql-helix or graphql-ws, which you are hopefully already using! Resource invalidation code can be added to the mutation resolvers gradually over time to reflect the needs of the frontend. The possibilities for resource invalidation are endless, and the logic for it can be added incrementally, e.g. via an ORM middleware, in our mutation code, or maybe even on our GraphQL gateway.

What's next?

Summed up, this could be the start of a pluggable live query ecosystem that is flexible enough to be compatible with a wide range of databases, GraphQL transports and schema builders instead of focusing too much on a niche. In general, instead of having less flexible and bulky servers that try to be too opinionated and restrictive in their schema -> execution -> transport flow, GraphQL tooling should become more modular. At the same time, this should not imply that there is no need for opinionated framework-like approaches (as long as they give enough flexibility, e.g. by being composed out of modular packages).

I hope that more people will start exploring the possibilities of GraphQL live queries and also GraphQL in general! Eventually, we could even come up with an official live RFC for the spec!

There is a lot more to think about, such as:

  • Partial query re-execution (e.g. by building ad-hoc queries that only select affected resources)
  • Mechanisms for batching similar query operations (e.g. preventing the same operation from being executed multiple times after an invalidation)
  • Better list diffing/format for live query patches (as JSON patch performs rather poorly on list diffs) (Solved by introducing @n1ru4l/graphql-live-query-patch-jsondiffpatch)
  • Whatever is bothering you while reading this article! 😄

If you are an enthusiastic tinkerer and plan to build something with the libraries above, share it with me and everyone else, so we can get more insights into different use-cases and requirements for all kinds of applications!

Let's continue the discussion on Discord, Slack or on GitHub

Here are some more resources for you to tinker around with:

GraphQL Live Query libraries and example todo app that sync across all connected clients.

Experimental GraphQL Playground that implements RFC features (defer/stream) and live queries.

More information on how the InMemoryLiveQueryStore keeps track of resources.

Further thoughts on how the Relay Spec could help backing a live query system.