Catch the highlights of GraphQLConf 2023! Click for recordings. Or check out our recap blog post.
Recipes

Recipes

Short and concise code snippets for starting with common use-cases.

Client Usage

With EventSource (distinct connections mode)

The following example doesn't use the graphql-sse client and works only in the "distinct connections mode" of the GraphQL over SSE spec. This mode is the client's default and should suffice for most use cases.

// Distinct connections mode: the operation is passed to the server as a
// search parameter of the event stream URL itself.
const url = new URL('http://localhost:4000/graphql/stream');
url.searchParams.append('query', 'subscription { greetings }');

const source = new EventSource(url);

// every result arrives as a "next" event; its `data` is logged as received
source.addEventListener('next', (event) => {
  const payload = event.data;
  console.log(payload);
  // {"data":{"greetings":"Hi"}}
  // {"data":{"greetings":"Bonjour"}}
  // {"data":{"greetings":"Hola"}}
  // {"data":{"greetings":"Ciao"}}
  // {"data":{"greetings":"Zdravo"}}
});

// "error" events are only reported, the source is left open
source.addEventListener('error', (failure) => console.error(failure));

// on "complete" the source is closed
source.addEventListener('complete', () => source.close());

With Promise

import { createClient, RequestParams } from 'graphql-sse';
 
// Client pointed at the server's event stream endpoint.
const client = createClient({
  url: 'http://hey.there:4000/graphql/stream',
});
 
(async () => {
  // `iterate` is used for a plain query too; its result is consumed
  // through the iterator protocol below
  const query = client.iterate({
    query: '{ hello }',
  });
 
  try {
    // only the first result is awaited; the iterator is not advanced again
    const { value } = await query.next();
    // next = value = { data: { hello: 'Hello World!' } }
    // complete
  } catch (err) {
    // error
  }
})();

With AsyncIterator

import { createClient, RequestParams } from 'graphql-sse';
 
// Client pointed at the server's event stream endpoint.
const client = createClient({
  url: 'http://iterators.ftw:4000/graphql/stream',
});
 
(async () => {
  // the value returned by `iterate` is consumed with `for await` below
  const subscription = client.iterate({
    query: 'subscription { greetings }',
  });
  // "subscription.return()" to dispose
 
  // each emitted result is handled in its own loop iteration
  for await (const result of subscription) {
    // next = result = { data: { greetings: 5x } }
    // "break" to dispose
  }
  // complete
})();

With Observable

import { createClient } from 'graphql-sse';
import { Observable } from 'relay-runtime';
// or
import { Observable } from '@apollo/client/core';
// or
import { Observable } from 'rxjs';
// or
import Observable from 'zen-observable';
// or any other lib which implements Observables as per the ECMAScript proposal: https://github.com/tc39/proposal-observable
 
// Client whose `subscribe` is wrapped into an Observable below.
const client = createClient({
  url: 'http://graphql.loves:4000/observables',
});

/**
 * Adapts an operation to an Observable.
 *
 * The subscriber function forwards `next`, `error` and `complete` from the
 * client to the observer and returns the value of `client.subscribe` as the
 * Observable's teardown.
 *
 * @param operation - the request handed to `client.subscribe` unchanged
 * @returns an Observable of the operation's results
 */
export function toObservable(operation) {
  return new Observable((observer) =>
    client.subscribe(operation, {
      next: (data) => observer.next(data),
      error: (err) => observer.error(err),
      complete: () => observer.complete(),
    }),
  );
}

const observable = toObservable({ query: `subscription { ping }` });

const subscription = observable.subscribe({
  next: (data) => {
    // `toEqual` compares structurally; `toBe` checks identity (`Object.is`)
    // and therefore can never match a freshly created object literal
    expect(data).toEqual({ data: { ping: 'pong' } });
  },
});

// ⏱

subscription.unsubscribe();

With Relay

import {
  Network,
  Observable,
  RequestParameters,
  Variables,
} from 'relay-runtime';
import { createClient } from 'graphql-sse';
 
