Recipes
Short and concise code snippets for starting with common use-cases.
Client Usage
With EventSource (distinct connections mode)
The following example doesn’t use the graphql-sse
client and works only for the “distinct connections mode” of the GraphQL over SSE spec. This mode is the client default and should suffice for most of the use-cases.
// The operation is sent in the query string of the stream URL.
const streamUrl = new URL('http://localhost:4000/graphql/stream');
streamUrl.searchParams.append('query', 'subscription { greetings }');

const eventSource = new EventSource(streamUrl);

// each "next" event carries one serialised execution result in `data`
eventSource.addEventListener('next', (event) => {
  console.log(event.data);
  // {"data":{"greetings":"Hi"}}
  // {"data":{"greetings":"Bonjour"}}
  // {"data":{"greetings":"Hola"}}
  // {"data":{"greetings":"Ciao"}}
  // {"data":{"greetings":"Zdravo"}}
});

eventSource.addEventListener('error', (err) => {
  console.error(err);
});

// close the connection once the "complete" event arrives
eventSource.addEventListener('complete', () => {
  eventSource.close();
});
With Promise
import { createClient, RequestParams } from 'graphql-sse';
const client = createClient({
  url: 'http://hey.there:4000/graphql/stream',
});

// Run a single-result operation and read only the first value of its iterator.
(async function runQuery() {
  const results = client.iterate({ query: '{ hello }' });

  try {
    const { value } = await results.next();
    // next = value = { data: { hello: 'Hello World!' } }
    // complete
  } catch (err) {
    // error
  }
})();
With AsyncIterator
import { createClient, RequestParams } from 'graphql-sse';
const client = createClient({
  url: 'http://iterators.ftw:4000/graphql/stream',
});

// Consume a subscription through its async iterator.
(async function consumeGreetings() {
  const greetings = client.iterate({
    query: 'subscription { greetings }',
  });

  // call "greetings.return()" to dispose

  for await (const message of greetings) {
    // next = message = { data: { greetings: 5x } }
    // "break" to dispose
  }

  // complete
})();
With Observable
import { Observable } from 'relay-runtime';
// or
import { Observable } from '@apollo/client/core';
// or
import { Observable } from 'rxjs';
// or
import Observable from 'zen-observable';
// or any other lib which implements Observables as per the ECMAScript proposal: https://github.com/tc39/proposal-observable
// `createClient` is the graphql-sse client factory: import { createClient } from 'graphql-sse';
const client = createClient({
  url: 'http://graphql.loves:4000/observables',
});

/**
 * Wraps a client subscription for `operation` in an Observable.
 *
 * Whatever `client.subscribe` returns is handed back from the subscriber
 * function, so the Observable implementation receives it as its teardown.
 */
export function toObservable(operation) {
  return new Observable((observer) => {
    // forward every client event to the matching observer callback
    const sink = {
      next: (data) => observer.next(data),
      error: (err) => observer.error(err),
      complete: () => observer.complete(),
    };
    return client.subscribe(operation, sink);
  });
}
// Subscribe to the observable created from a subscription operation.
const observable = toObservable({ query: `subscription { ping }` });

const subscription = observable.subscribe({
  next: (data) => {
    // compare by value: `toBe` checks reference identity and can never
    // match a freshly created object literal
    expect(data).toEqual({ data: { ping: 'pong' } });
  },
});

// ⏱

// stop listening once the results are no longer needed
subscription.unsubscribe();
With Relay
import {
Network,
Observable,
RequestParameters,
Variables,
} from 'relay-runtime';
import { createClient } from 'graphql-sse';
// Client used for every Relay operation; the `headers` callback attaches the
// session's bearer token when a session exists and sends no extra headers otherwise.
const subscriptionsClient = createClient({
  url: 'http://i.love:4000/graphql/stream',
  headers: () => {
    const session = getSession();
    return session ? { Authorization: `Bearer ${session.token}` } : {};
  },
});
/**
 * Runs any Relay operation over the SSE client.
 *
 * yes, both fetch AND subscribe can be handled in one implementation
 */
