Recipes

Recipes

Short and concise code snippets for starting with common use-cases.

Client Usage

With EventSource (distinct connections mode)

The following example doesn’t use the graphql-sse client and works only for the “distinct connections mode” of the GraphQL over SSE spec. This mode is the client default and should suffice for most of the use-cases.

const url = new URL('http://localhost:4000/graphql/stream');
url.searchParams.append('query', 'subscription { greetings }');
 
const source = new EventSource(url);
 
source.addEventListener('next', ({ data }) => {
  console.log(data);
  // {"data":{"greetings":"Hi"}}
  // {"data":{"greetings":"Bonjour"}}
  // {"data":{"greetings":"Hola"}}
  // {"data":{"greetings":"Ciao"}}
  // {"data":{"greetings":"Zdravo"}}
});
 
source.addEventListener('error', (e) => {
  console.error(e);
});
 
source.addEventListener('complete', () => {
  source.close();
});

With Promise

import { createClient, RequestParams } from 'graphql-sse';
 
const client = createClient({
  url: 'http://hey.there:4000/graphql/stream',
});
 
(async () => {
  const query = client.iterate({
    query: '{ hello }',
  });
 
  try {
    const { value } = await query.next();
    // next = value = { data: { hello: 'Hello World!' } }
    // complete
  } catch (err) {
    // error
  }
})();

With AsyncIterator

import { createClient, RequestParams } from 'graphql-sse';
 
const client = createClient({
  url: 'http://iterators.ftw:4000/graphql/stream',
});
 
(async () => {
  const subscription = client.iterate({
    query: 'subscription { greetings }',
  });
  // "subscription.return()" to dispose
 
  for await (const result of subscription) {
    // next = result = { data: { greetings: 5x } }
    // "break" to dispose
  }
  // complete
})();

With Observable

import { Observable } from 'relay-runtime';
// or
import { Observable } from '@apollo/client/core';
// or
import { Observable } from 'rxjs';
// or
import Observable from 'zen-observable';
// or any other lib which implements Observables as per the ECMAScript proposal: https://github.com/tc39/proposal-observable
 
const client = createClient({
  url: 'http://graphql.loves:4000/observables',
});
 
export function toObservable(operation) {
  return new Observable((observer) =>
    client.subscribe(operation, {
      next: (data) => observer.next(data),
      error: (err) => observer.error(err),
      complete: () => observer.complete(),
    }),
  );
}
 
const observable = toObservable({ query: `subscription { ping }` });
 
const subscription = observable.subscribe({
  next: (data) => {
    expect(data).toBe({ data: { ping: 'pong' } });
  },
});
 
// ⏱
 
subscription.unsubscribe();

With Relay

import {
  Network,
  Observable,
  RequestParameters,
  Variables,
} from 'relay-runtime';
import { createClient } from 'graphql-sse';
 
const subscriptionsClient = createClient({
  url: 'http://i.love:4000/graphql/stream',
  headers: () => {
    const session = getSession();
    if (!session) return {};
    return {
      Authorization: `Bearer ${session.token}`,
    };
  },
});
 
// yes, both fetch AND subscribe can be handled in one implementation
function fetchOrSubscribe(operation: RequestParameters, variables: Variables) {
  return Observable.create((sink) => {
    if (!operation.text) {
      return sink.error(new Error('Operation text cannot be empty'));
    }
    return subscriptionsClient.subscribe(
      {
        operationName: operation.name,
        query: operation.text,
        variables,
      },
      sink,
    );
  });
}
 
export const network = Network.create(fetchOrSubscribe, fetchOrSubscribe);

With urql

import { createClient, defaultExchanges, subscriptionExchange } from 'urql';
import { createClient as createSSEClient } from 'graphql-sse';
 
const sseClient = createSSEClient({
  url: 'http://its.urql:4000/graphql/stream',
});
 