// SSE client used for every Relay operation. `headers` is given as a
// function that builds the headers from the current session.
// NOTE(review): `getSession` is neither defined nor imported in this snippet;
// presumably it is provided by the application — confirm.
const subscriptionsClient = createClient({
  url: 'http://i.love:4000/graphql/stream',
  headers: () => {
    const session = getSession();
    // no session -> no extra headers
    if (!session) return {};
    return {
      Authorization: `Bearer ${session.token}`,
    };
  },
});
 
/**
 * Network function backed by the SSE client. Yes, both fetch AND subscribe
 * can be handled in this one implementation.
 *
 * When the operation has no text an error is pushed to the sink; otherwise
 * the operation is forwarded to `subscriptionsClient.subscribe` and whatever
 * that call returns becomes the return value of the Observable source.
 */
function fetchOrSubscribe(operation: RequestParameters, variables: Variables) {
  return Observable.create((sink) => {
    const { name: operationName, text: query } = operation;
    if (!query) {
      return sink.error(new Error('Operation text cannot be empty'));
    }
    return subscriptionsClient.subscribe(
      { operationName, query, variables },
      sink,
    );
  });
}
 
// the same function is passed for both arguments of `Network.create`
export const network = Network.create(fetchOrSubscribe, fetchOrSubscribe);

With urql

import { createClient, defaultExchanges, subscriptionExchange } from 'urql';
import { createClient as createSSEClient } from 'graphql-sse';
 
// graphql-sse client that the subscription exchange below delegates to
const sseClient = createSSEClient({
  url: 'http://its.urql:4000/graphql/stream',
});
 
// urql client: the default exchanges plus a subscription exchange that
// forwards every subscription operation to `sseClient`
export const client = createClient({
  url: '/graphql/stream',
  exchanges: [
    ...defaultExchanges,
    subscriptionExchange({
      forwardSubscription(operation) {
        return {
          subscribe: (sink) => {
            // the value returned by `sseClient.subscribe` is exposed
            // to urql as `unsubscribe`
            const dispose = sseClient.subscribe(operation, sink);
            return {
              unsubscribe: dispose,
            };
          },
        };
      },
    }),
  ],
});

With Apollo

import {
  ApolloLink,
  Operation,
  FetchResult,
  Observable,
} from '@apollo/client/core';
import { print, GraphQLError } from 'graphql';
import { createClient, ClientOptions, Client } from 'graphql-sse';
 
/**
 * Apollo link that runs every operation through a graphql-sse client.
 */
class SSELink extends ApolloLink {
  /** graphql-sse client created from the options given to the constructor. */
  private client: Client;
 
  /**
   * @param options - passed to `createClient` unchanged
   */
  constructor(options: ClientOptions) {
    super();
    this.client = createClient(options);
  }
 
  /**
   * Subscribes to the operation on the client and relays `next`, `complete`
   * and `error` to the Observable's sink. The value returned by
   * `client.subscribe` is returned from the subscriber function.
   */
  public request(operation: Operation): Observable<FetchResult> {
    return new Observable((sink) => {
      return this.client.subscribe<FetchResult>(
        // the operation's query is printed to a string before it is sent
        { ...operation, query: print(operation.query) },
        {
          next: sink.next.bind(sink),
          complete: sink.complete.bind(sink),
          error: sink.error.bind(sink),
        },
      );
    });
  }
}
 
// Link instance whose `headers` function builds the headers from the
// current session.
// NOTE(review): `getSession` is neither defined nor imported in this snippet;
// presumably it is provided by the application — confirm.
export const link = new SSELink({
  url: 'http://where.is:4000/graphql/stream',
  headers: () => {
    const session = getSession();
    // no session -> no extra headers
    if (!session) return {};
    return {
      Authorization: `Bearer ${session.token}`,
    };
  },
});

With TypedDocumentNode

import {
  createClient,
  Client,
  ClientOptions,
  ExecutionResult,
  Sink,
} from 'graphql-sse';
import { TypedDocumentNode } from '@graphql-typed-document-node/core'; // yarn add @graphql-typed-document-node/core
import { print } from 'graphql';
 
