• v2
  • Features
  • Subscriptions

Subscriptions

A GraphQL subscription initiates an event stream, where each event in the stream is pushed from the server to the client that issued the subscription.

Example use cases for subscriptions are applications that require real-time data such as chat applications or stock trading platforms.

GraphQL Yoga uses server-sent-events for the subscription protocol. You don't need any extra packages to use subscriptions.

⚠️

Compared to implementing query and mutation resolvers, subscriptions are more complex to implement as they require additional infrastructure for scenarios where you have more than one instance of your GraphQL server running as the event streams must be distributed across all servers.

Quick start with Server-Sent Events

Subscriptions can be added by extending your GraphQL schema with a Subscription type.

server.ts
import { createServer } from '@graphql-yoga/node'
 
// Provide your schema
const server = createServer({
  schema: {
    typeDefs: /* GraphQL */ `
      type Query {
        hello: String
      }
 
      type Subscription {
        countdown(from: Int!): Int!
      }
    `,
    resolvers: {
      Query: {
        hello: () => 'world',
      },
      Subscription: {
        countdown: {
          // This will return the value on every 1 sec until it reaches 0
          subscribe: async function* (_, { from }) {
            for (let i = from; i >= 0; i--) {
              await new Promise((resolve) => setTimeout(resolve, 1000))
              yield { countdown: i }
            }
          },
        },
      },
    },
  },
})
 
server.start()

Start the server, visit http://localhost:4000/graphql and paste the following operation into the left panel.

subscription {
  countdown(from: 5)
}

Then press the Play (Execute Query) button.

Alternatively, you can also send the subscription operation via curl.

curl -N -H "accept:text/event-stream" http://localhost:4000/graphql?query=subscription%20%7B%0A%20%20countdown%28from%3A%205%29%0A%7D
 
data: {"data":{"countdown":5}}
 
data: {"data":{"countdown":4}}
 
data: {"data":{"countdown":3}}
 
data: {"data":{"countdown":2}}
 
data: {"data":{"countdown":1}}
 
data: {"data":{"countdown":0}}
💡

Watch Episode #52 of graphql.wtf for a quick introduction to using GraphQL Subscriptions with Server Sent Events:

GraphQL Subscriptions with Server-Sent Events

Handling Subscriptions on the Client

WHATWG standard EventSource can be used without any extra packages to handle SSE.

eventsource.ts
const url = new URL('http://localhost:4000/graphql')
 
url.searchParams.append(
  'query',
  /* GraphQL */ `
    subscription Countdown($from: Int!) {
      countdown(from: $from)
    }
  `,
)
url.searchParams.append('variables', JSON.stringify({ from: 10 }))
 
const eventsource = new EventSource(url.href, {
  withCredentials: true, // This is required for cookies
})
 
eventsource.onmessage = (event) => {
  const data = JSON.parse(event.data)
  console.log(data) // This will result something like `{ "data": { "countdown": 0 } }`
}

Client Usage with Apollo

We can create an SSELink to use with Apollo Client. To send non Subscription operations via the default HTTP protocol, you can use split from the Apollo Link package. For more information please refer to the Apollo Link documentation.

import {
  ApolloLink,
  Operation,
  FetchResult,
  Observable,
  ApolloClient,
  InMemoryCache,
  gql,
} from '@apollo/client/core'
import { split } from '@apollo/client/link/core'
import { HttpLink } from '@apollo/client/link/http'
import { print, getOperationAST } from 'graphql'
 
type SSELinkOptions = EventSourceInit & { uri: string }
 
class SSELink extends ApolloLink {
  constructor(private options: SSELinkOptions) {
    super()
  }
 
  request(operation: Operation): Observable<FetchResult> {
    const url = new URL(this.options.uri)
    url.searchParams.append('query', print(operation.query))
    if (operation.operationName) {
      url.searchParams.append(
        'operationName',
        JSON.stringify(operation.operationName),
      )
    }
    if (operation.variables) {
      url.searchParams.append('variables', JSON.stringify(operation.variables))
    }
    if (operation.extensions) {
      url.searchParams.append(
        'extensions',
        JSON.stringify(operation.extensions),
      )
    }
 
    return new Observable((sink) => {
      const eventsource = new EventSource(url.toString(), this.options)
      eventsource.onmessage = function (event) {
        const data = JSON.parse(event.data)
        sink.next(data)
        if (eventsource.readyState === 2) {
          sink.complete()
        }
      }
      eventsource.onerror = function (error) {
        sink.error(error)
      }
      return () => eventsource.close()
    })
  }
}
 