function fetchOrSubscribe(operation: RequestParameters, variables: Variables) {
  return Observable.create((sink) => {
    const { name, text } = operation;

    // an operation without text cannot be sent
    if (!text) {
      return sink.error(new Error('Operation text cannot be empty'));
    }

    const payload = { operationName: name, query: text, variables };
    return subscriptionsClient.subscribe(payload, sink);
  });
}

// the same function serves as both the fetch and the subscribe handler
export const network = Network.create(fetchOrSubscribe, fetchOrSubscribe);
With urql
import { createClient, defaultExchanges, subscriptionExchange } from 'urql';
import { createClient as createSSEClient } from 'graphql-sse';
const sseClient = createSSEClient({
  url: 'http://its.urql:4000/graphql/stream',
});

// Exchange that forwards urql subscriptions to the SSE client and exposes
// the client's dispose function as `unsubscribe`.
const sseSubscriptionExchange = subscriptionExchange({
  forwardSubscription: (operation) => ({
    subscribe: (sink) => {
      const dispose = sseClient.subscribe(operation, sink);
      return { unsubscribe: dispose };
    },
  }),
});

export const client = createClient({
  url: '/graphql/stream',
  exchanges: [...defaultExchanges, sseSubscriptionExchange],
});
With Apollo
import {
ApolloLink,
Operation,
FetchResult,
Observable,
} from '@apollo/client/core';
import { print, GraphQLError } from 'graphql';
import { createClient, ClientOptions, Client } from 'graphql-sse';
/**
 * Apollo link that executes every operation through a graphql-sse client.
 */
class SSELink extends ApolloLink {
  private client: Client;

  constructor(options: ClientOptions) {
    super();
    this.client = createClient(options);
  }

  /**
   * Subscribes to the operation on the SSE client and relays its events to
   * the Observable's sink. The operation document is printed to a query
   * string each time the Observable is subscribed to.
   */
  public request(operation: Operation): Observable<FetchResult> {
    return new Observable((sink) => {
      const payload = { ...operation, query: print(operation.query) };
      return this.client.subscribe<FetchResult>(payload, {
        next: (result) => sink.next(result),
        complete: () => sink.complete(),
        error: (err) => sink.error(err),
      });
    });
  }
}
// Link instance; the `headers` callback attaches the session's bearer token
// when a session exists and sends no extra headers otherwise.
export const link = new SSELink({
  url: 'http://where.is:4000/graphql/stream',
  headers: () => {
    const session = getSession();
    return session ? { Authorization: `Bearer ${session.token}` } : {};
  },
});
With TypedDocumentNode
import {
createClient,
Client,
ClientOptions,
ExecutionResult,
Sink,
} from 'graphql-sse';
import { TypedDocumentNode } from '@graphql-typed-document-node/core'; // yarn add @graphql-typed-document-node/core
import { print } from 'graphql';
/**
 * A `Client` whose `iterate` and `subscribe` take a `TypedDocumentNode` and
 * its variables, so that result and variable types follow from the document.
 */
export interface TypedClient
extends Omit<
Client,
'iterate' | 'subscribe' // we're replacing the `iterate` and `subscribe` implementations
> {
/**
 * Typed replacement for `Client.iterate`.
 *
 * @param document - the typed operation document
 * @param variables - variables matching the document's `Variables` type
 * @returns an async iterable iterator of `Result` values
 */
iterate<
Result extends ExecutionResult,
Variables extends Record<string, unknown>,
>(
document: TypedDocumentNode<Result, Variables>,
variables: Variables,
): AsyncIterableIterator<Result>;
/**
 * Typed replacement for `Client.subscribe`.
 *
 * @param document - the typed operation document
 * @param variables - variables matching the document's `Variables` type
 * @param sink - receiver of the `Result` events
 * @returns a function taking no arguments; presumably the disposer, as with `Client.subscribe`
 */
subscribe<
Result extends ExecutionResult,
Variables extends Record<string, unknown>,
>(
document: TypedDocumentNode<Result, Variables>,
variables: Variables,
sink: Sink<Result>,
): () => void;
}
/**
 * Creates a graphql-sse client and overrides its `iterate` and `subscribe`
 * so that they accept a typed document with variables. The document is
 * printed to a query string before it is handed to the underlying client.
 *
 * @param options - options passed unchanged to `createClient`
 * @returns the wrapped client
 */