/**
 * A `Client` whose `iterate` and `subscribe` accept a `TypedDocumentNode`
 * plus its variables instead of a request object, so that the result and
 * variables types follow from the document.
 */
export interface TypedClient
  extends Omit<
    Client,
    'iterate' | 'subscribe' // we're replacing the `iterate` and `subscribe` implementations
  > {
  /**
   * Iterates over the results of the given typed document.
   *
   * @param document - typed document carrying the result and variables types
   * @param variables - variables matching the document
   */
  iterate<
    Result extends ExecutionResult,
    Variables extends Record<string, unknown>,
  >(
    document: TypedDocumentNode<Result, Variables>,
    variables: Variables,
  ): AsyncIterableIterator<Result>;
  /**
   * Subscribes to the given typed document, delivering results to `sink`.
   *
   * @param document - typed document carrying the result and variables types
   * @param variables - variables matching the document
   * @param sink - receiver of the typed results
   * @returns a function taking no arguments (the value returned by the
   * underlying `subscribe`)
   */
  subscribe<
    Result extends ExecutionResult,
    Variables extends Record<string, unknown>,
  >(
    document: TypedDocumentNode<Result, Variables>,
    variables: Variables,
    sink: Sink<Result>,
  ): () => void;
}
 
/**
 * Creates a graphql-sse client and exposes it through the `TypedClient`
 * interface: every member of the underlying client is kept, while `iterate`
 * and `subscribe` are replaced by variants that print the typed document
 * into the request's `query`.
 *
 * @param options - passed to `createClient` unchanged
 */
export function createTypedClient(options: ClientOptions): TypedClient {
  const base = createClient(options);
  return {
    ...base,
    iterate(document, variables) {
      const request = { query: print(document), variables };
      // the underlying iterator is not typed by the document, hence the cast
      return base.iterate(request) as any;
    },
    subscribe(document, variables, sink) {
      const request = { query: print(document), variables };
      return base.subscribe(request, sink);
    },
  };
}

For HTTP/1 (aka. single connection mode)

import { createClient } from 'graphql-sse';
 
// Client switched to "single connection mode" through one option.
export const client = createClient({
  singleConnection: true, // this is literally it 😄
  url: 'http://use.single:4000/connection/graphql/stream',
  // lazy: true (default) -> connect on first subscribe and disconnect on last unsubscribe
  // lazy: false -> connect as soon as the client is created
});
 
// The client will now run in "single connection mode". Meaning,
// a single SSE connection will be used to transmit all operation results
// while separate HTTP requests will be issued to dictate the behaviour.

With Custom Retry Timeout Strategy

import { createClient } from 'graphql-sse';
import { waitForHealthy } from './my-servers';
 
// the same URL is used for the client and for the health check below
const url = 'http://i.want.retry:4000/control/graphql/stream';
 
export const client = createClient({
  url,
  retryWait: async function waitForServerHealthyBeforeRetry() {
    // if you have a server healthcheck, you can wait for it to become
    // healthy before retrying after an abrupt disconnect (most commonly a restart)
    await waitForHealthy(url);
 
    // after the server becomes ready, wait for a randomised 1-4s timeout
    // (1s base + up to 3s of jitter, to avoid DDoSing yourself) and try
    // connecting again
    await new Promise((resolve) =>
      setTimeout(resolve, 1000 + Math.random() * 3000),
    );
  },
});

With Logging of Incoming Messages

⚠️

Browsers don't show the incoming messages in the DevTools. Read more.

import { createClient } from 'graphql-sse';
 
// `console.log` is passed as the `onMessage` option, so whatever the client
// hands to `onMessage` is printed to the console.
export const client = createClient({
  url: 'http://let-me-see.messages:4000/graphql/stream',
  onMessage: console.log,
});

In Browser

