Recipes
Short, concise code snippets for getting started with common use cases.
Client Usage
With EventSource (distinct connections mode)
The following example doesn't use the graphql-sse
client and works only with the "distinct connections mode" of the GraphQL over SSE spec. This mode is the client's default and should suffice for most use cases.
// The operation is passed to the server through the `query` search parameter.
const url = new URL('http://localhost:4000/graphql/stream');
url.searchParams.append('query', 'subscription { greetings }');

const source = new EventSource(url);

// Each result arrives as a `next` event; its `data` is logged as received.
source.addEventListener('next', (event) => {
  console.log(event.data);
  // {"data":{"greetings":"Hi"}}
  // {"data":{"greetings":"Bonjour"}}
  // {"data":{"greetings":"Hola"}}
  // {"data":{"greetings":"Ciao"}}
  // {"data":{"greetings":"Zdravo"}}
});

// Report errors raised by the event source.
source.addEventListener('error', (err) => {
  console.error(err);
});

// Once the `complete` event arrives, close the source.
source.addEventListener('complete', () => {
  source.close();
});
With Promise
import { createClient, RequestParams } from 'graphql-sse';
const client = createClient({ url: 'http://hey.there:4000/graphql/stream' });

(async () => {
  // Only the first step of the iterator is awaited, so the operation is
  // consumed like a promise that settles with a single result.
  const query = client.iterate({ query: '{ hello }' });

  try {
    const firstStep = await query.next();
    const value = firstStep.value;
    // next = value = { data: { hello: 'Hello World!' } }
    // complete
  } catch (err) {
    // error
  }
})();
With AsyncIterator
import { createClient, RequestParams } from 'graphql-sse';
// Client whose operations are consumed below as an async iterator.
const client = createClient({
url: 'http://iterators.ftw:4000/graphql/stream',
});
(async () => {
// `iterate` hands back an async iterator; every emitted result is one loop step
const subscription = client.iterate({
query: 'subscription { greetings }',
});
// "subscription.return()" to dispose
for await (const result of subscription) {
// next = result = { data: { greetings: 5x } }
// "break" to dispose
}
// reached once the loop has run to the end of the iterator
// complete
})();
With Observable
import { createClient } from 'graphql-sse';
import { Observable } from 'relay-runtime';
// or
import { Observable } from '@apollo/client/core';
// or
import { Observable } from 'rxjs';
// or
import Observable from 'zen-observable';
// or any other lib which implements Observables as per the ECMAScript proposal: https://github.com/tc39/proposal-observable

const client = createClient({
  url: 'http://graphql.loves:4000/observables',
});

/**
 * Wraps an operation in an Observable.
 *
 * Subscribing to the Observable subscribes the operation on the client and
 * forwards `next`, `error` and `complete` to the observer. Whatever
 * `client.subscribe` returns is handed to the Observable as its teardown.
 */
export function toObservable(operation) {
  return new Observable((observer) =>
    client.subscribe(operation, {
      next: (data) => observer.next(data),
      error: (err) => observer.error(err),
      complete: () => observer.complete(),
    }),
  );
}

const observable = toObservable({ query: `subscription { ping }` });

const subscription = observable.subscribe({
  next: (data) => {
    // deep equality: `toBe` compares by identity and can never match a fresh object literal
    expect(data).toEqual({ data: { ping: 'pong' } });
  },
});

// ⏱

subscription.unsubscribe();
With Relay
import {
Network,
Observable,
RequestParameters,
Variables,
} from 'relay-runtime';
import { createClient } from 'graphql-sse';
const subscriptionsClient = createClient({
  url: 'http://i.love:4000/graphql/stream',
  // Send the bearer token only when there is a session to take it from.
  headers: () => {
    const session = getSession();
    if (session) {
      return {
        Authorization: `Bearer ${session.token}`,
      };
    }
    return {};
  },
});

/**
 * Executes any Relay operation over the SSE client.
 *
 * yes, both fetch AND subscribe can be handled in one implementation
 */
function fetchOrSubscribe(operation: RequestParameters, variables: Variables) {
  return Observable.create((sink) => {
    const { name: operationName, text: query } = operation;

    // An operation without text cannot be executed; report it through the sink.
    if (!query) {
      return sink.error(new Error('Operation text cannot be empty'));
    }

    const payload = { operationName, query, variables };
    return subscriptionsClient.subscribe(payload, sink);
  });
}

