diff --git a/src/async/asyncErrors.ts b/src/async/asyncErrors.ts index 1aaec2b..08a66c4 100644 --- a/src/async/asyncErrors.ts +++ b/src/async/asyncErrors.ts @@ -42,3 +42,10 @@ export class TsonStreamInterruptedError extends TsonError { this.name = "TsonStreamInterruptedError"; } } + +export class TsonAbortError extends TsonError { + constructor(cause: unknown) { + super("Aborted", { cause }); + this.name = "TsonAbortError"; + } +} diff --git a/src/async/deserializeAsync.ts b/src/async/deserializeAsync.ts index e450c8d..0b79aaf 100644 --- a/src/async/deserializeAsync.ts +++ b/src/async/deserializeAsync.ts @@ -9,7 +9,7 @@ import { TsonSerialized, TsonTransformerSerializeDeserialize, } from "../sync/syncTypes.js"; -import { TsonStreamInterruptedError } from "./asyncErrors.js"; +import { TsonAbortError, TsonStreamInterruptedError } from "./asyncErrors.js"; import { TsonAsyncIndex, TsonAsyncOptions, @@ -18,7 +18,6 @@ import { } from "./asyncTypes.js"; import { createReadableStream, - mapIterable, readableStreamToAsyncIterable, } from "./iterableUtils.js"; import { TsonAsyncValueTuple } from "./serializeAsync.js"; @@ -42,9 +41,8 @@ type TsonParseAsync = ( opts?: TsonParseAsyncOptions, ) => Promise; -type TsonDeserializeIterable = AsyncIterable< - TsonAsyncValueTuple | TsonSerialized ->; +type TsonDeserializeIterableValue = TsonAsyncValueTuple | TsonSerialized; +type TsonDeserializeIterable = AsyncIterable; function createTsonDeserializer(opts: TsonAsyncOptions) { const typeByKey: Record = {}; @@ -116,7 +114,9 @@ function createTsonDeserializer(opts: TsonAsyncOptions) { break; } - const [index, result] = nextValue.value as TsonAsyncValueTuple; + const { value } = nextValue; + + const [index, result] = value as TsonAsyncValueTuple; const controller = cache.get(index); @@ -269,32 +269,27 @@ export function createTsonParseEventSource(opts: TsonAsyncOptions) { signal?: AbortSignal; } = {}, ) => { - const [stream, controller] = createReadableStream(); + const [stream, controller] = + createReadableStream(); const eventSource = new EventSource(url); + const { signal } = parseOpts; const onAbort = () => { + assert(signal); eventSource.close(); - controller.close(); - parseOpts.signal?.removeEventListener("abort", onAbort); + controller.error(new TsonAbortError("Stream aborted by user")); + + signal.removeEventListener("abort", onAbort); }; - parseOpts.signal?.addEventListener("abort", onAbort); + signal?.addEventListener("abort", onAbort); eventSource.onmessage = (msg) => { // eslint-disable-next-line @typescript-eslint/no-unsafe-argument - controller.enqueue(msg.data); + controller.enqueue(JSON.parse(msg.data)); }; - const iterable = mapIterable( - readableStreamToAsyncIterable(stream), - (msg) => { - const parsed = JSON.parse(msg) as TsonAsyncValueTuple | TsonSerialized; - - console.log({ parsed }); - return parsed; - }, - ); - + const iterable = readableStreamToAsyncIterable(stream); return (await instance(iterable, parseOpts)) as TValue; }; } diff --git a/src/async/handlers/tsonAsyncIterable.ts b/src/async/handlers/tsonAsyncIterable.ts index 8a99cfc..d6e5657 100644 --- a/src/async/handlers/tsonAsyncIterable.ts +++ b/src/async/handlers/tsonAsyncIterable.ts @@ -1,4 +1,5 @@ import { + TsonAbortError, TsonPromiseRejectionError, TsonStreamInterruptedError, } from "../asyncErrors.js"; @@ -31,6 +32,11 @@ export const tsonAsyncIterable: TsonAsyncType< while (((next = await opts.reader.read()), !next.done)) { const { value } = next; if (value instanceof TsonStreamInterruptedError) { + if (value.cause instanceof TsonAbortError) { + opts.close(); + return; + } + throw value; } diff --git a/src/async/sse.test.ts b/src/async/sse.test.ts index 51416f0..cd7622e 100644 --- a/src/async/sse.test.ts +++ b/src/async/sse.test.ts @@ -99,8 +99,14 @@ test("SSE response test", async () => { } } - expect(messages).toMatchInlineSnapshot(); + expect(messages).toMatchInlineSnapshot(` + [ + 0, + 1, + 2, + 3, + 4, + ] + `); } }); - -test.todo("parse SSE response");