diff --git a/src/async/deserializeAsync.test.ts b/src/async/deserializeAsync.test.ts index 37da08a..6bae744 100644 --- a/src/async/deserializeAsync.test.ts +++ b/src/async/deserializeAsync.test.ts @@ -5,6 +5,7 @@ import { TsonParseAsyncOptions, TsonType, createTsonParseAsync, + tsonAsyncGeneratorFunction, tsonAsyncIterable, tsonBigint, tsonPromise, @@ -92,17 +93,22 @@ test("deserialize async iterable", async () => { } }); -test("stringify async iterable + promise", async () => { +test("stringify async iterable + promise + async generator function", async () => { const tson = createTsonAsync({ nonce: () => "__tson", - types: [tsonAsyncIterable, tsonPromise, tsonBigint], + types: [ + tsonAsyncIterable, + tsonPromise, + tsonBigint, + tsonAsyncGeneratorFunction, + ], }); const parseOptions = { onStreamError: vitest.fn(), } satisfies TsonParseAsyncOptions; - async function* iterable() { + async function* generator() { await sleep(1); yield 1n; await sleep(1); @@ -116,7 +122,8 @@ test("stringify async iterable + promise", async () => { const input = { foo: "bar", - iterable: iterable(), + generator, + iterable: generator(), promise: Promise.resolve(42), }; @@ -128,13 +135,29 @@ test("stringify async iterable + promise", async () => { expect(await output.promise).toEqual(42); - const result = []; - + const iteratorResult = []; for await (const value of output.iterable) { - result.push(value); + iteratorResult.push(value); + } + + expect(iteratorResult).toEqual([1n, 2n, 3n, 4n, 5n]); + + const generatorResult1 = []; + const iterator1 = output.generator(); + for await (const value of iterator1) { + generatorResult1.push(value); + } + + expect(generatorResult1).toEqual([1n, 2n, 3n, 4n, 5n]); + + // generator should be able to be iterated again + const generatorResult2 = []; + const iterator2 = output.generator(); + for await (const value of iterator2) { + generatorResult2.push(value); } - expect(result).toEqual([1n, 2n, 3n, 4n, 5n]); + expect(generatorResult2).toEqual([1n, 2n, 3n, 4n, 5n]); }); test("e2e: stringify async iterable and promise over the network", async () => { diff --git a/src/async/handlers/tsonAsyncGeneratorFunction.ts b/src/async/handlers/tsonAsyncGeneratorFunction.ts new file mode 100644 index 0000000..fc72f1e --- /dev/null +++ b/src/async/handlers/tsonAsyncGeneratorFunction.ts @@ -0,0 +1,151 @@ +import { + TsonAbortError, + TsonPromiseRejectionError, + TsonStreamInterruptedError, +} from "../asyncErrors.js"; +import { TsonAsyncType } from "../asyncTypes.js"; + +const ITERATOR_VALUE = 0; +const ITERATOR_ERROR = 1; +const ITERATOR_DONE = 2; +type SerializedIterableResult = + | [typeof ITERATOR_DONE] + | [typeof ITERATOR_ERROR, unknown] + | [typeof ITERATOR_VALUE, unknown]; + +function isAsyncGeneratorFunction( + value: unknown, +): value is () => AsyncGenerator { + return ( + typeof value === "function" && + value.prototype[Symbol.toStringTag] === "AsyncGenerator" + ); +} + +export const tsonAsyncGeneratorFunction: TsonAsyncType< + () => AsyncGenerator, + SerializedIterableResult +> = { + async: true, + deserialize: (opts) => { + // each value is stored in RAM for generator to be iterated many times + const chunks: Exclude< + Awaited>["value"], + undefined + >[] = []; + // we need to know if stream is done or just waiting, so that generator can stop looping + let collectionDone = false; + // if generator is being iterated while data is still being collected, we need to be able to wait on the next chunks + let resolveNext: () => void; + let promiseNext = new Promise((resolve) => (resolveNext = resolve)); + + /** + * Collects chunks from the stream until it's done + * - handle closing the stream + * - handle generating new promises for generator to wait on + */ + void (async function collect() { + let next: Awaited>; + loop: while (((next = await opts.reader.read()), !next.done)) { + const { value } = next; + chunks.push(value); + if (value instanceof TsonStreamInterruptedError) { + if (value.cause instanceof TsonAbortError) { + opts.close(); + return; + } + + throw value; // <-- is this `throw` necessary for "stream management" / "error reporting"? Or should we only throw in the generator? + } + + switch (value[0]) { + case ITERATOR_DONE: { + opts.close(); + break loop; + } + + case ITERATOR_ERROR: { + opts.close(); + break; + } + } + + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion -- synchronously set when creating `promiseNext` + resolveNext!(); + promiseNext = new Promise((resolve) => (resolveNext = resolve)); + } + + collectionDone = true; + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion -- synchronously set when creating `promiseNext` + resolveNext!(); + })(); + + /** + * Generator that yields values from the stream + * - handles waiting for chunks if stream is still active + * - handles throwing errors from values + * @yields {unknown} + */ + return async function* generator() { + await promiseNext; + for (let i = 0; i < chunks.length; i++) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion -- `i` is always in range + const value = chunks[i]!; + if (value instanceof TsonStreamInterruptedError) { + if (value.cause instanceof TsonAbortError) { + return; + } + + throw value; + } + + switch (value[0]) { + case ITERATOR_DONE: { + return; + } + + case ITERATOR_ERROR: { + throw TsonPromiseRejectionError.from(value[1]); + } + + case ITERATOR_VALUE: { + yield value[1]; + break; // <-- breaks the switch, not the loop + } + } + + if (i === chunks.length - 1) { + if (collectionDone) { + break; + } + + await promiseNext; + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition -- check before await to save 1 tick + if (collectionDone) { + break; + } + } + } + }; + }, + key: "AsyncGeneratorFunction", + serializeIterator: async function* serialize(opts) { + if (opts.value.length !== 0) { + throw new Error( + `AsyncGeneratorFunction must have 0 arguments to be serializable, got ${opts.value.length}`, + ); + } + + try { + const iterator = opts.value(); + for await (const value of iterator) { + yield [ITERATOR_VALUE, value]; + } + + yield [ITERATOR_DONE]; + } catch (err) { + yield [ITERATOR_ERROR, err]; + } + }, + test: isAsyncGeneratorFunction, +}; diff --git a/src/index.ts b/src/index.ts index 8c92aca..faef4e9 100644 --- a/src/index.ts +++ b/src/index.ts @@ -38,3 +38,4 @@ export * from "./async/asyncErrors.js"; // type handlers export * from "./async/handlers/tsonPromise.js"; export * from "./async/handlers/tsonAsyncIterable.js"; +export * from "./async/handlers/tsonAsyncGeneratorFunction.js";