From 52ee736bd2ac407952863c8e8397770bf1495a45 Mon Sep 17 00:00:00 2001 From: "netop://ウィビ" Date: Sun, 3 May 2026 12:40:04 -0700 Subject: adds support for subscriptions --- source/http.ts | 3 ++ source/utility/types.ts | 6 +++ source/ws.ts | 99 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 108 insertions(+) create mode 100644 source/ws.ts (limited to 'source') diff --git a/source/http.ts b/source/http.ts index 7c747e1..c4a70c8 100755 --- a/source/http.ts +++ b/source/http.ts @@ -54,6 +54,9 @@ export function GraphQLHTTP< ...options }: GQLOptions): GraphQLHandler { return async (request: Req) => { + if (options.subscriptions && request.headers.get("upgrade")?.toLowerCase() === "websocket") + return options.subscriptions(request as unknown as Request); + const accept = request.headers.get("Accept") || ""; const typeList = ["application/json", "text/html", "text/plain", "*/*"] diff --git a/source/utility/types.ts b/source/utility/types.ts index 0d8b443..2311c25 100755 --- a/source/utility/types.ts +++ b/source/utility/types.ts @@ -41,6 +41,12 @@ export interface GQLOptions extend playgroundOptions?: Omit; /** The executable schema to query against. */ schema: GraphQLSchema; + /** + * WebSocket subscription handler returned by `GraphQLWS()`. When set, any + * incoming request with `Upgrade: websocket` is delegated to it before HTTP + * content negotiation runs. + */ + subscriptions?: (request: Request) => Response; } /** A single GraphQL operation — either a `query` or a `mutation`, never both. */ diff --git a/source/ws.ts b/source/ws.ts new file mode 100644 index 0000000..fdd9fb1 --- /dev/null +++ b/source/ws.ts @@ -0,0 +1,99 @@ + + + +/*** IMPORT ------------------------------------------- ***/ + +import { makeServer, type ServerOptions } from "graphql-ws"; + +/*** UTILITY ------------------------------------------ ***/ + +import type { GraphQLSchema } from "graphql"; + +/*** EXPORT ------------------------------------------- ***/ + +/** + * Configuration accepted by {@link GraphQLWS}. + * + * Wraps `graphql-ws`'s {@link ServerOptions} and requires a `schema`. Anything + * else accepted by `graphql-ws` (custom `execute`, `subscribe`, `onConnect`, + * `onSubscribe`, etc.) can be passed through. + */ +export type WSOptions = { + /** Builds the context value passed to resolvers. Runs per WS connection. */ + context?: (ctx: { connectionParams?: Record }) => Ctx | Promise; + /** The executable schema to subscribe against. */ + schema: GraphQLSchema; +} & Partial>; + +/** + * Builds a WebSocket subscription handler that speaks the + * `graphql-transport-ws` protocol via `graphql-ws`. + * + * The returned function expects a Fetch `Request` with an `Upgrade: websocket` + * header. It performs the upgrade with `Deno.upgradeWebSocket` and hands the + * socket to `graphql-ws`. Pass the result as `subscriptions` to {@link GraphQLHTTP} + * so HTTP and WS share one endpoint. + * + * @example + * ```ts + * import { executeSchema, GraphQLHTTP, GraphQLWS, PubSub, gql } from "@eol/gq"; + * + * const pubsub = new PubSub(); + * + * const schema = executeSchema({ + * resolvers: { + * Subscription: { + * pinged: { subscribe: () => pubsub.asyncIterator(["PING"]) } + * } + * }, + * typeDefs: gql`type Subscription { pinged: String }` + * }); + * + * const subscriptions = GraphQLWS({ schema }); + * Deno.serve(GraphQLHTTP({ schema, subscriptions })); + * ``` + */ +export function GraphQLWS(options: WSOptions) { + const server = makeServer({ + ...options, + context: options.context as ServerOptions["context"], + schema: options.schema + }); + + return (request: Request): Response => { + const { response, socket } = Deno.upgradeWebSocket(request, { + protocol: "graphql-transport-ws" + }); + + socket.onopen = () => { + const closed = server.opened( + { + close: (code, reason) => socket.close(code, reason), + onMessage: (cb) => { + socket.onmessage = async (event) => { + try { + await cb( + typeof event.data === "string" ? + event.data : + await (event.data as Blob).text() + ); + } catch (err) { + socket.close(1011, (err as Error).message); + } + }; + }, + protocol: socket.protocol, + send: (data) => Promise.resolve(socket.send(data)) + }, + { + request, + socket + } + ); + + socket.onclose = (event) => closed(event.code, event.reason); + }; + + return response; + }; +} -- cgit v1.2.3