diff --git a/src/async/asyncErrors.ts b/src/async/asyncErrors.ts new file mode 100644 index 00000000..1aaec2b8 --- /dev/null +++ b/src/async/asyncErrors.ts @@ -0,0 +1,44 @@ +import { TsonError } from "../errors.js"; +import { isPlainObject } from "../internals/isPlainObject.js"; + +function getErrorMessageFromUnknown(unknown: unknown): null | string { + if (typeof unknown === "string") { + return unknown; + } + + if (unknown instanceof Error) { + return unknown.message; + } + + if (isPlainObject(unknown) && typeof unknown["message"] === "string") { + return unknown["message"]; + } + + return null; +} + +export class TsonPromiseRejectionError extends TsonError { + constructor(cause: unknown) { + // get error message from cause if possible + const message = getErrorMessageFromUnknown(cause) ?? "Promise rejected"; + super(message, { cause }); + this.name = "TsonPromiseRejectionError"; + } + + static from(cause: unknown) { + return cause instanceof Error + ? cause + : new TsonPromiseRejectionError(cause); + } +} + +export class TsonStreamInterruptedError extends TsonError { + constructor(cause: unknown) { + super( + "Stream interrupted: " + + (getErrorMessageFromUnknown(cause) ?? "unknown reason"), + { cause }, + ); + this.name = "TsonStreamInterruptedError"; + } +} diff --git a/src/async/asyncTypes.ts b/src/async/asyncTypes.ts index bc2989ee..a14f7d62 100644 --- a/src/async/asyncTypes.ts +++ b/src/async/asyncTypes.ts @@ -1,7 +1,11 @@ import { TsonError } from "../errors.js"; -import { TsonType } from "../sync/syncTypes.js"; -import { TsonBranded, TsonTypeTesterCustom } from "../sync/syncTypes.js"; -import { serialized } from "../sync/syncTypes.js"; +import { + TsonBranded, + TsonType, + TsonTypeTesterCustom, + serialized, +} from "../sync/syncTypes.js"; +import { TsonStreamInterruptedError } from "./asyncErrors.js"; export type TsonAsyncStringifierIterable = AsyncIterable & { [serialized]: TValue; @@ -33,7 +37,9 @@ export interface TsonTransformerSerializeDeserializeAsync< /** * Reader for the ReadableStream of values */ - reader: ReadableStreamDefaultReader; + reader: ReadableStreamDefaultReader< + TSerializedValue | TsonStreamInterruptedError + >; }) => TValue; /** diff --git a/src/async/deserializeAsync.test.ts b/src/async/deserializeAsync.test.ts index 005c46f9..ec907cfd 100644 --- a/src/async/deserializeAsync.test.ts +++ b/src/async/deserializeAsync.test.ts @@ -1,13 +1,22 @@ -import { expect, test, vi } from "vitest"; +import { expect, test, vi, vitest } from "vitest"; import { + TsonType, createTsonAsync, + createTsonParseAsync, tsonAsyncIterator, tsonBigint, tsonPromise, } from "../index.js"; import { assert } from "../internals/assert.js"; -import { createTestServer } from "../internals/testUtils.js"; +import { + createDeferred, + createTestServer, + sleep, + waitError, + waitFor, +} from "../internals/testUtils.js"; +import { TsonSerialized } from "../sync/syncTypes.js"; import { TsonAsyncOptions } from "./asyncTypes.js"; import { mapIterable, readableStreamToAsyncIterable } from "./iterableUtils.js"; @@ -200,3 +209,321 @@ test("e2e: stringify async iterable and promise over the network", async () => { server.close(); }); + +class CustomError extends Error { + constructor(message: string) { + super(message); + this.name = "CustomError"; + } +} + +const tsonCustomError: TsonType = { + deserialize: (value) => new Error(value.message), + key: "CustomError", + serialize: (value) => ({ + message: value.message, + }), + test: (value) => value instanceof CustomError, +}; + +test("iterator error", async () => { + function createMockObj() { + const deferred = createDeferred(); + + async function* generator() { + for (let index = 0; index < 3; index++) { + yield `item: ${index}`; + + await new Promise((resolve) => setTimeout(resolve, 1)); + } + + // resolve the deferred after crash + setTimeout(() => { + deferred.resolve("deferred resolved"); + }, 10); + + throw new CustomError("server iterator error"); + } + + return { + deferred: deferred.promise, + iterable: generator(), + promise: Promise.resolve(42), + }; + } + + type MockObj = ReturnType; + + // ------------- server ------------------- + const opts: TsonAsyncOptions = { + types: [tsonPromise, tsonAsyncIterator, tsonCustomError], + }; + + const server = await createTestServer({ + handleRequest: async (_req, res) => { + const tson = createTsonAsync(opts); + + const obj = createMockObj(); + const strIterarable = tson.stringify(obj, 4); + + for await (const value of strIterarable) { + res.write(value); + } + + res.end(); + }, + }); + + // ------------- client ------------------- + const tson = createTsonAsync(opts); + + // do a streamed fetch request + const response = await fetch(server.url); + + assert(response.body); + + const textDecoder = new TextDecoder(); + const stringIterator = mapIterable( + readableStreamToAsyncIterable(response.body), + (v) => textDecoder.decode(v), + ); + + const parsed = await tson.parse(stringIterator); + expect(await parsed.promise).toEqual(42); + + const results = []; + let iteratorError: Error | null = null; + try { + for await (const value of parsed.iterable) { + results.push(value); + } + } catch (err) { + iteratorError = err as Error; + } finally { + server.close(); + } + + expect(iteratorError).toMatchInlineSnapshot("[Error: server iterator error]"); + + expect(await parsed.deferred).toEqual("deferred resolved"); + expect(await parsed.promise).toEqual(42); + expect(results).toMatchInlineSnapshot(` + [ + "item: 0", + "item: 1", + "item: 2", + ] + `); +}); + +test("values missing when stream ends", async () => { + async function* generator() { + await Promise.resolve(); + + yield "[" + "\n"; + + const obj = { + json: { + iterable: ["AsyncIterable", 1, "__tson"], + promise: ["Promise", 0, "__tson"], + }, + nonce: "__tson", + } as TsonSerialized; + yield JSON.stringify(obj) + "\n"; + + await sleep(1); + + yield " ," + "\n"; + yield " [" + "\n"; + // + // - promise is never resolved + + // - iterator is never closed + yield ` [1, [0, "value 1"]]`; + yield ` [1, [0, "value 2"]]`; + // yield ` [1, [2]]`; // iterator done + + // + // yield " ]]" + "\n"; + } + + const opts = { + onStreamError: vitest.fn(), + types: [tsonPromise, tsonAsyncIterator], + } satisfies TsonAsyncOptions; + + const parse = createTsonParseAsync(opts); + + const result = await parse<{ + iterable: AsyncIterable; + promise: Promise; + }>(generator()); + + { + // iterator should error + const results = []; + + let err: Error | null = null; + try { + for await (const value of result.iterable) { + results.push(value); + } + } catch (cause) { + err = cause as Error; + } + + assert(err); + + expect(err.message).toMatchInlineSnapshot( + '"Stream interrupted: Stream ended unexpectedly"', + ); + } + + { + // promise was never resolved and should error + const err = await waitError(result.promise); + + expect(err).toMatchInlineSnapshot( + "[TsonStreamInterruptedError: Stream interrupted: Stream ended unexpectedly]", + ); + } + + expect(opts.onStreamError).toHaveBeenCalledTimes(1); + expect(opts.onStreamError.mock.calls).toMatchInlineSnapshot(` + [ + [ + [TsonStreamInterruptedError: Stream interrupted: Stream ended unexpectedly], + ], + ] + `); +}); + +test("async: missing values of promise", async () => { + async function* generator() { + await Promise.resolve(); + + yield "[" + "\n"; + + const obj = { + json: { + foo: ["Promise", 0, "__tson"], + }, + nonce: "__tson", + } as TsonSerialized; + yield JSON.stringify(obj) + "\n"; + + await Promise.resolve(); + + yield " ," + "\n"; + yield " [" + "\n"; + // [....... values should be here .......] + // yield "]]\n"; // <-- stream and values ended symbol + } + + const onErrorSpy = vitest.fn(); + await createTsonAsync({ + onStreamError: onErrorSpy, + types: [tsonPromise], + }).parse(generator()); + + await waitFor(() => { + expect(onErrorSpy).toHaveBeenCalledTimes(1); + }); + + expect(onErrorSpy.mock.calls[0][0]).toMatchInlineSnapshot( + "[TsonStreamInterruptedError: Stream interrupted: Stream ended unexpectedly]", + ); +}); + +test("1 iterator completed but another never finishes", async () => { + async function* generator() { + await Promise.resolve(); + + yield "[" + "\n"; + + const obj = { + json: { + iterable1: ["AsyncIterable", 1, "__tson"], + iterable2: ["AsyncIterable", 2, "__tson"], + }, + nonce: "__tson", + } as TsonSerialized; + yield JSON.stringify(obj) + "\n"; + + await sleep(1); + + yield " ," + "\n"; + yield " [" + "\n"; + // + + // iterator 2 never finishes + yield ` [2, [0, "value"]]\n`; + // yield ` [2, [2]]`; // iterator done + + // iterator 1 finishes + yield ` [1, [0, "value"]]\n`; + yield ` [1, [2]]\n`; // iterator done + + // + // yield " ]]" + "\n"; + } + + const opts = { + onStreamError: vitest.fn(), + types: [tsonPromise, tsonAsyncIterator], + } satisfies TsonAsyncOptions; + + const parse = createTsonParseAsync(opts); + + const result = await parse<{ + iterable1: AsyncIterable; + iterable2: AsyncIterable; + }>(generator()); + + { + // iterator 1 should complete + const results = []; + + for await (const value of result.iterable1) { + results.push(value); + } + + expect(results).toEqual(["value"]); + } + + { + // iterator 2 should error + const results = []; + + let err: Error | null = null; + try { + for await (const value of result.iterable2) { + results.push(value); + } + } catch (cause) { + err = cause as Error; + } + + assert(err); + + expect(results).toMatchInlineSnapshot(` + [ + "value", + ] + `); + + expect(err.message).toMatchInlineSnapshot( + '"Stream interrupted: Stream ended unexpectedly"', + ); + } + + expect(opts.onStreamError).toHaveBeenCalledTimes(1); + + expect(opts.onStreamError.mock.calls).toMatchInlineSnapshot(` + [ + [ + [TsonStreamInterruptedError: Stream interrupted: Stream ended unexpectedly], + ], + ] + `); +}); diff --git a/src/async/deserializeAsync.ts b/src/async/deserializeAsync.ts index 70093371..b3c56329 100644 --- a/src/async/deserializeAsync.ts +++ b/src/async/deserializeAsync.ts @@ -9,6 +9,7 @@ import { TsonSerialized, TsonTransformerSerializeDeserialize, } from "../sync/syncTypes.js"; +import { TsonStreamInterruptedError } from "./asyncErrors.js"; import { TsonAsyncIndex, TsonAsyncOptions, @@ -95,7 +96,12 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) { accumulator: string, walk: WalkFn, ) { + // + let streamEnded = false; + // + function readLine(str: string) { + // console.log("got str", str); str = str.trimStart(); if (str.startsWith(",")) { @@ -103,8 +109,14 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) { str = str.slice(1); } - if (str.length < 2) { - // minimum length is 2: '[]' + if (str === "" || str === "[" || str === ",") { + // beginning of values array or empty string + return; + } + + if (str === "]]") { + // end of values and stream + streamEnded = true; return; } @@ -134,7 +146,7 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) { } } while (lines.length); - assert(!cache.size, `Stream ended with ${cache.size} pending promises`); + assert(streamEnded, "Stream ended unexpectedly"); } async function init() { @@ -182,19 +194,17 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) { getStreamedValues(buffer, accumulator, walk).catch((cause) => { // Something went wrong while getting the streamed values - const err = new TsonError( - `Stream interrupted: ${(cause as Error).message}`, - // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment - { cause }, - ); + const err = new TsonStreamInterruptedError(cause); - // cancel all pending promises + // enqueue the error to all the streams for (const controller of cache.values()) { - controller.error(err); + try { + controller.enqueue(err); + } catch { + // ignore if the controller is closed + } } - cache.clear(); - opts.onStreamError?.(err); }); } diff --git a/src/async/handlers/tsonAsyncIterable.ts b/src/async/handlers/tsonAsyncIterable.ts index 34a2fa4b..ce58840e 100644 --- a/src/async/handlers/tsonAsyncIterable.ts +++ b/src/async/handlers/tsonAsyncIterable.ts @@ -1,4 +1,7 @@ -import { TsonPromiseRejectionError } from "../../errors.js"; +import { + TsonPromiseRejectionError, + TsonStreamInterruptedError, +} from "../asyncErrors.js"; import { TsonAsyncType } from "../asyncTypes.js"; const ITERATOR_VALUE = 0; @@ -23,9 +26,15 @@ export const tsonAsyncIterator: TsonAsyncType< async: true, deserialize: (opts) => { return (async function* generator() { - let next: ReadableStreamReadResult; + let next: Awaited>; + while (((next = await opts.reader.read()), !next.done)) { - switch (next.value[0]) { + const { value } = next; + if (value instanceof TsonStreamInterruptedError) { + throw value; + } + + switch (value[0]) { case ITERATOR_DONE: { opts.controller.close(); return; @@ -33,11 +42,11 @@ export const tsonAsyncIterator: TsonAsyncType< case ITERATOR_ERROR: { opts.controller.close(); - throw TsonPromiseRejectionError.from(next.value[1]); + throw TsonPromiseRejectionError.from(value[1]); } case ITERATOR_VALUE: { - yield next.value[1]; + yield value[1]; break; } } diff --git a/src/async/handlers/tsonPromise.test.ts b/src/async/handlers/tsonPromise.test.ts index 9e311f99..6539248d 100644 --- a/src/async/handlers/tsonPromise.test.ts +++ b/src/async/handlers/tsonPromise.test.ts @@ -11,8 +11,8 @@ import { } from "../../index.js"; import { createTestServer, + sleep, waitError, - waitFor, } from "../../internals/testUtils.js"; import { createTsonParseAsyncInner } from "../deserializeAsync.js"; import { @@ -216,8 +216,7 @@ test("stringifier - no promises", async () => { " {\\"json\\":{\\"foo\\":\\"bar\\"},\\"nonce\\":\\"__tson\\"}", " ,", " [", - " ]", - "]", + "]]", ] `); @@ -255,8 +254,7 @@ test("stringifier - with promise", async () => { " ,", " [", " [0,[0,\\"bar\\"]]", - " ]", - "]", + "]]", ] `); }); @@ -332,8 +330,7 @@ test("stringifier - promise in promise", async () => { " [", " [0,[0,{\\"anotherPromise\\":[\\"Promise\\",1,\\"__tson\\"]}]]", " ,[1,[0,42]]", - " ]", - "]", + "]]", ] `); }); @@ -473,32 +470,13 @@ test("does not crash node when it receives a promise rejection", async () => { const original = { foo: createPromise(() => { throw new Error("foo"); - }, 5), + }, 1), }; const iterator = stringify(original); - const [_result, deferreds] = await parse(iterator); + await parse(iterator); - const result = _result as typeof original; - await waitFor(() => { - assert(deferreds.size === 1); - }); - - await waitFor(() => { - assert(deferreds.size === 0); - }); - - expect(result).toMatchInlineSnapshot(` - { - "foo": Promise {}, - } - `); - - const err = await waitError(result.foo); - - expect(err).toMatchInlineSnapshot( - "[TsonPromiseRejectionError: Promise rejected]", - ); + await sleep(10); }); test("stringify promise rejection", async () => { @@ -528,17 +506,16 @@ test("stringify promise rejection", async () => { } expect(buffer).toMatchInlineSnapshot(` - [ - "[", - " {\\"json\\":{\\"foo\\":[\\"Promise\\",0,\\"__tson\\"]},\\"nonce\\":\\"__tson\\"}", - " ,", - " [", - " [0,[0,{\\"err\\":[\\"Promise\\",1,\\"__tson\\"]}]]", - " ,[1,[1,[\\"Error\\",{\\"message\\":\\"foo\\"},\\"__tson\\"]]]", - " ]", - "]", - ] - `); + [ + "[", + " {\\"json\\":{\\"foo\\":[\\"Promise\\",0,\\"__tson\\"]},\\"nonce\\":\\"__tson\\"}", + " ,", + " [", + " [0,[0,{\\"err\\":[\\"Promise\\",1,\\"__tson\\"]}]]", + " ,[1,[1,[\\"Error\\",{\\"message\\":\\"foo\\"},\\"__tson\\"]]]", + "]]", + ] + `); } { diff --git a/src/async/handlers/tsonPromise.ts b/src/async/handlers/tsonPromise.ts index aab14544..ad650d0b 100644 --- a/src/async/handlers/tsonPromise.ts +++ b/src/async/handlers/tsonPromise.ts @@ -1,4 +1,7 @@ -import { TsonPromiseRejectionError } from "../../errors.js"; +import { + TsonPromiseRejectionError, + TsonStreamInterruptedError, +} from "../asyncErrors.js"; import { TsonAsyncType } from "../asyncTypes.js"; function isPromise(value: unknown): value is Promise { @@ -24,14 +27,23 @@ export const tsonPromise: TsonAsyncType = { deserialize: (opts) => { const promise = new Promise((resolve, reject) => { async function _handle() { - const value = await opts.reader.read(); + const next = await opts.reader.read(); opts.controller.close(); - if (value.done) { - throw new Error("Expected promise value, got done"); + if (next.done) { + throw new TsonPromiseRejectionError( + "Expected promise value, got done", + ); } - const [status, result] = value.value; + const { value } = next; + + if (value instanceof TsonStreamInterruptedError) { + reject(TsonPromiseRejectionError.from(value)); + return; + } + + const [status, result] = value; status === PROMISE_RESOLVED ? resolve(result) diff --git a/src/async/serializeAsync.test.ts b/src/async/serializeAsync.test.ts index 2d7e8430..48b77e93 100644 --- a/src/async/serializeAsync.test.ts +++ b/src/async/serializeAsync.test.ts @@ -191,8 +191,7 @@ test("stringify async iterable + promise", async () => { " ,[0,[0,[\\"bigint\\",\\"1\\",\\"__tson\\"]]]", " ,[0,[0,[\\"bigint\\",\\"2\\",\\"__tson\\"]]]", " ,[0,[2]]", - " ]", - "]", + "]]", ] `); diff --git a/src/async/serializeAsync.ts b/src/async/serializeAsync.ts index bf3951ba..13d20e9a 100644 --- a/src/async/serializeAsync.ts +++ b/src/async/serializeAsync.ts @@ -243,8 +243,7 @@ export function createTsonStringifyAsync( isFirstStreamedValue = false; } - yield indent(space * 1) + "]" + "\n"; // end value array - yield "]" + "\n"; // end response + yield "]]" + "\n"; // end response and value array }; return stringifier as TsonAsyncStringifier; diff --git a/src/errors.ts b/src/errors.ts index a66d0240..2c2810b1 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -1,5 +1,3 @@ -import { isPlainObject } from "./internals/isPlainObject.js"; - export class TsonError extends Error { constructor(message: string, opts?: ErrorOptions) { super(message, opts); @@ -21,35 +19,3 @@ export class TsonCircularReferenceError extends TsonError { this.value = value; } } - -function getErrorMessageFromUnknown(unknown: unknown): null | string { - if (typeof unknown === "string") { - return unknown; - } - - if (unknown instanceof Error) { - return unknown.message; - } - - if (isPlainObject(unknown) && typeof unknown["message"] === "string") { - return unknown["message"]; - } - - return null; -} - -export class TsonPromiseRejectionError extends TsonError { - constructor(cause: unknown) { - // get error message from cause if possible - - const message = getErrorMessageFromUnknown(cause) ?? "Promise rejected"; - super(message, { cause }); - this.name = "TsonPromiseRejectionError"; - } - - static from(cause: unknown) { - return cause instanceof Error - ? cause - : new TsonPromiseRejectionError(cause); - } -} diff --git a/src/index.test.ts b/src/index.test.ts index a4b6dd4a..f17c20d4 100644 --- a/src/index.test.ts +++ b/src/index.test.ts @@ -1,14 +1,7 @@ -import { expect, test, vitest } from "vitest"; - -import { - TsonOptions, - TsonType, - createTson, - createTsonAsync, - tsonPromise, -} from "./index.js"; -import { expectError, waitError, waitFor } from "./internals/testUtils.js"; -import { TsonSerialized } from "./sync/syncTypes.js"; +import { expect, test } from "vitest"; + +import { TsonOptions, TsonType, createTson, createTsonAsync } from "./index.js"; +import { expectError, waitError } from "./internals/testUtils.js"; test("multiple handlers for primitive string found", () => { const stringHandler: TsonType = { @@ -143,41 +136,3 @@ test("async: bad init", async () => { "[TsonError: Failed to initialize TSON stream]", ); }); - -test("async: bad values", async () => { - async function* generator() { - await Promise.resolve(); - - yield "[" + "\n"; - - const obj = { - json: { - foo: ["Promise", 0, "__tson"], - }, - nonce: "__tson", - } as TsonSerialized; - yield JSON.stringify(obj) + "\n"; - - await Promise.resolve(); - - yield " ," + "\n"; - yield " [" + "\n"; - // [....... values should be here .......] - yield " ]" + "\n"; - yield "]"; - } - - const onErrorSpy = vitest.fn(); - await createTsonAsync({ - onStreamError: onErrorSpy, - types: [tsonPromise], - }).parse(generator()); - - await waitFor(() => { - expect(onErrorSpy).toHaveBeenCalledTimes(1); - }); - - expect(onErrorSpy.mock.calls[0][0]).toMatchInlineSnapshot( - "[TsonError: Stream interrupted: Stream ended with 1 pending promises]", - ); -}); diff --git a/src/index.ts b/src/index.ts index b263389e..9da7b167 100644 --- a/src/index.ts +++ b/src/index.ts @@ -19,7 +19,6 @@ export * from "./sync/handlers/tsonSymbol.js"; // --- async -- export type { TsonAsyncOptions } from "./async/asyncTypes.js"; - export { createTsonAsync } from "./async/createTsonAsync.js"; export { createTsonParseAsync } from "./async/deserializeAsync.js"; export { @@ -28,5 +27,5 @@ export { } from "./async/serializeAsync.js"; // type handlers -export * from "./async/handlers/tsonAsyncIterable.js"; export * from "./async/handlers/tsonPromise.js"; +export * from "./async/handlers/tsonAsyncIterable.js"; diff --git a/src/internals/testUtils.ts b/src/internals/testUtils.ts index 6b85b820..560a1b22 100644 --- a/src/internals/testUtils.ts +++ b/src/internals/testUtils.ts @@ -72,3 +72,21 @@ export async function createTestServer(opts: { url: `http://localhost:${port}`, }; } + +export function createDeferred() { + type PromiseResolve = (value: T) => void; + type PromiseReject = (reason: unknown) => void; + const deferred = {} as { + promise: Promise; + reject: PromiseReject; + resolve: PromiseResolve; + }; + deferred.promise = new Promise((resolve, reject) => { + deferred.resolve = resolve; + deferred.reject = reject; + }); + return deferred; +} + +export const sleep = (ms: number) => + new Promise((resolve) => setTimeout(resolve, ms));