Recipes

Recipes

Short and concise code snippets for starting with common use-cases.

Client Usage

With Promise (opens in a new tab)

import { createClient, RequestParams } from 'graphql-sse';
 
const client = createClient({
  url: 'http://hey.there:4000/graphql/stream',
});
 
export async function execute<T>(payload: RequestParams) {
  return new Promise<T>((resolve, reject) => {
    let result: T;
    client.subscribe<T>(payload, {
      next: (data) => (result = data),
      error: reject,
      complete: () => resolve(result),
    });
  });
}
 
// use
(async () => {
  try {
    const result = await execute({
      query: '{ hello }',
    });
    // complete
    // next = result = { data: { hello: 'Hello World!' } }
  } catch (err) {
    // error
  }
})();

With AsyncIterator (opens in a new tab)

import { createClient, RequestParams } from 'graphql-sse';
 
const client = createClient({
  url: 'http://iterators.ftw:4000/graphql/stream',
});
 
export function subscribe(payload) {
  let deferred = null;
  const pending = [];
  let throwMe = null,
    done = false;
  const dispose = client.subscribe(payload, {
    next: (data) => {
      pending.push(data);
      deferred?.resolve(false);
    },
    error: (err) => {
      throwMe = err;
      deferred?.reject(throwMe);
    },
    complete: () => {
      done = true;
      deferred?.resolve(true);
    },
  });
  return {
    [Symbol.asyncIterator]() {
      return this;
    },
    async next() {
      if (done) return { done: true, value: undefined };
      if (throwMe) throw throwMe;
      if (pending.length) return { value: pending.shift() };
      return (await new Promise(
        (resolve, reject) => (deferred = { resolve, reject }),
      ))
        ? { done: true, value: undefined }
        : { value: pending.shift() };
    },
    async throw(err) {
      throwMe = err;
      deferred?.reject(throwMe);
      return { done: true, value: undefined };
    },
    async return() {
      done = true;
      deferred?.resolve(true);
      dispose();
      return { done: true, value: undefined };
    },
  };
}
 
(async () => {
  const subscription = subscribe({
    query: 'subscription { greetings }',
  });
  // subscription.return() to dispose
 
  for await (const result of subscription) {
    // next = result = { data: { greetings: 5x } }
  }
  // complete
})();

With Observable (opens in a new tab)

import { Observable } from 'relay-runtime';
// or
import { Observable } from '@apollo/client/core';
// or
import { Observable } from 'rxjs';
// or
import Observable from 'zen-observable';
// or any other lib which implements Observables as per the ECMAScript proposal: https://github.com/tc39/proposal-observable
 
const client = createClient({
  url: 'http://graphql.loves:4000/observables',
});
 
export function toObservable(operation) {
  return new Observable((observer) =>
    client.subscribe(operation, {
      next: (data) => observer.next(data),
      error: (err) => observer.error(err),
      complete: () => observer.complete(),
    }),
  );
}
 
const observable = toObservable({ query: `subscription { ping }` });
 
const subscription = observable.subscribe({
  next: (data) => {
    expect(data).toBe({ data: { ping: 'pong' } });
  },
});
 
// ⏱
 
subscription.unsubscribe();

With Relay (opens in a new tab)

import {
  Network,
  Observable,
  RequestParameters,
  Variables,
} from 'relay-runtime';
import { createClient } from 'graphql-sse';
 
const subscriptionsClient = createClient({
  url: 'http://i.love:4000/graphql/stream',
  headers: () => {
    const session = getSession();
    if (!session) return {};
    return {
      Authorization: `Bearer ${session.token}`,
    };
  },
});
 
// yes, both fetch AND subscribe can be handled in one implementation
function fetchOrSubscribe(operation: RequestParameters, variables: Variables) {
  return Observable.create((sink) => {
    if (!operation.text) {
      return sink.error(new Error('Operation text cannot be empty'));
    }
    return subscriptionsClient.subscribe(
      {
        operationName: operation.name,
        query: operation.text,
        variables,
      },
      sink,
    );
  });
}
 
export const network = Network.create(fetchOrSubscribe, fetchOrSubscribe);

With urql (opens in a new tab)

import { createClient, defaultExchanges, subscriptionExchange } from 'urql';
import { createClient as createSSEClient } from 'graphql-sse';
 
const sseClient = createSSEClient({
  url: 'http://its.urql:4000/graphql/stream',
});
 
export const client = createClient({
  url: '/graphql/stream',
  exchanges: [
    ...defaultExchanges,
    subscriptionExchange({
      forwardSubscription(operation) {
        return {
          subscribe: (sink) => {
            const dispose = sseClient.subscribe(operation, sink);
            return {
              unsubscribe: dispose,
            };
          },
        };
      },
    }),
  ],
});

