Skip to content
This repository has been archived by the owner on Jul 5, 2024. It is now read-only.

feat: tsonAsyncGeneratorFunction proposal #84

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 31 additions & 8 deletions src/async/deserializeAsync.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
TsonParseAsyncOptions,
TsonType,
createTsonParseAsync,
tsonAsyncGeneratorFunction,
tsonAsyncIterable,
tsonBigint,
tsonPromise,
Expand Down Expand Up @@ -92,17 +93,22 @@ test("deserialize async iterable", async () => {
}
});

test("stringify async iterable + promise", async () => {
test("stringify async iterable + promise + async generator function", async () => {
const tson = createTsonAsync({
nonce: () => "__tson",
types: [tsonAsyncIterable, tsonPromise, tsonBigint],
types: [
tsonAsyncIterable,
tsonPromise,
tsonBigint,
tsonAsyncGeneratorFunction,
],
});

const parseOptions = {
onStreamError: vitest.fn(),
} satisfies TsonParseAsyncOptions;

async function* iterable() {
async function* generator() {
await sleep(1);
yield 1n;
await sleep(1);
Expand All @@ -116,7 +122,8 @@ test("stringify async iterable + promise", async () => {

const input = {
foo: "bar",
iterable: iterable(),
generator,
iterable: generator(),
promise: Promise.resolve(42),
};

Expand All @@ -128,13 +135,29 @@ test("stringify async iterable + promise", async () => {

expect(await output.promise).toEqual(42);

const result = [];

const iteratorResult = [];
for await (const value of output.iterable) {
result.push(value);
iteratorResult.push(value);
}

expect(iteratorResult).toEqual([1n, 2n, 3n, 4n, 5n]);

const generatorResult1 = [];
const iterator1 = output.generator();
for await (const value of iterator1) {
generatorResult1.push(value);
}

expect(generatorResult1).toEqual([1n, 2n, 3n, 4n, 5n]);

// generator should be able to be iterated again
const generatorResult2 = [];
const iterator2 = output.generator();
for await (const value of iterator2) {
generatorResult2.push(value);
}

expect(result).toEqual([1n, 2n, 3n, 4n, 5n]);
expect(generatorResult2).toEqual([1n, 2n, 3n, 4n, 5n]);
});

test("e2e: stringify async iterable and promise over the network", async () => {
Expand Down
151 changes: 151 additions & 0 deletions src/async/handlers/tsonAsyncGeneratorFunction.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import {
TsonAbortError,
TsonPromiseRejectionError,
TsonStreamInterruptedError,
} from "../asyncErrors.js";
import { TsonAsyncType } from "../asyncTypes.js";

const ITERATOR_VALUE = 0;
const ITERATOR_ERROR = 1;
const ITERATOR_DONE = 2;
type SerializedIterableResult =
| [typeof ITERATOR_DONE]
| [typeof ITERATOR_ERROR, unknown]
| [typeof ITERATOR_VALUE, unknown];

function isAsyncGeneratorFunction(
value: unknown,
): value is () => AsyncGenerator<unknown, void> {
return (
typeof value === "function" &&
value.prototype[Symbol.toStringTag] === "AsyncGenerator"
);
}

export const tsonAsyncGeneratorFunction: TsonAsyncType<
() => AsyncGenerator<unknown, void>,
SerializedIterableResult
> = {
async: true,
deserialize: (opts) => {
// each value is stored in RAM for generator to be iterated many times
const chunks: Exclude<
Awaited<ReturnType<(typeof opts.reader)["read"]>>["value"],
undefined
>[] = [];
// we need to know if stream is done or just waiting, so that generator can stop looping
let collectionDone = false;
// if generator is being iterated while data is still being collected, we need to be able to wait on the next chunks
let resolveNext: () => void;
let promiseNext = new Promise<void>((resolve) => (resolveNext = resolve));

/**
* Collects chunks from the stream until it's done
* - handle closing the stream
* - handle generating new promises for generator to wait on
*/
void (async function collect() {
let next: Awaited<ReturnType<(typeof opts.reader)["read"]>>;
loop: while (((next = await opts.reader.read()), !next.done)) {
const { value } = next;
chunks.push(value);
if (value instanceof TsonStreamInterruptedError) {
if (value.cause instanceof TsonAbortError) {
opts.close();
return;
}

throw value; // <-- is this `throw` necessary for "stream management" / "error reporting"? Or should we only throw in the generator?
}

Check warning on line 59 in src/async/handlers/tsonAsyncGeneratorFunction.ts

View check run for this annotation

Codecov / codecov/patch

src/async/handlers/tsonAsyncGeneratorFunction.ts#L53-L59

Added lines #L53 - L59 were not covered by tests

switch (value[0]) {
case ITERATOR_DONE: {
opts.close();
break loop;
}

case ITERATOR_ERROR: {
opts.close();
break;
}

Check warning on line 70 in src/async/handlers/tsonAsyncGeneratorFunction.ts

View check run for this annotation

Codecov / codecov/patch

src/async/handlers/tsonAsyncGeneratorFunction.ts#L68-L70

Added lines #L68 - L70 were not covered by tests
}

// eslint-disable-next-line @typescript-eslint/no-non-null-assertion -- synchronously set when creating `promiseNext`
resolveNext!();
promiseNext = new Promise<void>((resolve) => (resolveNext = resolve));
}

collectionDone = true;
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion -- synchronously set when creating `promiseNext`
resolveNext!();
})();

/**
* Generator that yields values from the stream
* - handles waiting for chunks if stream is still active
* - handles throwing errors from values
* @yields {unknown}
*/
return async function* generator() {
await promiseNext;
for (let i = 0; i < chunks.length; i++) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion -- `i` is always in range
const value = chunks[i]!;
if (value instanceof TsonStreamInterruptedError) {
if (value.cause instanceof TsonAbortError) {
return;
}

throw value;
}

Check warning on line 100 in src/async/handlers/tsonAsyncGeneratorFunction.ts

View check run for this annotation

Codecov / codecov/patch

src/async/handlers/tsonAsyncGeneratorFunction.ts#L95-L100

Added lines #L95 - L100 were not covered by tests

switch (value[0]) {
case ITERATOR_DONE: {
return;
}

case ITERATOR_ERROR: {
throw TsonPromiseRejectionError.from(value[1]);
}

Check warning on line 109 in src/async/handlers/tsonAsyncGeneratorFunction.ts

View check run for this annotation

Codecov / codecov/patch

src/async/handlers/tsonAsyncGeneratorFunction.ts#L108-L109

Added lines #L108 - L109 were not covered by tests

case ITERATOR_VALUE: {
yield value[1];
break; // <-- breaks the switch, not the loop
}
}

if (i === chunks.length - 1) {
if (collectionDone) {
break;
}

await promiseNext;
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition -- check before await to save 1 tick
if (collectionDone) {
break;
}
}

Check warning on line 127 in src/async/handlers/tsonAsyncGeneratorFunction.ts

View check run for this annotation

Codecov / codecov/patch

src/async/handlers/tsonAsyncGeneratorFunction.ts#L118-L127

Added lines #L118 - L127 were not covered by tests
}
};
},
key: "AsyncGeneratorFunction",
serializeIterator: async function* serialize(opts) {
if (opts.value.length !== 0) {
throw new Error(
`AsyncGeneratorFunction must have 0 arguments to be serializable, got ${opts.value.length}`,
);
}

Check warning on line 137 in src/async/handlers/tsonAsyncGeneratorFunction.ts

View check run for this annotation

Codecov / codecov/patch

src/async/handlers/tsonAsyncGeneratorFunction.ts#L134-L137

Added lines #L134 - L137 were not covered by tests

try {
const iterator = opts.value();
for await (const value of iterator) {
yield [ITERATOR_VALUE, value];
}

yield [ITERATOR_DONE];
} catch (err) {
yield [ITERATOR_ERROR, err];
}

Check warning on line 148 in src/async/handlers/tsonAsyncGeneratorFunction.ts

View check run for this annotation

Codecov / codecov/patch

src/async/handlers/tsonAsyncGeneratorFunction.ts#L147-L148

Added lines #L147 - L148 were not covered by tests
},
test: isAsyncGeneratorFunction,
};
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,4 @@ export * from "./async/asyncErrors.js";
// type handlers
export * from "./async/handlers/tsonPromise.js";
export * from "./async/handlers/tsonAsyncIterable.js";
export * from "./async/handlers/tsonAsyncGeneratorFunction.js";