export const client = createClient({
  url: '/graphql/stream',
  exchanges: [
    ...defaultExchanges,
    subscriptionExchange({
      forwardSubscription(operation) {
        return {
          subscribe: (sink) => {
            const dispose = sseClient.subscribe(operation, sink);
            return {
              unsubscribe: dispose,
            };
          },
        };
      },
    }),
  ],
});

With Apollo

import {
  ApolloLink,
  Operation,
  FetchResult,
  Observable,
} from '@apollo/client/core';
import { print, GraphQLError } from 'graphql';
import { createClient, ClientOptions, Client } from 'graphql-sse';
 
class SSELink extends ApolloLink {
  private client: Client;
 
  constructor(options: ClientOptions) {
    super();
    this.client = createClient(options);
  }
 
  public request(operation: Operation): Observable<FetchResult> {
    return new Observable((sink) => {
      return this.client.subscribe<FetchResult>(
        { ...operation, query: print(operation.query) },
        {
          next: sink.next.bind(sink),
          complete: sink.complete.bind(sink),
          error: sink.error.bind(sink),
        },
      );
    });
  }
}
 
export const link = new SSELink({
  url: 'http://where.is:4000/graphql/stream',
  headers: () => {
    const session = getSession();
    if (!session) return {};
    return {
      Authorization: `Bearer ${session.token}`,
    };
  },
});

With TypedDocumentNode

import {
  createClient,
  Client,
  ClientOptions,
  ExecutionResult,
  Sink,
} from 'graphql-sse';
import { TypedDocumentNode } from '@graphql-typed-document-node/core'; // yarn add @graphql-typed-document-node/core
import { print } from 'graphql';
 
export interface TypedClient
  extends Omit<
    Client,
    'iterate' | 'subscribe' // we're replacing the `iterate` and `subscribe` implementations
  > {
  iterate<
    Result extends ExecutionResult,
    Variables extends Record<string, unknown>,
  >(
    document: TypedDocumentNode<Result, Variables>,
    variables: Variables,
  ): AsyncIterableIterator<Result>;
  subscribe<
    Result extends ExecutionResult,
    Variables extends Record<string, unknown>,
  >(
    document: TypedDocumentNode<Result, Variables>,
    variables: Variables,
    sink: Sink<Result>,
  ): () => void;
}
 
export function createTypedClient(options: ClientOptions): TypedClient {
  const client = createClient(options);
  return {
    ...client,
    iterate: (document, variables) =>
      client.iterate({
        query: print(document),
        variables,
      }) as any,
    subscribe: (document, variables, sink) =>
      client.subscribe(
        {
          query: print(document),
          variables,
        },
        sink,
      ),
  };
}

For HTTP/1 (aka. single connection mode)

import { createClient } from 'graphql-sse';
 
export const client = createClient({
  singleConnection: true, // this is literally it 😄
  url: 'http://use.single:4000/connection/graphql/stream',
  // lazy: true (default) -> connect on first subscribe and disconnect on last unsubscribe
  // lazy: false -> connect as soon as the client is created
});
 
// The client will now run in a "single connection mode" mode. Meaning,
// a single SSE connection will be used to transmit all operation results
// while separate HTTP requests will be issued to dictate the behaviour.

With Custom Retry Timeout Strategy

import { createClient } from 'graphql-sse';
import { waitForHealthy } from './my-servers';
 
const url = 'http://i.want.retry:4000/control/graphql/stream';
 
export const client = createClient({
  url,
  retryWait: async function waitForServerHealthyBeforeRetry() {
    // if you have a server healthcheck, you can wait for it to become
    // healthy before retrying after an abrupt disconnect (most commonly a restart)
    await waitForHealthy(url);
 
    // after the server becomes ready, wait for a second + random 1-4s timeout
    // (avoid DDoSing yourself) and try connecting again
    await new Promise((resolve) =>
      setTimeout(resolve, 1000 + Math.random() * 3000),
    );
  },
});

