diff options
Diffstat (limited to 'source/library/fetcher/websocket.ts')
| -rw-r--r-- | source/library/fetcher/websocket.ts | 97 |
1 files changed, 89 insertions, 8 deletions
diff --git a/source/library/fetcher/websocket.ts b/source/library/fetcher/websocket.ts index 6376e76..6c6bfe8 100644 --- a/source/library/fetcher/websocket.ts +++ b/source/library/fetcher/websocket.ts @@ -1,18 +1,99 @@ +/*** IMPORT ------------------------------------------- ***/ + +import { createClient, type Client, type ClientOptions } from "graphql-ws"; + /*** UTILITY ------------------------------------------ ***/ -import type { Fetcher, FetcherOptions } from "./types.ts"; +import type { Fetcher, FetcherOptions, FetcherResult } from "./types.ts"; /*** EXPORT ------------------------------------------- ***/ -/** - * WebSocket fetcher for graphql-ws protocol. - * Stub implementation — see PLAN.md stage v0.4 for full implementation. - */ -export function createWsFetcher(_options: FetcherOptions): Fetcher { - return () => { - throw new Error("WebSocket fetcher not yet implemented — see PLAN.md v0.4"); +export type WsFetcherOptions = Omit<FetcherOptions, "fetch"> & { + client?: Partial<Omit<ClientOptions, "url">>; +}; + +export function createWsFetcher(options: WsFetcherOptions): Fetcher { + const client: Client = createClient({ + connectionParams: options.headers, + url: options.url, + ...options.client + }); + + return (req) => { + return { + [Symbol.asyncIterator](): AsyncIterator<FetcherResult> { + const queue: FetcherResult[] = []; + const resolvers: ((step: IteratorResult<FetcherResult>) => void)[] = []; + let done = false; + let error: unknown = null; + + const dispose = client.subscribe<FetcherResult, Record<string, unknown>>( + { + extensions: req.headers as Record<string, unknown> | undefined, + operationName: req.operationName ?? undefined, + query: req.query, + variables: req.variables + }, + { + complete() { + done = true; + flush(); + }, + error(err) { + error = err; + done = true; + flush(); + }, + next(value) { + queue.push(value as FetcherResult); + flush(); + } + } + ); + + function flush() { + while (resolvers.length > 0 && (queue.length > 0 || done)) { + const resolve = resolvers.shift(); + + if (!resolve) + break; + + if (queue.length > 0) + resolve({ done: false, value: queue.shift() as FetcherResult }); + else + resolve({ done: true, value: undefined }); + } + } + + return { + next(): Promise<IteratorResult<FetcherResult>> { + if (error) { + const err = error; + error = null; + return Promise.reject(err); + } + + if (queue.length > 0) + return Promise.resolve({ done: false, value: queue.shift() as FetcherResult }); + + if (done) + return Promise.resolve({ done: true, value: undefined }); + + return new Promise<IteratorResult<FetcherResult>>((resolve) => { + resolvers.push(resolve); + }); + }, + return(): Promise<IteratorResult<FetcherResult>> { + dispose(); + done = true; + flush(); + return Promise.resolve({ done: true, value: undefined }); + } + }; + } + }; }; } |