// The same function serves as both the fetch and the subscribe implementation.
export const network = Network.create(fetchOrSubscribe, fetchOrSubscribe);
With urql
import { createClient, defaultExchanges, subscriptionExchange } from 'urql';
import { createClient as createSSEClient } from 'graphql-sse';
const sseClient = createSSEClient({ url: 'http://its.urql:4000/graphql/stream' });

export const client = createClient({
  url: '/graphql/stream',
  exchanges: [
    ...defaultExchanges,
    subscriptionExchange({
      // Hand every forwarded operation to the SSE client and expose the
      // returned dispose function as `unsubscribe`.
      forwardSubscription: (operation) => ({
        subscribe: (sink) => {
          const unsubscribe = sseClient.subscribe(operation, sink);
          return { unsubscribe };
        },
      }),
    }),
  ],
});
With Apollo
import {
ApolloLink,
Operation,
FetchResult,
Observable,
} from '@apollo/client/core';
import { print, GraphQLError } from 'graphql';
import { createClient, ClientOptions, Client } from 'graphql-sse';
/**
 * Apollo link that executes each operation through a graphql-sse client
 * created from the options passed to the constructor.
 */
class SSELink extends ApolloLink {
  private client: Client;

  constructor(options: ClientOptions) {
    super();
    this.client = createClient(options);
  }

  /**
   * Subscribes the operation on the client (with its document printed to a
   * string) and relays `next`, `complete` and `error` to the Apollo sink.
   */
  public request(operation: Operation): Observable<FetchResult> {
    return new Observable((sink) => {
      const payload = { ...operation, query: print(operation.query) };
      return this.client.subscribe<FetchResult>(payload, {
        next: (result) => sink.next(result),
        complete: () => sink.complete(),
        error: (err) => sink.error(err),
      });
    });
  }
}
export const link = new SSELink({
  url: 'http://where.is:4000/graphql/stream',
  // Send the bearer token only when there is a session to take it from.
  headers: () => {
    const session = getSession();
    if (session) {
      return {
        Authorization: `Bearer ${session.token}`,
      };
    }
    return {};
  },
});
With TypedDocumentNode
import {
createClient,
Client,
ClientOptions,
ExecutionResult,
Sink,
} from 'graphql-sse';
import { TypedDocumentNode } from '@graphql-typed-document-node/core'; // yarn add @graphql-typed-document-node/core
import { print } from 'graphql';
/**
 * A `Client` whose `iterate` and `subscribe` accept a `TypedDocumentNode`
 * plus its variables instead of raw request params, so the result type
 * is taken from the document.
 */
export interface TypedClient
extends Omit<
Client,
'iterate' | 'subscribe' // we're replacing the `iterate` and `subscribe` implementations
> {
/** Runs the typed document with the given variables as an async iterator of `Result`. */
iterate<
Result extends ExecutionResult,
Variables extends Record<string, unknown>,
>(
document: TypedDocumentNode<Result, Variables>,
variables: Variables,
): AsyncIterableIterator<Result>;
/** Subscribes the typed document with the given variables, feeding `Result`s to the sink. */
subscribe<
Result extends ExecutionResult,
Variables extends Record<string, unknown>,
>(
document: TypedDocumentNode<Result, Variables>,
variables: Variables,
sink: Sink<Result>,
): () => void;
}
/**
 * Creates a regular client from `options` and overrides `iterate` and
 * `subscribe` so they take a typed document and its variables. The document
 * is printed to a string before being handed to the underlying client.
 */