const uri = 'http://localhost:4000/graphql'
const sseLink = new SSELink({ uri })
const httpLink = new HttpLink({ uri })
 
const link = split(
  ({ query, operationName }) => {
    const definition = getOperationAST(query, operationName)
 
    return (
      definition?.kind === 'OperationDefinition' &&
      definition.operation === 'subscription'
    )
  },
  sseLink,
  httpLink,
)

Client Usage with urql

Here is an example with urql's subscriptionExchange:

urql-sse.ts
import { createClient, defaultExchanges, subscriptionExchange } from 'urql'
 
const client = createClient({
  url: 'http://localhost:4000/graphql',
  exchanges: [
    ...defaultExchanges,
    subscriptionExchange({
      forwardSubscription(operation) {
        const url = new URL('http://localhost:4000/graphql')
        url.searchParams.append('query', operation.query)
        if (operation.variables) {
          url.searchParams.append(
            'variables',
            JSON.stringify(operation.variables),
          )
        }
        return {
          subscribe(sink) {
            const eventsource = new EventSource(url.toString(), {
              withCredentials: true, // This is required for cookies
            })
            eventsource.onmessage = (event) => {
              const data = JSON.parse(event.data)
              sink.next(data)
              if (eventsource.readyState === 2) {
                sink.complete()
              }
            }
            eventsource.onerror = (error) => {
              sink.error(error)
            }
            return {
              unsubscribe: () => eventsource.close(),
            }
          },
        }
      },
    }),
  ],
})

Subscriptions over WebSockets with graphql-ws

Suppose you want to use the graphql-transport-ws protocol with GraphQL Yoga, you can use the graphql-ws library. To have the same execution pipeline in graphql-ws, we can use the Envelop instance from GraphQL Yoga like below in Node.JS. Also, you can set subscriptionsProtocol in GraphiQL options to use WebSockets instead of Server-Sent Events within GraphiQL.

yoga-with-ws.ts
import { createServer } from '@graphql-yoga/node'
import { WebSocketServer } from 'ws'
import { useServer } from 'graphql-ws/lib/use/ws'
 
async function main() {
  const yogaApp = createServer({
    graphiql: {
      // Use WebSockets in GraphiQL
      subscriptionsProtocol: 'WS',
    },
  })
 
  // Get NodeJS Server from Yoga
  const httpServer = await yogaApp.start()
  // Create WebSocket server instance from our Node server
  const wsServer = new WebSocketServer({
    server: httpServer,
    path: yogaApp.getAddressInfo().endpoint,
  })
 
  // Integrate Yoga's Envelop instance and NodeJS server with graphql-ws
  useServer(
    {
      execute: (args: any) => args.rootValue.execute(args),
      subscribe: (args: any) => args.rootValue.subscribe(args),
      onSubscribe: async (ctx, msg) => {
        const { schema, execute, subscribe, contextFactory, parse, validate } =
          yogaApp.getEnveloped(ctx)
 
        const args = {
          schema,
          operationName: msg.payload.operationName,
          document: parse(msg.payload.query),
          variableValues: msg.payload.variables,
          contextValue: await contextFactory(),
          rootValue: {
            execute,
            subscribe,
          },
        }
 
        const errors = validate(args.schema, args.document)
        if (errors.length) return errors
        return args
      },
    },
    wsServer,
  )
}
 
main().catch((e) => {
  console.error(e)
  process.exit(1)
})
💡

Check out our working example for this integration

Client integration

Please refer to the graphql-ws client recipes.

SSE vs WebSocket

Advantages of SSE over Websockets

  • Transported over simple HTTP instead of a custom protocol
  • Built-in support for re-connection and event-id Simpler protocol
  • No trouble with corporate firewalls doing packet inspection

Advantages of Websockets over SSE

  • Real-time, two-directional communication
  • Lower latency

SSE gotchas

PubSub

Getting Started

GraphQL Yoga comes with a built-in PubSub (publish/subscribe) bus. This makes it easy to send new events to the client from within your mutation resolvers.

server.ts
import { createServer, createPubSub } from '@graphql-yoga/node'
 
const pubSub = createPubSub()
 
// Provide your schema
const server = createServer({
  schema: {
    typeDefs: /* GraphQL */ `
      type Query {
        hello: String
      }
 
      type Subscription {
        randomNumber: Float!
      }
 
      type Mutation {
        broadcastRandomNumber: Boolean
      }
    `,
    resolvers: {
      Query: {
        hello: () => 'world',
      },
      Subscription: {
        randomNumber: {
          // subscribe to the randomNumber event
          subscribe: () => pubSub.subscribe('randomNumber'),
          resolve: (payload) => payload,
        },
      },
      Mutation: {
        broadcastRandomNumber(_, args) {
          // publish a random number
          pubSub.publish('randomNumber', Math.random())
        },
      },
    },
  },
})
 
