diff --git a/src/async/createTsonAsync.ts b/src/async/createTsonAsync.ts index 7009cca..21c1189 100644 --- a/src/async/createTsonAsync.ts +++ b/src/async/createTsonAsync.ts @@ -1,5 +1,8 @@ import { TsonAsyncOptions } from "./asyncTypes.js"; -import { createTsonParseAsync } from "./deserializeAsync.js"; +import { + createTsonParseAsync, + createTsonParseEventSource, +} from "./deserializeAsync.js"; import { createTsonSSEResponse, createTsonStreamAsync, @@ -10,6 +13,7 @@ import { * @internal */ export const createTsonAsync = (opts: TsonAsyncOptions) => ({ + createEventSourceParser: createTsonParseEventSource(opts), parseJsonStream: createTsonParseAsync(opts), stringifyJsonStream: createTsonStreamAsync(opts), toSSEResponse: createTsonSSEResponse(opts), diff --git a/src/async/deserializeAsync.ts b/src/async/deserializeAsync.ts index 08627db..e450c8d 100644 --- a/src/async/deserializeAsync.ts +++ b/src/async/deserializeAsync.ts @@ -16,7 +16,11 @@ import { TsonAsyncStringifierIterable, TsonAsyncType, } from "./asyncTypes.js"; -import { createReadableStream } from "./iterableUtils.js"; +import { + createReadableStream, + mapIterable, + readableStreamToAsyncIterable, +} from "./iterableUtils.js"; import { TsonAsyncValueTuple } from "./serializeAsync.js"; type WalkFn = (value: unknown) => unknown; @@ -256,38 +260,41 @@ export function createTsonParseAsync(opts: TsonAsyncOptions): TsonParseAsync { }) as TsonParseAsync; } -// export function createTsonParseEventSource(opts: TsonAsyncOptions) { -// const instance = createTsonDeserializer(opts); - -// return async ( -// url: string, -// parseOpts?: TsonParseAsyncOptions & { -// abortSignal: AbortSignal; -// }, -// ) => { -// const [stream, controller] = createReadableStream(); -// const eventSource = new EventSource(url); - -// const onAbort = () => { -// eventSource.close(); -// controller.close(); -// parseOpts?.abortSignal.removeEventListener("abort", onAbort); -// }; - -// parseOpts?.abortSignal.addEventListener("abort", onAbort); - -// eventSource.onmessage = (msg) => { -// // eslint-disable-next-line @typescript-eslint/no-unsafe-argument -// controller.enqueue(msg.data); -// }; - -// const iterable = mapIterable( -// readableStreamToAsyncIterable(stream), -// (msg) => { -// return JSON.parse(msg) as TsonAsyncValueTuple | TsonSerialized; -// }, -// ); - -// return (await instance(iterable, parseOpts ?? {})) as TValue; -// }; -// } +export function createTsonParseEventSource(opts: TsonAsyncOptions) { + const instance = createTsonDeserializer(opts); + + return async ( + url: string, + parseOpts: TsonParseAsyncOptions & { + signal?: AbortSignal; + } = {}, + ) => { + const [stream, controller] = createReadableStream(); + const eventSource = new EventSource(url); + + const onAbort = () => { + eventSource.close(); + controller.close(); + parseOpts.signal?.removeEventListener("abort", onAbort); + }; + + parseOpts.signal?.addEventListener("abort", onAbort); + + eventSource.onmessage = (msg) => { + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument + controller.enqueue(msg.data); + }; + + const iterable = mapIterable( + readableStreamToAsyncIterable(stream), + (msg) => { + const parsed = JSON.parse(msg) as TsonAsyncValueTuple | TsonSerialized; + + console.log({ parsed }); + return parsed; + }, + ); + + return (await instance(iterable, parseOpts)) as TValue; + }; +} diff --git a/src/async/sse.test.ts b/src/async/sse.test.ts index b21336f..51416f0 100644 --- a/src/async/sse.test.ts +++ b/src/async/sse.test.ts @@ -25,7 +25,7 @@ test("SSE response test", async () => { }; } - // type MockObj = ReturnType; + type MockObj = ReturnType; // ------------- server ------------------- const opts = { @@ -53,26 +53,26 @@ test("SSE response test", async () => { }); // ------------- client ------------------- - // const tson = createTsonAsync(opts); - - // do a streamed fetch request - const sse = new EventSource(server.url); - - const messages: MessageEvent["data"][] = []; - await new Promise((resolve) => { - sse.onmessage = (msg) => { - // console.log(sse.readyState); - // console.log({ msg }); - messages.push(msg.data); - - if (messages.length === 5) { - sse.close(); - resolve(); - } - }; - }); - - expect(messages).toMatchInlineSnapshot(` + const tson = createTsonAsync(opts); + + { + const sse = new EventSource(server.url); + + const messages: MessageEvent["data"][] = []; + await new Promise((resolve) => { + sse.onmessage = (msg) => { + // console.log(sse.readyState); + // console.log({ msg }); + messages.push(msg.data); + + if (messages.length === 5) { + sse.close(); + resolve(); + } + }; + }); + + expect(messages).toMatchInlineSnapshot(` [ "{\\"json\\":{\\"foo\\":\\"bar\\",\\"iterable\\":[\\"AsyncIterable\\",0,\\"__tson\\"],\\"promise\\":[\\"Promise\\",1,\\"__tson\\"],\\"rejectedPromise\\":[\\"Promise\\",2,\\"__tson\\"]},\\"nonce\\":\\"__tson\\"}", "[0,[0,0]]", @@ -81,6 +81,26 @@ test("SSE response test", async () => { "[0,[0,1]]", ] `); + } + + { + // e2e + const ac = new AbortController(); + const shape = await tson.createEventSourceParser(server.url, { + signal: ac.signal, + }); + + const messages: number[] = []; + + for await (const value of shape.iterable) { + messages.push(value); + if (messages.length === 5) { + ac.abort(); + } + } + + expect(messages).toMatchInlineSnapshot(); + } }); test.todo("parse SSE response");