<!doctype html>
<html>
  <head>
    <meta charset="utf-8" />
    <title>GraphQL over Server-Sent Events</title>
    <!-- UMD bundle loaded from unpkg; the inline script in the body uses it through the `graphqlSse` global -->
    <script
      type="text/javascript"
      src="https://unpkg.com/graphql-sse/umd/graphql-sse.min.js"
    ></script>
  </head>
  <body>
    <script type="text/javascript">
      // no import needed: `createClient` is read off the `graphqlSse` global
      const client = graphqlSse.createClient({
        url: 'http://umdfor.the:4000/win/graphql/stream',
      });
 
      // consider other recipes for usage inspiration
    </script>
  </body>
</html>

In Node.js

const ws = require('ws'); // yarn add ws
const fetch = require('node-fetch'); // yarn add node-fetch
const { AbortController } = require('node-abort-controller'); // (node < v15) yarn add node-abort-controller
const Crypto = require('crypto');
const { createClient } = require('graphql-sse');
 
// Client configured with the implementations required above.
// NOTE(review): `ws` is required above, but nothing in this snippet uses it.
export const client = createClient({
  url: 'http://no.browser:4000/graphql/stream',
  fetchFn: fetch,
  abortControllerImpl: AbortController, // node < v15
  /**
   * Generates a v4 UUID to be used as the ID.
   * Reference: https://gist.github.com/jed/982883
   *
   * The array/number concatenation produces the template string
   * "10000000-1000-4000-8000-100000000000"; every "0", "1" and "8" in it
   * is then replaced by a hex digit derived from one random byte, while
   * the "4" and the dashes stay as they are.
   */
  generateID: () =>
    ([1e7] + -1e3 + -4e3 + -8e3 + -1e11).replace(/[018]/g, (c) =>
      (c ^ (Crypto.randomBytes(1)[0] & (15 >> (c / 4)))).toString(16),
    ),
});

Server Handler Usage

With Custom Authentication

import { createHandler } from 'graphql-sse';
import {
  schema,
  getOrCreateTokenFromCookies,
  customAuthenticationTokenDiscovery,
  processAuthorizationHeader,
} from './my-graphql';
 
// Handler with a custom `authenticate` step that resolves the stream token
// for both connection modes.
export const handler = createHandler({
  schema,
  authenticate: async (req) => {
    let token = req.headers.get('x-graphql-event-stream-token');
    if (token) {
      // When the client is working in a "single connection mode"
      // all subsequent requests for operations will have the
      // stream token set in the `X-GraphQL-Event-Stream-Token` header.
      //
      // It is considered safe to accept the header token always
      // because if a stream reservation does not exist, or is already
      // fulfilled, the handler itself will reject the request.
      //
      // Read more: https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md#single-connection-mode
      return Array.isArray(token) ? token.join('') : token;
    }
 
    // It is necessary to generate a unique token when dealing with
    // clients that operate in the "single connection mode". The process
    // of generating the token is completely up to the implementor.
    //
    // The three assignments below are alternatives ("or"); as written,
    // each one overwrites the previous value.
    token = getOrCreateTokenFromCookies(req);
    // or
    token = processAuthorizationHeader(req.headers.get('authorization'));
    // or
    token = await customAuthenticationTokenDiscovery(req);
 
    // By returning a `[null, { status, statusText }]` tuple the implementor
    // may respond to authentication issues however they see fit.
    if (!token) {
      return [null, { status: 401, statusText: 'Unauthorized' }];
    }
 
    // Clients that operate in "distinct connections mode" don't
    // need a unique stream token. It is completely ok to simply
    // return an empty string for authenticated clients.
    //
    // Read more: https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md#distinct-connections-mode
    if (
      req.method === 'POST' &&
      req.headers.get('accept') === 'text/event-stream'
    ) {
      // "distinct connections mode" requests an event-stream with a POST
      // method. These two checks, together with the lack of `X-GraphQL-Event-Stream-Token`
      // header, are sufficient for accurate detection.
      return ''; // return token; is OK too
    }
 
    // On the other hand, clients operating in "single connection mode"
    // need a unique stream token which will be provided alongside the
    // incoming event stream request inside the `X-GraphQL-Event-Stream-Token` header.
    //
    // Read more: https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md#single-connection-mode
    return token;
  },
});