With Apollo (opens in a new tab)

import {
  ApolloLink,
  Operation,
  FetchResult,
  Observable,
} from '@apollo/client/core';
import { print, GraphQLError } from 'graphql';
import { createClient, ClientOptions, Client } from 'graphql-sse';
 
class SSELink extends ApolloLink {
  private client: Client;
 
  constructor(options: ClientOptions) {
    super();
    this.client = createClient(options);
  }
 
  public request(operation: Operation): Observable<FetchResult> {
    return new Observable((sink) => {
      return this.client.subscribe<FetchResult>(
        { ...operation, query: print(operation.query) },
        {
          next: sink.next.bind(sink),
          complete: sink.complete.bind(sink),
          error: sink.error.bind(sink),
        },
      );
    });
  }
}
 
export const link = new SSELink({
  url: 'http://where.is:4000/graphql/stream',
  headers: () => {
    const session = getSession();
    if (!session) return {};
    return {
      Authorization: `Bearer ${session.token}`,
    };
  },
});

For HTTP/1 (aka. single connection mode (opens in a new tab))

import { createClient } from 'graphql-sse';
 
export const client = createClient({
  singleConnection: true, // this is literally it 😄
  url: 'http://use.single:4000/connection/graphql/stream',
  // lazy: true (default) -> connect on first subscribe and disconnect on last unsubscribe
  // lazy: false -> connect as soon as the client is created
});
 
// The client will now run in a "single connection mode" mode. Meaning,
// a single SSE connection will be used to transmit all operation results
// while separate HTTP requests will be issued to dictate the behaviour.

With Custom Retry Timeout Strategy

import { createClient } from 'graphql-sse';
import { waitForHealthy } from './my-servers';
 
const url = 'http://i.want.retry:4000/control/graphql/stream';
 
export const client = createClient({
  url,
  retryWait: async function waitForServerHealthyBeforeRetry() {
    // if you have a server healthcheck, you can wait for it to become
    // healthy before retrying after an abrupt disconnect (most commonly a restart)
    await waitForHealthy(url);
 
    // after the server becomes ready, wait for a second + random 1-4s timeout
    // (avoid DDoSing yourself) and try connecting again
    await new Promise((resolve) =>
      setTimeout(resolve, 1000 + Math.random() * 3000),
    );
  },
});

With Logging of Incoming Messages

⚠️

Browsers don't show them in the DevTools. Read more (opens in a new tab).

import { createClient } from 'graphql-sse';
 
export const client = createClient({
  url: 'http://let-me-see.messages:4000/graphql/stream',
  onMessage: console.log,
});

In Browser

<!DOCTYPE html>
<html>
  <head>
    <meta charset="utf-8" />
    <title>GraphQL over Server-Sent Events</title>
    <script
      type="text/javascript"
      src="https://unpkg.com/graphql-sse/umd/graphql-sse.min.js"
    ></script>
  </head>
  <body>
    <script type="text/javascript">
      const client = graphqlSse.createClient({
        url: 'http://umdfor.the:4000/win/graphql/stream',
      });
 
      // consider other recipes for usage inspiration
    </script>
  </body>
</html>

In Node.js

const ws = require('ws'); // yarn add ws
const fetch = require('node-fetch'); // yarn add node-fetch
const { AbortController } = require('node-abort-controller'); // (node < v15) yarn add node-abort-controller
const Crypto = require('crypto');
const { createClient } = require('graphql-sse');
 
export const client = createClient({
  url: 'http://no.browser:4000/graphql/stream',
  fetchFn: fetch,
  abortControllerImpl: AbortController, // node < v15
  /**
   * Generates a v4 UUID to be used as the ID.
   * Reference: https://gist.github.com/jed/982883
   */
  generateID: () =>
    ([1e7] + -1e3 + -4e3 + -8e3 + -1e11).replace(/[018]/g, (c) =>
      (c ^ (Crypto.randomBytes(1)[0] & (15 >> (c / 4)))).toString(16),
    ),
});

Server Handler Usage

With Custom Authentication

import { createHandler } from 'graphql-sse';
import {
  schema,
  getOrCreateTokenFromCookies,
  customAuthenticationTokenDiscovery,
  processAuthorizationHeader,
} from './my-graphql';
 