export function createTypedClient(options: ClientOptions): TypedClient {
  const client = createClient(options);

  return {
    ...client,
    iterate: (document, variables) => {
      const query = print(document);
      // the underlying iterator is not typed per document, hence the cast
      return client.iterate({ query, variables }) as any;
    },
    subscribe: (document, variables, sink) => {
      const query = print(document);
      return client.subscribe({ query, variables }, sink);
    },
  };
}
For HTTP/1 (aka. single connection mode)
import { createClient } from 'graphql-sse';
// Client configured for the "single connection mode".
export const client = createClient({
singleConnection: true, // this is literally it 😄
url: 'http://use.single:4000/connection/graphql/stream',
// lazy: true (default) -> connect on first subscribe and disconnect on last unsubscribe
// lazy: false -> connect as soon as the client is created
});
// The client will now run in "single connection mode". Meaning,
// a single SSE connection will be used to transmit all operation results
// while separate HTTP requests will be issued to dictate the behaviour.
With Custom Retry Timeout Strategy
import { createClient } from 'graphql-sse';
import { waitForHealthy } from './my-servers';
const url = 'http://i.want.retry:4000/control/graphql/stream';

export const client = createClient({
  url,
  retryWait: async function waitForServerHealthyBeforeRetry() {
    // if you have a server healthcheck, wait for it to report healthy
    // before retrying after an abrupt disconnect (most commonly a restart)
    await waitForHealthy(url);

    // once the server is ready, wait one second plus a random 0-3s
    // (1-4s in total, to avoid DDoSing yourself) and try connecting again
    const delayMs = 1000 + Math.random() * 3000;
    await new Promise((resolve) => setTimeout(resolve, delayMs));
  },
});
With Logging of Incoming Messages
Browsers don’t show them in the DevTools. Read more.
import { createClient } from 'graphql-sse';
// Client that hands each incoming message to `console.log` via `onMessage`.
export const client = createClient({
url: 'http://let-me-see.messages:4000/graphql/stream',
onMessage: console.log,
});
In Browser
<!doctype html>
<html>
<head>
<meta charset="utf-8" />
<title>GraphQL over Server-Sent Events</title>
<!-- loads the UMD build of graphql-sse from unpkg; it presumably exposes the `graphqlSse` global used below -->
<script
type="text/javascript"
src="https://unpkg.com/graphql-sse/umd/graphql-sse.min.js"
></script>
</head>
<body>
<script type="text/javascript">
// create the client through the `graphqlSse` global
const client = graphqlSse.createClient({
url: 'http://umdfor.the:4000/win/graphql/stream',
});
// consider other recipes for usage inspiration
</script>
</body>
</html>
In Node.js
const ws = require('ws'); // yarn add ws
const fetch = require('node-fetch'); // yarn add node-fetch
const { AbortController } = require('node-abort-controller'); // (node < v15) yarn add node-abort-controller
const Crypto = require('crypto');
const { createClient } = require('graphql-sse');
// NOTE(review): the dependencies above are loaded with `require` while this is an
// ES `export`; that combination only works when the file is transpiled. In plain
// CommonJS use `module.exports` instead — confirm the intended module system.
export const client = createClient({
url: 'http://no.browser:4000/graphql/stream',
fetchFn: fetch,
abortControllerImpl: AbortController, // node < v15
/**
* Generates a v4 UUID to be used as the ID.
* Reference: https://gist.github.com/jed/982883
*
* The array/number concatenation produces the template string
* "10000000-1000-4000-8000-100000000000". Every 0, 1 and 8 in it is then
* replaced by a hex digit derived from one random byte; the 4 is left
* untouched.
*/
generateID: () =>
([1e7] + -1e3 + -4e3 + -8e3 + -1e11).replace(/[018]/g, (c) =>
// for "8" the mask is 15 >> 2 = 3, so the digit stays within 8-b;
// for "0" and "1" the mask is 15, giving a full random hex digit
(c ^ (Crypto.randomBytes(1)[0] & (15 >> (c / 4)))).toString(16),
),
});
Server Handler Usage
With Custom Authentication
import { createHandler } from 'graphql-sse';
import {
schema,
getOrCreateTokenFromCookies,
customAuthenticationTokenDiscovery,
processAuthorizationHeader,
} from './my-graphql';
// Handler with a custom `authenticate` step that resolves the token for each
// request, or answers with a 401 when no token can be resolved.
export const handler = createHandler({
schema,
authenticate: async (req) => {
let token = req.headers.get('x-graphql-event-stream-token');
if (token) {
// When the client is working in a "single connection mode"
// all subsequent requests for operations will have the
// stream token set in the `X-GraphQL-Event-Stream-Token` header.
//
// It is considered safe to accept the header token always
// because if a stream reservation does not exist, or is already
// fulfilled, the handler itself will reject the request.
//
// Read more: https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md#single-connection-mode
return Array.isArray(token) ? token.join('') : token;
}
// It is necessary to generate a unique token when dealing with
// clients that operate in the "single connection mode". The process
// of generating the token is completely up to the implementor.
//
// The three assignments below are alternatives, pick the one that fits.
token = getOrCreateTokenFromCookies(req);
// or
token = processAuthorizationHeader(req.headers.get('authorization'));
// or
token = await customAuthenticationTokenDiscovery(req);
// When no token could be resolved, a `[null, response init]` tuple is
// returned instead of a token (here a 401). The implementor may respond
// to authentication issues however they see fit.
if (!token) {
return [null, { status: 401, statusText: 'Unauthorized' }];
}
// Clients that operate in "distinct connections mode" don't
// need a unique stream token. It is completely ok to simply
// return an empty string for authenticated clients.
//
// Read more: https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md#distinct-connections-mode
if (
req.method === 'POST' &&
req.headers.get('accept') === 'text/event-stream'
) {
// "distinct connections mode" requests an event-stream with a POST
// method. These two checks, together with the lack of `X-GraphQL-Event-Stream-Token`
// header, are sufficient for accurate detection.
return ''; // return token; is OK too
}
// On the other hand, clients operating in "single connection mode"
// need a unique stream token which will be provided alongside the
// incoming event stream request inside the `X-GraphQL-Event-Stream-Token` header.
//
// Read more: https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md#single-connection-mode
return token;
},
});
With Dynamic Schema
import { createHandler } from 'graphql-sse';
import { schema, checkIsAdmin, getDebugSchema } from './my-graphql';
export const handler = createHandler({
  // called on every subscribe request, which allows supplying the schema
  // dynamically depending on the provided arguments
  schema: async (req, executionArgsWithoutSchema) =>
    (await checkIsAdmin(req))
      ? getDebugSchema(req, executionArgsWithoutSchema)
      : schema,
});
With Custom Context Value
import { createHandler } from 'graphql-sse';
import { schema, getDynamicContext } from './my-graphql';
export const handler = createHandler({
  schema,
  // or a static context by supplying the value directly
  context: (req, args) => getDynamicContext(req, args),
});
With Custom Execution Arguments
import { parse } from 'graphql';
import { createHandler } from 'graphql-sse';
import { getSchema, myValidationRules } from './my-graphql';
export const handler = createHandler({
  // Builds the complete execution arguments for each subscribe request.
  onSubscribe: async (req, params) => {
    const schema = await getSchema(req);
    const { operationName, query, variables } = params;

    // the query may arrive as a string, in which case it is parsed first
    const document = typeof query === 'string' ? parse(query) : query;

    return {
      schema,
      operationName,
      document,
      variableValues: variables,
      contextValue: undefined,
    };
  },
});
With custom subscribe method that gracefully handles thrown errors
graphql-js
does not catch errors thrown from async iterables (see issue). This will bubble the error to the server handler and cause issues like writing headers after they’ve already been sent if not handled properly.
Note that graphql-sse
does not offer built-in handling of internal errors because there’s no one-size-fits-all solution. Errors are important and should be handled with care, exactly to the needs of the user - not the library author.
Therefore, you may instead implement your own subscribe
function that gracefully handles thrown errors.
import { subscribe } from 'graphql';
import { createHandler } from 'graphql-sse';
import { getSchema } from './my-graphql';
export const handler = createHandler({
  /**
   * Custom `subscribe` that turns errors thrown while iterating into
   * regular results carrying the error, instead of letting them bubble up
   * to the server handler.
   */
  async subscribe(...args) {
    const result = await subscribe(...args);
    if ('next' in result) {
      // is an async iterable, augment the next method to handle thrown errors
      //
      // keep `result` as the receiver: a detached `next` rejects on every
      // call for iterators that rely on `this` (e.g. native async
      // generators), and the catch below would then emit errors forever
      const originalNext = result.next.bind(result);
      result.next = () =>
        originalNext().catch((err) => ({
          done: false,
          value: { errors: [err] },
        }));
    }
    return result;
  },
});
Server Handler and Client Usage
With Persisted Queries
import { parse, ExecutionArgs } from 'graphql';
import { createHandler } from 'graphql-sse';
import { schema } from './my-graphql';
// a unique GraphQL execution ID used for representing
// a query in the persisted queries store. when subscribing
// the client transmits the id in `extensions.persistedQuery`
type QueryID = string;