With Logging of Incoming Messages

⚠️

Browsers don’t show them in the DevTools. Read more.

import { createClient } from 'graphql-sse';
 
export const client = createClient({
  url: 'http://let-me-see.messages:4000/graphql/stream',
  onMessage: console.log,
});

In Browser

<!doctype html>
<html>
  <head>
    <meta charset="utf-8" />
    <title>GraphQL over Server-Sent Events</title>
    <script
      type="text/javascript"
      src="https://unpkg.com/graphql-sse/umd/graphql-sse.min.js"
    ></script>
  </head>
  <body>
    <script type="text/javascript">
      const client = graphqlSse.createClient({
        url: 'http://umdfor.the:4000/win/graphql/stream',
      });
 
      // consider other recipes for usage inspiration
    </script>
  </body>
</html>

In Node.js

const ws = require('ws'); // yarn add ws
const fetch = require('node-fetch'); // yarn add node-fetch
const { AbortController } = require('node-abort-controller'); // (node < v15) yarn add node-abort-controller
const Crypto = require('crypto');
const { createClient } = require('graphql-sse');
 
export const client = createClient({
  url: 'http://no.browser:4000/graphql/stream',
  fetchFn: fetch,
  abortControllerImpl: AbortController, // node < v15
  /**
   * Generates a v4 UUID to be used as the ID.
   * Reference: https://gist.github.com/jed/982883
   */
  generateID: () =>
    ([1e7] + -1e3 + -4e3 + -8e3 + -1e11).replace(/[018]/g, (c) =>
      (c ^ (Crypto.randomBytes(1)[0] & (15 >> (c / 4)))).toString(16),
    ),
});

Server Handler Usage

With Custom Authentication

import { createHandler } from 'graphql-sse';
import {
  schema,
  getOrCreateTokenFromCookies,
  customAuthenticationTokenDiscovery,
  processAuthorizationHeader,
} from './my-graphql';
 
export const handler = createHandler({
  schema,
  authenticate: async (req) => {
    let token = req.headers.get('x-graphql-event-stream-token');
    if (token) {
      // When the client is working in a "single connection mode"
      // all subsequent requests for operations will have the
      // stream token set in the `X-GraphQL-Event-Stream-Token` header.
      //
      // It is considered safe to accept the header token always
      // because if a stream reservation does not exist, or is already
      // fulfilled, the handler itself will reject the request.
      //
      // Read more: https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md#single-connection-mode
      return Array.isArray(token) ? token.join('') : token;
    }
 
    // It is necessary to generate a unique token when dealing with
    // clients that operate in the "single connection mode". The process
    // of generating the token is completely up to the implementor.
    token = getOrCreateTokenFromCookies(req);
    // or
    token = processAuthorizationHeader(req.headers.get('authorization'));
    // or
    token = await customAuthenticationTokenDiscovery(req);
 
    // Using the response argument the implementor may respond to
    // authentication issues however he sees fit.
    if (!token) {
      return [null, { status: 401, statusText: 'Unauthorized' }];
    }
 
    // Clients that operate in "distinct connections mode" dont
    // need a unique stream token. It is completely ok to simply
    // return an empty string for authenticated clients.
    //
    // Read more: https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md#distinct-connections-mode
    if (
      req.method === 'POST' &&
      req.headers.get('accept') === 'text/event-stream'
    ) {
      // "distinct connections mode" requests an event-stream with a POST
      // method. These two checks, together with the lack of `X-GraphQL-Event-Stream-Token`
      // header, are sufficient for accurate detection.
      return ''; // return token; is OK too
    }
 
    // On the other hand, clients operating in "single connection mode"
    // need a unique stream token which will be provided alongside the
    // incoming event stream request inside the `X-GraphQL-Event-Stream-Token` header.
    //
    // Read more: https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md#single-connection-mode
    return token;
  },
});