export const handler = createHandler({
  schema,
  authenticate: async (req) => {
    let token = req.headers.get('x-graphql-event-stream-token');
    if (token) {
      // When the client is working in a "single connection mode"
      // all subsequent requests for operations will have the
      // stream token set in the `X-GraphQL-Event-Stream-Token` header.
      //
      // It is considered safe to accept the header token always
      // because if a stream reservation does not exist, or is already
      // fulfilled, the handler itself will reject the request.
      //
      // Read more: https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md#single-connection-mode
      return Array.isArray(token) ? token.join('') : token;
    }
 
    // It is necessary to generate a unique token when dealing with
    // clients that operate in the "single connection mode". The process
    // of generating the token is completely up to the implementor.
    token = getOrCreateTokenFromCookies(req);
    // or
    token = processAuthorizationHeader(req.headers.get('authorization'));
    // or
    token = await customAuthenticationTokenDiscovery(req);
 
    // Using the response argument the implementor may respond to
    // authentication issues however he sees fit.
    if (!token) {
      return [null, { status: 401, statusText: 'Unauthorized' }];
    }
 
    // Clients that operate in "distinct connections mode" dont
    // need a unique stream token. It is completely ok to simply
    // return an empty string for authenticated clients.
    //
    // Read more: https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md#distinct-connections-mode
    if (
      req.method === 'POST' &&
      req.headers.get('accept') === 'text/event-stream'
    ) {
      // "distinct connections mode" requests an event-stream with a POST
      // method. These two checks, together with the lack of `X-GraphQL-Event-Stream-Token`
      // header, are sufficient for accurate detection.
      return ''; // return token; is OK too
    }
 
    // On the other hand, clients operating in "single connection mode"
    // need a unique stream token which will be provided alongside the
    // incoming event stream request inside the `X-GraphQL-Event-Stream-Token` header.
    //
    // Read more: https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md#single-connection-mode
    return token;
  },
});

With Dynamic Schema

import { createHandler } from 'graphql-sse';
import { schema, checkIsAdmin, getDebugSchema } from './my-graphql';
 
export const handler = createHandler({
  schema: async (req, executionArgsWithoutSchema) => {
    // will be called on every subscribe request
    // allowing you to dynamically supply the schema
    // using the depending on the provided arguments
    const isAdmin = await checkIsAdmin(req);
    if (isAdmin) return getDebugSchema(req, executionArgsWithoutSchema);
    return schema;
  },
});

With Custom Context Value

import { createHandler } from 'graphql-sse';
import { schema, getDynamicContext } from './my-graphql';
 
export const handler = createHandler({
  schema,
  // or static context by supplying the value direcly
  context: (req, args) => {
    return getDynamicContext(req, args);
  },
});

With Custom Execution Arguments

import { parse } from 'graphql';
import { createHandler } from 'graphql-sse';
import { getSchema, myValidationRules } from './my-graphql';
 
export const handler = createHandler({
  onSubscribe: async (req, params) => {
    const schema = await getSchema(req);
    return {
      schema,
      operationName: params.operationName,
      document:
        typeof params.query === 'string' ? parse(params.query) : params.query,
      variableValues: params.variables,
      contextValue: undefined,
    };
  },
});

Server Handler and Client Usage

With Persisted Queries

🛸 server
import { parse, ExecutionArgs } from 'graphql';
import { createHandler } from 'graphql-sse';
import { schema } from './my-graphql';
 
// a unique GraphQL execution ID used for representing
// a query in the persisted queries store. when subscribing
// you should use the `SubscriptionPayload.query` to transmit the id
type QueryID = string;
 
const queriesStore: Record<QueryID, ExecutionArgs> = {
  iWantTheGreetings: {
    schema, // you may even provide different schemas in the queries store
    document: parse('subscription Greetings { greetings }'),
  },
};
 
export const handler = createHandler({
  onSubscribe: (_req, params) => {
    const persistedQuery =
      queriesStore[String(params.extensions?.persistedQuery)];
    if (persistedQuery) {
      return {
        ...persistedQuery,
        variableValues: params.variables, // use the variables from the client
        contextValue: undefined,
      };
    }
 
    // for extra security only allow the queries from the store
    return [null, { status: 404, statusText: 'Not Found' }];
  },
});
📺 client
import { createClient } from 'graphql-sse';
 
const client = createClient({
  url: 'http://persisted.graphql:4000/queries',
});
 
(async () => {
  const onNext = () => {
    /**/
  };
 
  await new Promise((resolve, reject) => {
    client.subscribe(
      {
        query: '', // query field is required, but you can leave it empty for persisted queries
        extensions: {
          persistedQuery: 'iWantTheGreetings',
        },
      },
      {
        next: onNext,
        error: reject,
        complete: resolve,
      },
    );
  });
 
  expect(onNext).toBeCalledTimes(5); // greetings in 5 languages
})();