diff options
Diffstat (limited to 'source/library/fetcher/sse.ts')
| -rw-r--r-- | source/library/fetcher/sse.ts | 100 |
1 files changed, 92 insertions, 8 deletions
diff --git a/source/library/fetcher/sse.ts b/source/library/fetcher/sse.ts index b6805dc..c28436b 100644 --- a/source/library/fetcher/sse.ts +++ b/source/library/fetcher/sse.ts @@ -1,18 +1,102 @@ +/*** IMPORT ------------------------------------------- ***/ + +import { createClient, type Client, type ClientOptions } from "graphql-sse"; + /*** UTILITY ------------------------------------------ ***/ -import type { Fetcher, FetcherOptions } from "./types.ts"; +import type { Fetcher, FetcherOptions, FetcherResult } from "./types.ts"; /*** EXPORT ------------------------------------------- ***/ -/** - * Server-Sent Events fetcher for graphql-sse protocol. - * Stub implementation — see PLAN.md stage v0.4 for full implementation. - */ -export function createSseFetcher(_options: FetcherOptions): Fetcher { - return () => { - throw new Error("SSE fetcher not yet implemented — see PLAN.md v0.4"); +export type SseFetcherOptions = FetcherOptions & { + client?: Partial<Omit<ClientOptions, "url">>; +}; + +export function createSseFetcher(options: SseFetcherOptions): Fetcher { + const client: Client = createClient({ + fetchFn: options.fetch, + headers: options.headers, + url: options.url, + ...options.client + }); + + return (req) => { + return { + [Symbol.asyncIterator](): AsyncIterator<FetcherResult> { + const queue: FetcherResult[] = []; + const resolvers: ((step: IteratorResult<FetcherResult>) => void)[] = []; + let done = false; + let error: unknown = null; + + const dispose = client.subscribe<FetcherResult, Record<string, unknown>>( + { + extensions: req.headers as Record<string, unknown> | undefined, + operationName: req.operationName ?? undefined, + query: req.query, + variables: req.variables + }, + { + complete() { + done = true; + flush(); + }, + error(err) { + error = err; + done = true; + flush(); + }, + next(value) { + queue.push(value as FetcherResult); + flush(); + } + } + ); + + function flush() { + while (resolvers.length > 0 && (queue.length > 0 || done)) { + const resolve = resolvers.shift(); + + if (!resolve) + break; + + if (queue.length > 0) + resolve({ done: false, value: queue.shift() as FetcherResult }); + else if (error) + resolve({ done: true, value: undefined }); + else + resolve({ done: true, value: undefined }); + } + } + + return { + next(): Promise<IteratorResult<FetcherResult>> { + if (error) { + const err = error; + error = null; + return Promise.reject(err); + } + + if (queue.length > 0) + return Promise.resolve({ done: false, value: queue.shift() as FetcherResult }); + + if (done) + return Promise.resolve({ done: true, value: undefined }); + + return new Promise<IteratorResult<FetcherResult>>((resolve) => { + resolvers.push(resolve); + }); + }, + return(): Promise<IteratorResult<FetcherResult>> { + dispose(); + done = true; + flush(); + return Promise.resolve({ done: true, value: undefined }); + } + }; + } + }; }; } |