With Dynamic Schema

import { createHandler } from 'graphql-sse';
import { schema, checkIsAdmin, getDebugSchema } from './my-graphql';
 
// Handler whose schema is chosen per request instead of being fixed.
export const handler = createHandler({
  schema: async (req, executionArgsWithoutSchema) => {
    // will be called on every subscribe request,
    // allowing you to dynamically supply the schema
    // depending on the provided arguments
    const isAdmin = await checkIsAdmin(req);
    // admins get the debug schema, everyone else the regular one
    if (isAdmin) return getDebugSchema(req, executionArgsWithoutSchema);
    return schema;
  },
});

With Custom Context Value

import { createHandler } from 'graphql-sse';
import { schema, getDynamicContext } from './my-graphql';
 
// Handler whose context value is produced by a function of the request
// and the arguments.
export const handler = createHandler({
  schema,
  // or static context by supplying the value directly
  context: (req, args) => {
    return getDynamicContext(req, args);
  },
});

With Custom Execution Arguments

import { parse } from 'graphql';
import { createHandler } from 'graphql-sse';
import { getSchema, myValidationRules } from './my-graphql';
 
// Handler that assembles the execution arguments itself in `onSubscribe`.
export const handler = createHandler({
  async onSubscribe(req, params) {
    // the schema is resolved first, before the query is looked at
    const schema = await getSchema(req);
    const { operationName, query, variables } = params;
    // a string query is parsed; anything else is used as the document as-is
    const document = typeof query === 'string' ? parse(query) : query;
    return {
      schema,
      operationName,
      document,
      variableValues: variables,
      contextValue: undefined,
    };
  },
});

Server Handler and Client Usage

With Persisted Queries

🛸 server
import { parse, ExecutionArgs } from 'graphql';
import { createHandler } from 'graphql-sse';
import { schema } from './my-graphql';
 
// a unique GraphQL execution ID used for representing
// a query in the persisted queries store. when subscribing,
// the id is transmitted in `extensions.persistedQuery` of the
// request params (that is where the handler below reads it from)
type QueryID = string;
 
// the persisted queries store: execution arguments keyed by query id
const queriesStore: Record<QueryID, ExecutionArgs> = {
  iWantTheGreetings: {
    schema, // you may even provide different schemas in the queries store
    document: parse('subscription Greetings { greetings }'),
  },
};
 
// Handler that only executes queries found in the persisted queries store.
export const handler = createHandler({
  onSubscribe: (_req, params) => {
    const queryId = String(params.extensions?.persistedQuery);

    // The id comes from the client, so only the store's OWN entries may match.
    // A bare `queriesStore[queryId]` would also resolve inherited members such
    // as "constructor", "toString" or "__proto__", which are truthy and would
    // skip the 404 below.
    const persistedQuery = Object.prototype.hasOwnProperty.call(
      queriesStore,
      queryId,
    )
      ? queriesStore[queryId]
      : undefined;

    if (persistedQuery) {
      return {
        ...persistedQuery,
        variableValues: params.variables, // use the variables from the client
        contextValue: undefined,
      };
    }

    // for extra security only allow the queries from the store
    return [null, { status: 404, statusText: 'Not Found' }];
  },
});
📺 client
import { createClient } from 'graphql-sse';
 
// Client pointed at the persisted queries endpoint.
const client = createClient({
  url: 'http://persisted.graphql:4000/queries',
});

(async () => {
  // `toBeCalledTimes` only accepts mock or spy functions, so the deliveries
  // are counted by hand instead of asserting on a plain callback
  let received = 0;
  const onNext = () => {
    received++;
  };

  // settles when the subscription completes, rejects on its first error
  await new Promise<void>((resolve, reject) => {
    client.subscribe(
      {
        query: '', // query field is required, but you can leave it empty for persisted queries
        extensions: {
          persistedQuery: 'iWantTheGreetings',
        },
      },
      {
        next: onNext,
        error: reject,
        complete: () => resolve(),
      },
    );
  });

  expect(received).toBe(5); // greetings in 5 languages
})();