server.start()

Topics

When using TypeScript it is possible to make the event emitter type-safe by providing a channel configuration via a generic.

const pubSub = createPubSub<{
  randomNumber: [randomNumber: number]
}>()
 
pubsub.subscribe('randomNumber')
 
// This is now type-safe.
pubSub.publish('randomNumber', 1)
 
// This causes a TypeScript error.
pubSub.publish('randomNumber')
 
// This causes a TypeScript error.
pubSub.publish('event does not exist')

You can subscribe to a specific topic using pubSub.subscribe.

const pubSub = createPubSub<{
  randomNumber: [randomNumber: number]
}>()
 
// Usage outside a GraphQL subscribe function
async function subscribe() {
  const eventSource = pubSub.subscribe('randomNumber')
 
  for await (const value of eventSource) {
    console.log(value)
    // dispose subscription after the first event has been published.
    eventSource.return()
  }
}
 
subscribe()
 
pubSub.publish('randomNumber', 3)

You can publish a value using pubSub.publish.

const pubSub = createPubSub<{
  randomNumber: [randomNumber: number]
}>()
 
pubSub.publish('randomNumber', 3)

Topic configuration variants

You can declare events with and without a payload.

const pubSub = createPubSub<{
  // event has no payload
  'event:without:payload': []
  // event has payload of type number
  'event:payload:number': [payload: number]
  // event has payload of type { foo: number }
  'event:payload:obj': [payload: { foo: number }]
}>()
 
pubSub.publish('event:without:payload')
pubSub.publish('event:payload:number', 12)
pubSub.publish('event:payload:obj', { foo: 1 })

Topic with Dynamic ID

Sometimes you only want to emit and listen for events for a specific entity (e.g. user or product). You can declare topics scoped to a special identifier.

const pubSub = createPubSub<{
  'user:followerCount': [userId: string, payload: { followerCount: number }]
}>()
 
const userId1 = '420'
const userId2 = '69'
 
// the userId argument is enforced by the TypeScript compiler.
pubSub.subscribe('user:followerCount', userId1)
pubSub.subscribe('user:followerCount', userId2)
 
pubSub.publish('user:followerCount', userId1, { followerCount: 30 })
pubSub.publish('user:followerCount', userId2, { followerCount: 12 })

Distributed Pub/Sub for Production

If you spin up multiple instances of your GraphQL server each server instance will have its in-memory pub/sub instance. An event triggered on one server instance will not be distributed to the other server instances, resulting in subscribers on the other server not receiving any updates.

The createPubSub function allows you to specify a custom EventTarget implementation, which can use an external datastore for distributing the events across all server replicas such as Redis Pub/Sub or Kafka.

The minimal EventTarget implementation is described by the TypedEventTarget interface.

Yoga comes with an EventTarget implementation for Redis Pub/Sub.

yarn add @graphql-yoga/redis-event-target ioredis
import Redis from 'ioredis'
import { createPubSub } from '@graphql-yoga/common'
import { createRedisEventTarget } from '@graphql-yoga/redis-event-target'
 
const publishClient = new Redis()
const subscribeClient = new Redis()
 
const eventTarget = createRedisEventTarget({
  publishClient,
  subscribeClient,
})
 
const pubSub = createPubSub({ eventTarget })
💡

Please note that Redis Pub/Sub requires a stable long-running connection and thus is not a suitable solution for serverless or edge function environments.

💡

Please also note that the event payloads must be JSON serializable. If you want to send complex data structures over the wire you can use tools such as superjson.

Advanced

Filter and Map Values

Sometimes it is useful to filter or map events for an individual subscription set up by a client based on subscription field arguments. Yoga has a few utility functions that make this as simple as possible.

Example PubSub Event Stream Transform

import {
  createServer,
  createPubSub,
  pipe,
  map,
  filter,
} from '@graphql-yoga/node'
 
const pubSub = createPubSub<{
  randomNumber: [randomNumber: number]
}>()
 
const source = pipe(
  pubSub.subscribe('randomNumber'),
  map((publishedNumber) => publishedNumber * 2),
  filter((multipliedNumber) => multipliedNumber < 10),
)
 
;(async () => {
  for await (const value of source) {
    console.log(value)
  }
})()
 
pubSub.publish('randomNumber', 1) // logs 2
pubSub.publish('randomNumber', 2) // logs 4
pubSub.publish('randomNumber', 5) // filtered out
pubSub.publish('randomNumber', 3) // logs 6
source.return()