With Dynamic Schema

import { createHandler } from 'graphql-sse';
import { schema, checkIsAdmin, getDebugSchema } from './my-graphql';
 
export const handler = createHandler({
  schema: async (req, executionArgsWithoutSchema) => {
    // will be called on every subscribe request
    // allowing you to dynamically supply the schema
    // using the depending on the provided arguments
    const isAdmin = await checkIsAdmin(req);
    if (isAdmin) return getDebugSchema(req, executionArgsWithoutSchema);
    return schema;
  },
});

With Custom Context Value

import { createHandler } from 'graphql-sse';
import { schema, getDynamicContext } from './my-graphql';
 
export const handler = createHandler({
  schema,
  // or static context by supplying the value direcly
  context: (req, args) => {
    return getDynamicContext(req, args);
  },
});

With Custom Execution Arguments

import { parse } from 'graphql';
import { createHandler } from 'graphql-sse';
import { getSchema, myValidationRules } from './my-graphql';
 
export const handler = createHandler({
  onSubscribe: async (req, params) => {
    const schema = await getSchema(req);
    return {
      schema,
      operationName: params.operationName,
      document:
        typeof params.query === 'string' ? parse(params.query) : params.query,
      variableValues: params.variables,
      contextValue: undefined,
    };
  },
});

With custom subscribe method that gracefully handles thrown errors

graphql-js does not catch errors thrown from async iterables (see issue). This will bubble the error to the server handler and cause issues like writing headers after they’ve already been sent if not handled properly.

Note that graphql-sse does not offer built-in handling of internal errors because there’s no one-glove-fits-all. Errors are important and should be handled with care, exactly to the needs of the user - not the library author.

Therefore, you may instead implement your own subscribe function that gracefully handles thrown errors.

import { subscribe } from 'graphql';
import { createHandler } from 'graphql-sse';
import { getSchema } from './my-graphql';
 
export const handler = createHandler({
  async subscribe(...args) {
    const result = await subscribe(...args);
    if ('next' in result) {
      // is an async iterable, augment the next method to handle thrown errors
      const originalNext = result.next;
      result.next = () =>
        originalNext().catch((err) => ({ value: { errors: [err] } }));
    }
    return result;
  },
});

Server Handler and Client Usage

With Persisted Queries

🛸 server
import { parse, ExecutionArgs } from 'graphql';
import { createHandler } from 'graphql-sse';
import { schema } from './my-graphql';
 
// a unique GraphQL execution ID used for representing
// a query in the persisted queries store. when subscribing
// you should use the `SubscriptionPayload.query` to transmit the id
type QueryID = string;
 
const queriesStore: Record<QueryID, ExecutionArgs> = {
  iWantTheGreetings: {
    schema, // you may even provide different schemas in the queries store
    document: parse('subscription Greetings { greetings }'),
  },
};
 
export const handler = createHandler({
  onSubscribe: (_req, params) => {
    const persistedQuery =
      queriesStore[String(params.extensions?.persistedQuery)];
    if (persistedQuery) {
      return {
        ...persistedQuery,
        variableValues: params.variables, // use the variables from the client
        contextValue: undefined,
      };
    }
 
    // for extra security only allow the queries from the store
    return [null, { status: 404, statusText: 'Not Found' }];
  },
});
📺 client
import { createClient } from 'graphql-sse';
 
const client = createClient({
  url: 'http://persisted.graphql:4000/queries',
});
 
(async () => {
  const onNext = () => {
    /**/
  };
 
  await new Promise((resolve, reject) => {
    client.subscribe(
      {
        query: '', // query field is required, but you can leave it empty for persisted queries
        extensions: {
          persistedQuery: 'iWantTheGreetings',
        },
      },
      {
        next: onNext,
        error: reject,
        complete: resolve,
      },
    );
  });
 
  expect(onNext).toBeCalledTimes(5); // greetings in 5 languages
})();