const queriesStore: Record<QueryID, ExecutionArgs> = {
  iWantTheGreetings: {
    schema, // you may even provide different schemas in the queries store
    document: parse('subscription Greetings { greetings }'),
  },
};

export const handler = createHandler({
  // Resolves the execution arguments from the store; unknown ids get a 404.
  onSubscribe: (_req, params) => {
    const queryId = String(params.extensions?.persistedQuery);

    // the id is supplied by the client, so only accept own properties of the
    // store: a plain lookup would also resolve inherited names such as
    // "constructor" or "toString" and let them pass as persisted queries
    const persistedQuery = Object.prototype.hasOwnProperty.call(
      queriesStore,
      queryId,
    )
      ? queriesStore[queryId]
      : undefined;

    if (persistedQuery) {
      return {
        ...persistedQuery,
        variableValues: params.variables, // use the variables from the client
        contextValue: undefined,
      };
    }

    // for extra security only allow the queries from the store
    return [null, { status: 404, statusText: 'Not Found' }];
  },
});
import { createClient } from 'graphql-sse';
const client = createClient({
  url: 'http://persisted.graphql:4000/queries',
});

// Subscribe using only the persisted query id and wait for completion.
(async () => {
  // count the results explicitly; a plain function is not a mock, so call
  // count matchers cannot be used on it
  let received = 0;
  const onNext = () => {
    received++;
  };

  await new Promise((resolve, reject) => {
    client.subscribe(
      {
        query: '', // query field is required, but you can leave it empty for persisted queries
        extensions: {
          persistedQuery: 'iWantTheGreetings',
        },
      },
      {
        next: onNext,
        error: reject,
        complete: resolve,
      },
    );
  });

  expect(received).toBe(5); // greetings in 5 languages
})();