Example Random Number Stream Transform

import { createServer, createPubSub } from '@graphql-yoga/node'
 
const pubSub = createPubSub()
 
// Provide your schema
const server = createServer({
  typeDefs: /* GraphQL */ `
    type Query {
      hello: String
    }
 
    type Subscription {
      randomNumber(multiplyBy: Int!, lessThan: Float!): Float!
    }
 
    type Mutation {
      broadcastRandomNumber: Boolean
    }
  `,
  resolvers: {
    Query: {
      hello: () => 'world',
    },
    Subscription: {
      randomNumber: {
        // subscribe to the randomNumber event
        subscribe: (_, args) =>
          pipe(
            pubSub.subscribe('randomNumber'),
            map((publishedNumber) => publishedNumber * args.multiplyBy),
            filter((multipliedNumber) => multipliedNumber < args.lessThan),
          ),
        resolve: (payload) => payload,
      },
    },
    Mutation: {
      broadcastRandomNumber(_, args) {
        // publish a random number
        pubSub.publish('randomNumber', Math.random())
      },
    },
  },
})
 
server.start()

Subscriptions with Initial Value

GraphQL's subscriptions are primarily designed to send a stream of events to the client. Sometimes it is useful to send an initial value to a client as soon as the GraphQL subscription is set up.

An example of this would be a Subscription.globalCounter field that syncs a counter with all clients by streaming the initial counter value to a client that sets up the subscription and then, furthermore, streams the updated counter value to the clients every time it changes.

GraphQL's subscriptions are implemented using Async Iteration, which in itself is a very complex topic with a lot of pitfalls that can cause memory leaks if not treated with caution.

Yoga uses and re-exports Repeater.js ("The missing constructor for creating safe async iterators") for providing a friendly developer experience.

import {
  createServer,
  createPubSub,
  Repeater,
  pipe,
  map,
} from '@graphql-yoga/node'
 
let globalCounter = 0
const pubSub = createPubSub()
 
// Provide your schema
const server = createServer({
  typeDefs: /* GraphQL */ `
    type Query {
      hello: String
    }
 
    type Subscription {
      globalCounter: Int!
    }
 
    type Mutation {
      incrementGlobalCounter: Int!
    }
  `,
  resolvers: {
    Query: {
      hello: () => 'world',
    },
    Subscription: {
      globalCounter: {
        // Merge initial value with source stream of new values
        subscribe: () =>
          pipe(
            Repeater.merge([
              // cause an initial event so the
              // globalCounter is streamed to the client
              // upon initiating the subscription
              undefined,
              // event stream for future updates
              pubSub.subscribe('globalCounter:change'),
            ]),
            // map all stream values to the latest globalCounter
            map(() => globalCounter),
          ),
        resolve: (payload) => payload,
      },
    },
    Mutation: {
      incrementGlobalCounter() {
        globalCounter = globalCounter + 1
        // publish a global counter increment event
        pubSub.publish('globalCounter:change')
        return globalCounter
      },
    },
  },
})
 
server.start()

Listen to Multiple Pub/Sub Topics

Sometimes it is handy to subscribe to multiple PubSub topics instead of a single one.

import { createServer, createPubSub, Repeater } from '@graphql-yoga/node'
 
type User = {
  id: string
  login: string
}
 
let user: User | null = {
  id: '1',
  login: 'Laurin',
}
 
const pubSub = createPubSub<{
  userLoginChanged: []
  userDeleted: []
}>()
 
// Provide your schema
const server = createServer({
  typeDefs: /* GraphQL */ `
    type Query {
      hello: String
    }
 
    type User {
      id: ID!
      login: String
    }
 
    type Subscription {
      user: User
    }
 
    type Mutation {
      deleteUser: Boolean
      updateUserLogin(newLogin: String!): Boolean
    }
  `,
  resolvers: {
    Query: {
      hello: () => 'world',
    },
    Subscription: {
      user: {
        // Merge initial value with source streams of new values
        subscribe: () =>
          pipe(
            Repeater.merge([
              undefined,
              pubSub.subscribe('userLoginChanged'),
              pubSub.subscribe('userDeleted'),
            ]),
            // map all stream values to the latest user
            map(() => user),
          ),
        resolve: (payload) => payload,
      },
    },
    Mutation: {
      deleteUser() {
        user = null
        pubSub.publish('userDeleted')
        return true
      },
      updateUserLogin(_, args) {
        if (!user) {
          return false
        }
        user.login = args.newLogin
        pubSub.publish('userLoginChanged')
        return true
      },
    },
  },
})
 
server.start()
Last updated on September 19, 2022