From b1733d97be1eb53bb1cd545ea6185df5b918dd84 Mon Sep 17 00:00:00 2001 From: Sheraff Date: Sat, 28 Oct 2023 11:25:01 +0200 Subject: [PATCH 1/3] feat: tsonAsyncGeneratorFunction proposal for 'iterators that can be iterated many times' --- src/async/deserializeAsync.test.ts | 31 +++-- .../handlers/tsonAsyncGeneratorFunction.ts | 128 ++++++++++++++++++ src/index.ts | 1 + 3 files changed, 152 insertions(+), 8 deletions(-) create mode 100644 src/async/handlers/tsonAsyncGeneratorFunction.ts diff --git a/src/async/deserializeAsync.test.ts b/src/async/deserializeAsync.test.ts index 37da08a..15365c7 100644 --- a/src/async/deserializeAsync.test.ts +++ b/src/async/deserializeAsync.test.ts @@ -8,6 +8,7 @@ import { tsonAsyncIterable, tsonBigint, tsonPromise, + tsonAsyncGeneratorFunction, } from "../index.js"; import { assert } from "../internals/assert.js"; import { @@ -92,17 +93,17 @@ test("deserialize async iterable", async () => { } }); -test("stringify async iterable + promise", async () => { +test.only("stringify async iterable + promise + async generator function", async () => { const tson = createTsonAsync({ nonce: () => "__tson", - types: [tsonAsyncIterable, tsonPromise, tsonBigint], + types: [tsonAsyncIterable, tsonPromise, tsonBigint, tsonAsyncGeneratorFunction], }); const parseOptions = { onStreamError: vitest.fn(), } satisfies TsonParseAsyncOptions; - async function* iterable() { + async function* generator() { await sleep(1); yield 1n; await sleep(1); @@ -116,8 +117,9 @@ test("stringify async iterable + promise", async () => { const input = { foo: "bar", - iterable: iterable(), + iterable: generator(), promise: Promise.resolve(42), + generator, }; const strIterable = tson.stringifyJsonStream(input); @@ -128,13 +130,26 @@ test("stringify async iterable + promise", async () => { expect(await output.promise).toEqual(42); - const result = []; - + const iteratorResult = []; for await (const value of output.iterable) { - result.push(value); + iteratorResult.push(value); } + expect(iteratorResult).toEqual([1n, 2n, 3n, 4n, 5n]); - expect(result).toEqual([1n, 2n, 3n, 4n, 5n]); + const generatorResult1 = []; + const iterator1 = output.generator(); + for await (const value of iterator1) { + generatorResult1.push(value); + } + expect(generatorResult1).toEqual([1n, 2n, 3n, 4n, 5n]); + + // generator should be able to be iterated again + const generatorResult2 = []; + const iterator2 = output.generator(); + for await (const value of iterator2) { + generatorResult2.push(value); + } + expect(generatorResult2).toEqual([1n, 2n, 3n, 4n, 5n]); }); test("e2e: stringify async iterable and promise over the network", async () => { diff --git a/src/async/handlers/tsonAsyncGeneratorFunction.ts b/src/async/handlers/tsonAsyncGeneratorFunction.ts new file mode 100644 index 0000000..bb2dd01 --- /dev/null +++ b/src/async/handlers/tsonAsyncGeneratorFunction.ts @@ -0,0 +1,128 @@ +import { + TsonAbortError, + TsonPromiseRejectionError, + TsonStreamInterruptedError, +} from "../asyncErrors.js"; +import { TsonAsyncType } from "../asyncTypes.js"; + +const ITERATOR_VALUE = 0; +const ITERATOR_ERROR = 1; +const ITERATOR_DONE = 2; +type SerializedIterableResult = + | [typeof ITERATOR_DONE] + | [typeof ITERATOR_ERROR, unknown] + | [typeof ITERATOR_VALUE, unknown]; + +function isAsyncGeneratorFunction(value: unknown): value is () => AsyncGenerator { + return ( + !!value && + typeof value === "function" && + value.prototype[Symbol.toStringTag] === "AsyncGenerator" + ); +} + +export const tsonAsyncGeneratorFunction: TsonAsyncType< + () => AsyncGenerator, + SerializedIterableResult +> = { + async: true, + deserialize: (opts) => { + // each value is stored in RAM for generator to be iterated many times + const chunks: Exclude>['value'], undefined>[] = [] + // we need to know if stream is done or just waiting, so that generator can stop looping + let collectionDone = false + // if generator is being iterated while data is still being collected, we need to be able to wait on the next chunks + let resolveNext: () => void + let promiseNext = new Promise(resolve => resolveNext = resolve) + + /** + * Collects chunks from the stream until it's done + * - handle closing the stream + * - handle generating new promises for generator to wait on + */ + void async function collect() { + let next: Awaited>; + loop: while (((next = await opts.reader.read()), !next.done)) { + const { value } = next + chunks.push(value) + if (value instanceof TsonStreamInterruptedError) { + if (value.cause instanceof TsonAbortError) { + opts.close() + return + } + throw value // <-- is this `throw` necessary for "stream management" / "error reporting"? Or should we only throw in the generator? + } + switch (value[0]) { + case ITERATOR_DONE: { + opts.close(); + break loop; + } + case ITERATOR_ERROR: { + opts.close(); + break; + } + } + resolveNext!() + promiseNext = new Promise(resolve => resolveNext = resolve) + } + collectionDone = true + resolveNext!() + }() + + /** + * Generator that yields values from the stream + * - handles waiting for chunks if stream is still active + * - handles throwing errors from values + */ + return async function* generator() { + await promiseNext + for (let i = 0; i < chunks.length; i++) { + const value = chunks[i]! + if (value instanceof TsonStreamInterruptedError) { + if (value.cause instanceof TsonAbortError) { + return; + } + throw value; + } + switch (value[0]) { + case ITERATOR_DONE: { + return; + } + + case ITERATOR_ERROR: { + throw TsonPromiseRejectionError.from(value[1]); + } + + case ITERATOR_VALUE: { + yield value[1]; + break; // <-- breaks the switch, not the loop + } + } + if (i === chunks.length - 1) { + if (collectionDone) break + await promiseNext + if (collectionDone) break + } + } + }; + }, + key: "AsyncGeneratorFunction", + serializeIterator: async function* serialize(opts) { + if (opts.value.length !== 0) { + throw new Error( + `AsyncGeneratorFunction must have 0 arguments to be serializable, got ${opts.value.length}` + ); + } + try { + const iterator = opts.value() + for await (const value of iterator) { + yield [ITERATOR_VALUE, value]; + } + + yield [ITERATOR_DONE]; + } catch (err) { + yield [ITERATOR_ERROR, err]; + } + }, + test: isAsyncGeneratorFunction, +}; diff --git a/src/index.ts b/src/index.ts index 8c92aca..faef4e9 100644 --- a/src/index.ts +++ b/src/index.ts @@ -38,3 +38,4 @@ export * from "./async/asyncErrors.js"; // type handlers export * from "./async/handlers/tsonPromise.js"; export * from "./async/handlers/tsonAsyncIterable.js"; +export * from "./async/handlers/tsonAsyncGeneratorFunction.js"; From 9d506b4558fa54257711400ede68417bb817988c Mon Sep 17 00:00:00 2001 From: Sheraff Date: Sat, 28 Oct 2023 11:43:27 +0200 Subject: [PATCH 2/3] cleanup --- src/async/deserializeAsync.test.ts | 9 ++++++--- .../handlers/tsonAsyncGeneratorFunction.ts | 17 +++++++++++++++-- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/src/async/deserializeAsync.test.ts b/src/async/deserializeAsync.test.ts index 15365c7..dba463c 100644 --- a/src/async/deserializeAsync.test.ts +++ b/src/async/deserializeAsync.test.ts @@ -5,10 +5,10 @@ import { TsonParseAsyncOptions, TsonType, createTsonParseAsync, + tsonAsyncGeneratorFunction, tsonAsyncIterable, tsonBigint, tsonPromise, - tsonAsyncGeneratorFunction, } from "../index.js"; import { assert } from "../internals/assert.js"; import { @@ -93,7 +93,7 @@ test("deserialize async iterable", async () => { } }); -test.only("stringify async iterable + promise + async generator function", async () => { +test("stringify async iterable + promise + async generator function", async () => { const tson = createTsonAsync({ nonce: () => "__tson", types: [tsonAsyncIterable, tsonPromise, tsonBigint, tsonAsyncGeneratorFunction], @@ -117,9 +117,9 @@ test.only("stringify async iterable + promise + async generator function", async const input = { foo: "bar", + generator, iterable: generator(), promise: Promise.resolve(42), - generator, }; const strIterable = tson.stringifyJsonStream(input); @@ -134,6 +134,7 @@ test.only("stringify async iterable + promise + async generator function", async for await (const value of output.iterable) { iteratorResult.push(value); } + expect(iteratorResult).toEqual([1n, 2n, 3n, 4n, 5n]); const generatorResult1 = []; @@ -141,6 +142,7 @@ test.only("stringify async iterable + promise + async generator function", async for await (const value of iterator1) { generatorResult1.push(value); } + expect(generatorResult1).toEqual([1n, 2n, 3n, 4n, 5n]); // generator should be able to be iterated again @@ -149,6 +151,7 @@ test.only("stringify async iterable + promise + async generator function", async for await (const value of iterator2) { generatorResult2.push(value); } + expect(generatorResult2).toEqual([1n, 2n, 3n, 4n, 5n]); }); diff --git a/src/async/handlers/tsonAsyncGeneratorFunction.ts b/src/async/handlers/tsonAsyncGeneratorFunction.ts index bb2dd01..fe1680c 100644 --- a/src/async/handlers/tsonAsyncGeneratorFunction.ts +++ b/src/async/handlers/tsonAsyncGeneratorFunction.ts @@ -13,7 +13,7 @@ type SerializedIterableResult = | [typeof ITERATOR_ERROR, unknown] | [typeof ITERATOR_VALUE, unknown]; -function isAsyncGeneratorFunction(value: unknown): value is () => AsyncGenerator { +function isAsyncGeneratorFunction(value: unknown): value is () => AsyncGenerator { return ( !!value && typeof value === "function" && @@ -22,7 +22,7 @@ function isAsyncGeneratorFunction(value: unknown): value is () => AsyncGenerator } export const tsonAsyncGeneratorFunction: TsonAsyncType< - () => AsyncGenerator, + () => AsyncGenerator, SerializedIterableResult > = { async: true, @@ -50,22 +50,29 @@ export const tsonAsyncGeneratorFunction: TsonAsyncType< opts.close() return } + throw value // <-- is this `throw` necessary for "stream management" / "error reporting"? Or should we only throw in the generator? } + switch (value[0]) { case ITERATOR_DONE: { opts.close(); break loop; } + case ITERATOR_ERROR: { opts.close(); break; } } + + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion -- synchronously set when creating `promiseNext` resolveNext!() promiseNext = new Promise(resolve => resolveNext = resolve) } + collectionDone = true + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion -- synchronously set when creating `promiseNext` resolveNext!() }() @@ -73,17 +80,21 @@ export const tsonAsyncGeneratorFunction: TsonAsyncType< * Generator that yields values from the stream * - handles waiting for chunks if stream is still active * - handles throwing errors from values + * @yields {unknown} */ return async function* generator() { await promiseNext for (let i = 0; i < chunks.length; i++) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion -- `i` is always in range const value = chunks[i]! if (value instanceof TsonStreamInterruptedError) { if (value.cause instanceof TsonAbortError) { return; } + throw value; } + switch (value[0]) { case ITERATOR_DONE: { return; @@ -98,6 +109,7 @@ export const tsonAsyncGeneratorFunction: TsonAsyncType< break; // <-- breaks the switch, not the loop } } + if (i === chunks.length - 1) { if (collectionDone) break await promiseNext @@ -113,6 +125,7 @@ export const tsonAsyncGeneratorFunction: TsonAsyncType< `AsyncGeneratorFunction must have 0 arguments to be serializable, got ${opts.value.length}` ); } + try { const iterator = opts.value() for await (const value of iterator) { From a4839b5b0715d2d5b6ccdd47482c3c90310866ee Mon Sep 17 00:00:00 2001 From: Sheraff Date: Sat, 28 Oct 2023 11:55:24 +0200 Subject: [PATCH 3/3] cleanup --- src/async/deserializeAsync.test.ts | 9 ++- .../handlers/tsonAsyncGeneratorFunction.ts | 58 +++++++++++-------- 2 files changed, 41 insertions(+), 26 deletions(-) diff --git a/src/async/deserializeAsync.test.ts b/src/async/deserializeAsync.test.ts index dba463c..6bae744 100644 --- a/src/async/deserializeAsync.test.ts +++ b/src/async/deserializeAsync.test.ts @@ -96,7 +96,12 @@ test("deserialize async iterable", async () => { test("stringify async iterable + promise + async generator function", async () => { const tson = createTsonAsync({ nonce: () => "__tson", - types: [tsonAsyncIterable, tsonPromise, tsonBigint, tsonAsyncGeneratorFunction], + types: [ + tsonAsyncIterable, + tsonPromise, + tsonBigint, + tsonAsyncGeneratorFunction, + ], }); const parseOptions = { @@ -144,7 +149,7 @@ test("stringify async iterable + promise + async generator function", async () = } expect(generatorResult1).toEqual([1n, 2n, 3n, 4n, 5n]); - + // generator should be able to be iterated again const generatorResult2 = []; const iterator2 = output.generator(); diff --git a/src/async/handlers/tsonAsyncGeneratorFunction.ts b/src/async/handlers/tsonAsyncGeneratorFunction.ts index fe1680c..fc72f1e 100644 --- a/src/async/handlers/tsonAsyncGeneratorFunction.ts +++ b/src/async/handlers/tsonAsyncGeneratorFunction.ts @@ -13,9 +13,10 @@ type SerializedIterableResult = | [typeof ITERATOR_ERROR, unknown] | [typeof ITERATOR_VALUE, unknown]; -function isAsyncGeneratorFunction(value: unknown): value is () => AsyncGenerator { +function isAsyncGeneratorFunction( + value: unknown, +): value is () => AsyncGenerator { return ( - !!value && typeof value === "function" && value.prototype[Symbol.toStringTag] === "AsyncGenerator" ); @@ -28,30 +29,33 @@ export const tsonAsyncGeneratorFunction: TsonAsyncType< async: true, deserialize: (opts) => { // each value is stored in RAM for generator to be iterated many times - const chunks: Exclude>['value'], undefined>[] = [] + const chunks: Exclude< + Awaited>["value"], + undefined + >[] = []; // we need to know if stream is done or just waiting, so that generator can stop looping - let collectionDone = false + let collectionDone = false; // if generator is being iterated while data is still being collected, we need to be able to wait on the next chunks - let resolveNext: () => void - let promiseNext = new Promise(resolve => resolveNext = resolve) + let resolveNext: () => void; + let promiseNext = new Promise((resolve) => (resolveNext = resolve)); /** * Collects chunks from the stream until it's done * - handle closing the stream * - handle generating new promises for generator to wait on */ - void async function collect() { + void (async function collect() { let next: Awaited>; loop: while (((next = await opts.reader.read()), !next.done)) { - const { value } = next - chunks.push(value) + const { value } = next; + chunks.push(value); if (value instanceof TsonStreamInterruptedError) { if (value.cause instanceof TsonAbortError) { - opts.close() - return + opts.close(); + return; } - throw value // <-- is this `throw` necessary for "stream management" / "error reporting"? Or should we only throw in the generator? + throw value; // <-- is this `throw` necessary for "stream management" / "error reporting"? Or should we only throw in the generator? } switch (value[0]) { @@ -67,14 +71,14 @@ export const tsonAsyncGeneratorFunction: TsonAsyncType< } // eslint-disable-next-line @typescript-eslint/no-non-null-assertion -- synchronously set when creating `promiseNext` - resolveNext!() - promiseNext = new Promise(resolve => resolveNext = resolve) + resolveNext!(); + promiseNext = new Promise((resolve) => (resolveNext = resolve)); } - collectionDone = true + collectionDone = true; // eslint-disable-next-line @typescript-eslint/no-non-null-assertion -- synchronously set when creating `promiseNext` - resolveNext!() - }() + resolveNext!(); + })(); /** * Generator that yields values from the stream @@ -83,10 +87,10 @@ export const tsonAsyncGeneratorFunction: TsonAsyncType< * @yields {unknown} */ return async function* generator() { - await promiseNext + await promiseNext; for (let i = 0; i < chunks.length; i++) { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion -- `i` is always in range - const value = chunks[i]! + const value = chunks[i]!; if (value instanceof TsonStreamInterruptedError) { if (value.cause instanceof TsonAbortError) { return; @@ -111,9 +115,15 @@ export const tsonAsyncGeneratorFunction: TsonAsyncType< } if (i === chunks.length - 1) { - if (collectionDone) break - await promiseNext - if (collectionDone) break + if (collectionDone) { + break; + } + + await promiseNext; + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition -- check before await to save 1 tick + if (collectionDone) { + break; + } } } }; @@ -122,12 +132,12 @@ export const tsonAsyncGeneratorFunction: TsonAsyncType< serializeIterator: async function* serialize(opts) { if (opts.value.length !== 0) { throw new Error( - `AsyncGeneratorFunction must have 0 arguments to be serializable, got ${opts.value.length}` + `AsyncGeneratorFunction must have 0 arguments to be serializable, got ${opts.value.length}`, ); } try { - const iterator = opts.value() + const iterator = opts.value(); for await (const value of iterator) { yield [ITERATOR_VALUE, value]; }