export function createTypedClient(options: ClientOptions): TypedClient {
  const client = createClient(options);

  const typedClient: TypedClient = {
    ...client,
    iterate: (document, variables) => {
      const request = { query: print(document), variables };
      return client.iterate(request) as any;
    },
    subscribe: (document, variables, sink) => {
      const request = { query: print(document), variables };
      return client.subscribe(request, sink);
    },
  };

  return typedClient;
}
For HTTP/1 (aka. single connection mode)
import { createClient } from 'graphql-sse';
export const client = createClient({
singleConnection: true, // this is literally it 😄
url: 'http://use.single:4000/connection/graphql/stream',
// lazy: true (default) -> connect on first subscribe and disconnect on last unsubscribe
// lazy: false -> connect as soon as the client is created
});
// The client will now run in the "single connection mode". Meaning,
// a single SSE connection will be used to transmit all operation results
// while separate HTTP requests will be issued to dictate the behaviour.
With Custom Retry Timeout Strategy
import { createClient } from 'graphql-sse';
import { waitForHealthy } from './my-servers';
const url = 'http://i.want.retry:4000/control/graphql/stream';
export const client = createClient({
url,
// custom wait performed before a retry: health check first, then a jittered delay
retryWait: async function waitForServerHealthyBeforeRetry() {
// if you have a server healthcheck, you can wait for it to become
// healthy before retrying after an abrupt disconnect (most commonly a restart)
await waitForHealthy(url);
// after the server becomes ready, wait for a random 1-4s timeout
// (1s base + up to 3s of jitter, to avoid DDoSing yourself) and try connecting again
await new Promise((resolve) =>
setTimeout(resolve, 1000 + Math.random() * 3000),
);
},
});
With Logging of Incoming Messages
⚠️
Browsers don't show the incoming messages in the DevTools. Read more.
import { createClient } from 'graphql-sse';
export const client = createClient({
url: 'http://let-me-see.messages:4000/graphql/stream',
// incoming messages are handed to `onMessage`; here they are simply printed
onMessage: console.log,
});
In Browser
<!doctype html>
<html>
<head>
<meta charset="utf-8" />
<title>GraphQL over Server-Sent Events</title>
<!-- loads the UMD build; the inline script below reads it from the `graphqlSse` global (presumably set by this bundle, confirm against the package) -->
<script
type="text/javascript"
src="https://unpkg.com/graphql-sse/umd/graphql-sse.min.js"
></script>
</head>
<body>
<script type="text/javascript">
const client = graphqlSse.createClient({
url: 'http://umdfor.the:4000/win/graphql/stream',
});
// consider other recipes for usage inspiration
</script>
</body>
</html>
In Node.js
const ws = require('ws'); // yarn add ws
const fetch = require('node-fetch'); // yarn add node-fetch
const { AbortController } = require('node-abort-controller'); // (node < v15) yarn add node-abort-controller
const Crypto = require('crypto');
const { createClient } = require('graphql-sse');
export const client = createClient({
  url: 'http://no.browser:4000/graphql/stream',
  // implementations passed in explicitly instead of relying on globals
  fetchFn: fetch,
  abortControllerImpl: AbortController, // node < v15
  /**
   * Generates a v4 UUID to be used as the ID.
   * Reference: https://gist.github.com/jed/982883
   *
   * The template is the string the gist builds with
   * `[1e7] + -1e3 + -4e3 + -8e3 + -1e11`; it is spelled out here, and the
   * matched digit is converted with `Number`, so the snippet type-checks.
   * Every `0`, `1` and `8` is replaced by a random hex digit; the `4`
   * (version) is left as is.
   */
  generateID: () =>
    '10000000-1000-4000-8000-100000000000'.replace(/[018]/g, (c) => {
      const digit = Number(c);
      // `15 >> (digit / 4)` yields mask 15 for 0/1 and mask 3 for 8
      const mask = 15 >> (digit / 4);
      return (digit ^ (Crypto.randomBytes(1)[0] & mask)).toString(16);
    }),
});
Server Handler Usage
With Custom Authentication
import { createHandler } from 'graphql-sse';
import {
schema,
getOrCreateTokenFromCookies,
customAuthenticationTokenDiscovery,
processAuthorizationHeader,
} from './my-graphql';
export const handler = createHandler({
schema,
// Resolves the token for the request, or returns a `[null, responseInit]`
// tuple to reject it.
authenticate: async (req) => {
let token = req.headers.get('x-graphql-event-stream-token');
if (token) {
// When the client is working in a "single connection mode"
// all subsequent requests for operations will have the
// stream token set in the `X-GraphQL-Event-Stream-Token` header.
//
// It is considered safe to accept the header token always
// because if a stream reservation does not exist, or is already
// fulfilled, the handler itself will reject the request.
//
// Read more: https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md#single-connection-mode
return Array.isArray(token) ? token.join('') : token;
}
// It is necessary to generate a unique token when dealing with
// clients that operate in the "single connection mode". The process
// of generating the token is completely up to the implementor.
// (the three assignments below are alternatives, pick one)
token = getOrCreateTokenFromCookies(req);
// or
token = processAuthorizationHeader(req.headers.get('authorization'));
// or
token = await customAuthenticationTokenDiscovery(req);
// By returning a `[null, responseInit]` tuple the implementor may respond to
// authentication issues however they see fit.
if (!token) {
return [null, { status: 401, statusText: 'Unauthorized' }];
}
// Clients that operate in "distinct connections mode" don't
// need a unique stream token. It is completely ok to simply
// return an empty string for authenticated clients.
//
// Read more: https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md#distinct-connections-mode
if (
req.method === 'POST' &&
req.headers.get('accept') === 'text/event-stream'
) {
// "distinct connections mode" requests an event-stream with a POST
// method. These two checks, together with the lack of `X-GraphQL-Event-Stream-Token`
// header, are sufficient for accurate detection.
return ''; // return token; is OK too
}
// On the other hand, clients operating in "single connection mode"
// need a unique stream token which will be provided alongside the
// incoming event stream request inside the `X-GraphQL-Event-Stream-Token` header.
//
// Read more: https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md#single-connection-mode
return token;
},
});
With Dynamic Schema
import { createHandler } from 'graphql-sse';
import { schema, checkIsAdmin, getDebugSchema } from './my-graphql';
export const handler = createHandler({
schema: async (req, executionArgsWithoutSchema) => {
// will be called on every subscribe request
// allowing you to dynamically supply the schema
// depending on the provided arguments
const isAdmin = await checkIsAdmin(req);
// admins get the debug schema, everyone else the regular one
if (isAdmin) return getDebugSchema(req, executionArgsWithoutSchema);
return schema;
},
});
With Custom Context Value
import { createHandler } from 'graphql-sse';
import { schema, getDynamicContext } from './my-graphql';
export const handler = createHandler({
schema,
// or static context by supplying the value directly
context: (req, args) => {
return getDynamicContext(req, args);
},
});
With Custom Execution Arguments
import { parse } from 'graphql';
import { createHandler } from 'graphql-sse';
import { getSchema, myValidationRules } from './my-graphql';
export const handler = createHandler({
  // Build the complete execution arguments by hand for every subscribe request.
  onSubscribe: async (req, params) => {
    const schema = await getSchema(req);

    // The query is parsed only when it arrives as a string.
    const { operationName, query, variables } = params;
    const document = typeof query === 'string' ? parse(query) : query;

    return {
      schema,
      operationName,
      document,
      variableValues: variables,
      contextValue: undefined,
    };
  },
});
Server Handler and Client Usage
With Persisted Queries
🛸 server
import { parse, ExecutionArgs } from 'graphql';
import { createHandler } from 'graphql-sse';
import { schema } from './my-graphql';
// a unique GraphQL execution ID used for representing
// a query in the persisted queries store. when subscribing
// you should use the `extensions.persistedQuery` field of the request to transmit the id
type QueryID = string;
// execution arguments prepared ahead of time, keyed by query id
const queriesStore: Record<QueryID, ExecutionArgs> = {
iWantTheGreetings: {
schema, // you may even provide different schemas in the queries store
document: parse('subscription Greetings { greetings }'),
},
};
export const handler = createHandler({
  /**
   * Resolves the execution arguments from the persisted queries store using
   * the id sent in `extensions.persistedQuery`. Anything that is not in the
   * store is answered with a 404.
   */
  onSubscribe: (_req, params) => {
    const queryId = String(params.extensions?.persistedQuery);

    // The id comes from the client, so only accept keys the store itself
    // defines. A plain property read would also find inherited members such as
    // "constructor" or "toString", which are truthy and would slip past the check.
    if (Object.prototype.hasOwnProperty.call(queriesStore, queryId)) {
      return {
        ...queriesStore[queryId],
        variableValues: params.variables, // use the variables from the client
        contextValue: undefined,
      };
    }

    // for extra security only allow the queries from the store
    return [null, { status: 404, statusText: 'Not Found' }];
  },
});
📺 client
import { createClient } from 'graphql-sse';
const client = createClient({
  url: 'http://persisted.graphql:4000/queries',
});

(async () => {
  // Count the results by hand: call-count matchers such as `toBeCalledTimes`
  // only accept mock/spy functions and throw when given a plain function.
  let received = 0;
  const onNext = () => {
    received += 1;
  };

  // Settles when the subscription completes, rejects when it errors.
  await new Promise((resolve, reject) => {
    client.subscribe(
      {
        query: '', // query field is required, but you can leave it empty for persisted queries
        extensions: {
          persistedQuery: 'iWantTheGreetings',
        },
      },
      {
        next: onNext,
        error: reject,
        complete: resolve,
      },
    );
  });

  expect(received).toBe(5); // greetings in 5 languages
})();