aboutsummaryrefslogtreecommitdiff
path: root/source/library/fetcher/websocket.ts
diff options
context:
space:
mode:
Diffstat (limited to 'source/library/fetcher/websocket.ts')
-rw-r--r--source/library/fetcher/websocket.ts97
1 files changed, 89 insertions, 8 deletions
diff --git a/source/library/fetcher/websocket.ts b/source/library/fetcher/websocket.ts
index 6376e76..6c6bfe8 100644
--- a/source/library/fetcher/websocket.ts
+++ b/source/library/fetcher/websocket.ts
@@ -1,18 +1,99 @@
+/*** IMPORT ------------------------------------------- ***/
+
+import { createClient, type Client, type ClientOptions } from "graphql-ws";
+
/*** UTILITY ------------------------------------------ ***/
-import type { Fetcher, FetcherOptions } from "./types.ts";
+import type { Fetcher, FetcherOptions, FetcherResult } from "./types.ts";
/*** EXPORT ------------------------------------------- ***/
-/**
- * WebSocket fetcher for graphql-ws protocol.
- * Stub implementation — see PLAN.md stage v0.4 for full implementation.
- */
-export function createWsFetcher(_options: FetcherOptions): Fetcher {
- return () => {
- throw new Error("WebSocket fetcher not yet implemented — see PLAN.md v0.4");
+export type WsFetcherOptions = Omit<FetcherOptions, "fetch"> & {
+ client?: Partial<Omit<ClientOptions, "url">>;
+};
+
+export function createWsFetcher(options: WsFetcherOptions): Fetcher {
+ const client: Client = createClient({
+ connectionParams: options.headers,
+ url: options.url,
+ ...options.client
+ });
+
+ return (req) => {
+ return {
+ [Symbol.asyncIterator](): AsyncIterator<FetcherResult> {
+ const queue: FetcherResult[] = [];
+ const resolvers: ((step: IteratorResult<FetcherResult>) => void)[] = [];
+ let done = false;
+ let error: unknown = null;
+
+ const dispose = client.subscribe<FetcherResult, Record<string, unknown>>(
+ {
+ extensions: req.headers as Record<string, unknown> | undefined,
+ operationName: req.operationName ?? undefined,
+ query: req.query,
+ variables: req.variables
+ },
+ {
+ complete() {
+ done = true;
+ flush();
+ },
+ error(err) {
+ error = err;
+ done = true;
+ flush();
+ },
+ next(value) {
+ queue.push(value as FetcherResult);
+ flush();
+ }
+ }
+ );
+
+ function flush() {
+ while (resolvers.length > 0 && (queue.length > 0 || done)) {
+ const resolve = resolvers.shift();
+
+ if (!resolve)
+ break;
+
+ if (queue.length > 0)
+ resolve({ done: false, value: queue.shift() as FetcherResult });
+ else
+ resolve({ done: true, value: undefined });
+ }
+ }
+
+ return {
+ next(): Promise<IteratorResult<FetcherResult>> {
+ if (error) {
+ const err = error;
+ error = null;
+ return Promise.reject(err);
+ }
+
+ if (queue.length > 0)
+ return Promise.resolve({ done: false, value: queue.shift() as FetcherResult });
+
+ if (done)
+ return Promise.resolve({ done: true, value: undefined });
+
+ return new Promise<IteratorResult<FetcherResult>>((resolve) => {
+ resolvers.push(resolve);
+ });
+ },
+ return(): Promise<IteratorResult<FetcherResult>> {
+ dispose();
+ done = true;
+ flush();
+ return Promise.resolve({ done: true, value: undefined });
+ }
+ };
+ }
+ };
};
}