Skip to content
This repository has been archived by the owner on Jul 5, 2024. It is now read-only.

Commit

Permalink
fix: move onStreamError to parse options + add test for request abo…
Browse files Browse the repository at this point in the history
…rtion (#55)
  • Loading branch information
KATT authored Oct 8, 2023
1 parent 7b08dd5 commit 9d5db3c
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 33 deletions.
4 changes: 0 additions & 4 deletions src/async/asyncTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,6 @@ export interface TsonAsyncOptions {
* @default `${crypto.randomUUID} if available, otherwise a random string generated by Math.random`
*/
nonce?: () => number | string;
/**
* On stream error
*/
onStreamError?: (err: TsonStreamInterruptedError) => void;

/**
* The list of types to use
Expand Down
185 changes: 162 additions & 23 deletions src/async/deserializeAsync.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { expect, test, vi, vitest } from "vitest";
/* eslint-disable @typescript-eslint/no-non-null-assertion */
import { expect, test, vitest } from "vitest";

import {
TsonAsyncOptions,
TsonParseAsyncOptions,
TsonType,
createTsonAsync,
createTsonParseAsync,
Expand All @@ -17,7 +20,6 @@ import {
waitFor,
} from "../internals/testUtils.js";
import { TsonSerialized } from "../sync/syncTypes.js";
import { TsonAsyncOptions } from "./asyncTypes.js";
import { mapIterable, readableStreamToAsyncIterable } from "./iterableUtils.js";

test("deserialize variable chunk length", async () => {
Expand Down Expand Up @@ -92,13 +94,15 @@ test("deserialize async iterable", async () => {
});

test("stringify async iterable + promise", async () => {
const onErr = vi.fn();
const tson = createTsonAsync({
nonce: () => "__tson",
onStreamError: onErr,
types: [tsonAsyncIterator, tsonPromise, tsonBigint],
});

const parseOptions = {
onStreamError: vitest.fn(),
} satisfies TsonParseAsyncOptions;

async function* iterable() {
await sleep(1);
yield 1n;
Expand All @@ -119,7 +123,7 @@ test("stringify async iterable + promise", async () => {

const strIterable = tson.stringify(input);

const output = await tson.parse(strIterable);
const output = await tson.parse(strIterable, parseOptions);

expect(output.foo).toEqual("bar");

Expand Down Expand Up @@ -348,16 +352,19 @@ test("values missing when stream ends", async () => {
}

const opts = {
onStreamError: vitest.fn(),
types: [tsonPromise, tsonAsyncIterator],
} satisfies TsonAsyncOptions;

const parseOptions = {
onStreamError: vitest.fn(),
} satisfies TsonParseAsyncOptions;

const parse = createTsonParseAsync(opts);

const result = await parse<{
iterable: AsyncIterable<string>;
promise: Promise<unknown>;
}>(generator());
}>(generator(), parseOptions);

{
// iterator should error
Expand Down Expand Up @@ -388,8 +395,8 @@ test("values missing when stream ends", async () => {
);
}

expect(opts.onStreamError).toHaveBeenCalledTimes(1);
expect(opts.onStreamError.mock.calls).toMatchInlineSnapshot(`
expect(parseOptions.onStreamError).toHaveBeenCalledTimes(1);
expect(parseOptions.onStreamError.mock.calls).toMatchInlineSnapshot(`
[
[
[TsonStreamInterruptedError: Stream interrupted: Stream ended unexpectedly],
Expand Down Expand Up @@ -420,17 +427,19 @@ test("async: missing values of promise", async () => {
// yield "]]\n"; // <-- stream and values ended symbol
}

const onErrorSpy = vitest.fn();
const parseOptions = {
onStreamError: vitest.fn(),
} satisfies TsonParseAsyncOptions;

await createTsonAsync({
onStreamError: onErrorSpy,
types: [tsonPromise],
}).parse(generator());
}).parse(generator(), parseOptions);

await waitFor(() => {
expect(onErrorSpy).toHaveBeenCalledTimes(1);
expect(parseOptions.onStreamError).toHaveBeenCalledTimes(1);
});

expect(onErrorSpy.mock.calls[0][0]).toMatchInlineSnapshot(
expect(parseOptions.onStreamError.mock.calls[0]![0]!).toMatchInlineSnapshot(
"[TsonStreamInterruptedError: Stream interrupted: Stream ended unexpectedly]",
);
});
Expand Down Expand Up @@ -469,16 +478,18 @@ test("1 iterator completed but another never finishes", async () => {
}

const opts = {
onStreamError: vitest.fn(),
types: [tsonPromise, tsonAsyncIterator],
} satisfies TsonAsyncOptions;

const parseOptions = {
onStreamError: vitest.fn(),
} satisfies TsonParseAsyncOptions;
const parse = createTsonParseAsync(opts);

const result = await parse<{
iterable1: AsyncIterable<string>;
iterable2: AsyncIterable<string>;
}>(generator());
}>(generator(), parseOptions);

{
// iterator 1 should complete
Expand Down Expand Up @@ -517,9 +528,9 @@ test("1 iterator completed but another never finishes", async () => {
);
}

expect(opts.onStreamError).toHaveBeenCalledTimes(1);
expect(parseOptions.onStreamError).toHaveBeenCalledTimes(1);

expect(opts.onStreamError.mock.calls).toMatchInlineSnapshot(`
expect(parseOptions.onStreamError.mock.calls).toMatchInlineSnapshot(`
[
[
[TsonStreamInterruptedError: Stream interrupted: Stream ended unexpectedly],
Expand Down Expand Up @@ -556,10 +567,13 @@ test("e2e: simulated server crash", async () => {

// ------------- server -------------------
const opts = {
onStreamError: vi.fn(),
types: [tsonPromise, tsonAsyncIterator],
} satisfies TsonAsyncOptions;

const parseOptions = {
onStreamError: vitest.fn(),
} satisfies TsonParseAsyncOptions;

const server = await createTestServer({
handleRequest: async (_req, res) => {
const tson = createTsonAsync(opts);
Expand Down Expand Up @@ -594,7 +608,7 @@ test("e2e: simulated server crash", async () => {
(v) => textDecoder.decode(v),
);

const parsed = await tson.parse<MockObj>(stringIterator);
const parsed = await tson.parse<MockObj>(stringIterator, parseOptions);
{
// check the iterator
const results = [];
Expand Down Expand Up @@ -622,13 +636,138 @@ test("e2e: simulated server crash", async () => {
parsed.rejectedPromise,
).rejects.toThrowErrorMatchingInlineSnapshot('"Promise rejected"');

expect(opts.onStreamError).toHaveBeenCalledTimes(1);
expect(parseOptions.onStreamError).toHaveBeenCalledTimes(1);

// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const streamError = opts.onStreamError.mock.calls[0]![0]!;
const streamError = parseOptions.onStreamError.mock.calls[0]![0]!;
expect(streamError).toMatchInlineSnapshot(
"[TsonStreamInterruptedError: Stream interrupted: terminated]",
);

expect(streamError.cause).toMatchInlineSnapshot("[TypeError: terminated]");
});

test("e2e: client aborted request", async () => {
// ------------- server -------------------
const serverSentChunks: string[] = [];
const iteratorChunks: number[] = [];
function createMockObj() {
async function* generator() {
for (let i = 0; i < 10; i++) {
yield i;
iteratorChunks.push(i);
await sleep(5);
}
}

return {
iterable: generator(),
};
}

type MockObj = ReturnType<typeof createMockObj>;
const opts = {
nonce: () => "__tson",
types: [tsonPromise, tsonAsyncIterator],
} satisfies TsonAsyncOptions;

const parseOptions = {
onStreamError: vitest.fn(),
} satisfies TsonParseAsyncOptions;

const server = await createTestServer({
handleRequest: async (_req, res) => {
const tson = createTsonAsync(opts);

const obj = createMockObj();
const strIterarable = tson.stringify(obj, 4);

for await (const value of strIterarable) {
serverSentChunks.push(value.trimEnd());
res.write(value);
}

res.end();
},
});

// ------------- client -------------------
const abortController = new AbortController();

const tson = createTsonAsync(opts);

// do a streamed fetch request
const response = await fetch(server.url, {
signal: abortController.signal,
});

assert(response.body);

const textDecoder = new TextDecoder();
const stringIterator = mapIterable(
readableStreamToAsyncIterable(response.body),
(v) => textDecoder.decode(v),
);

const parsed = await tson.parse<MockObj>(stringIterator, parseOptions);
{
// check the iterator
const results = [];
let iteratorError: Error | null = null;
try {
for await (const value of parsed.iterable) {
results.push(value);

if (value === 5) {
// abort the request after when receiving 5
abortController.abort();
}
}
} catch (err) {
iteratorError = err as Error;
} finally {
server.close();
}

expect(results).toEqual([0, 1, 2, 3, 4, 5]);
expect(iteratorError).toMatchInlineSnapshot(
"[TsonStreamInterruptedError: Stream interrupted: The operation was aborted.]",
);
}

expect(parseOptions.onStreamError).toHaveBeenCalledTimes(1);

const streamError = parseOptions.onStreamError.mock.calls[0]![0]!;
expect(streamError).toMatchInlineSnapshot(
"[TsonStreamInterruptedError: Stream interrupted: The operation was aborted.]",
);

expect(streamError.cause).toMatchInlineSnapshot(
"[AbortError: The operation was aborted.]",
);

expect(iteratorChunks.length).toBeLessThan(10);
expect(iteratorChunks).toMatchInlineSnapshot(`
[
0,
1,
2,
3,
4,
5,
]
`);
expect(serverSentChunks).toMatchInlineSnapshot(`
[
"[",
" {\\"json\\":{\\"iterable\\":[\\"AsyncIterable\\",0,\\"__tson\\"]},\\"nonce\\":\\"__tson\\"}",
" ,",
" [",
" [0,[0,0]]",
" ,[0,[0,1]]",
" ,[0,[0,2]]",
" ,[0,[0,3]]",
" ,[0,[0,4]]",
" ,[0,[0,5]]",
]
`);
});
20 changes: 16 additions & 4 deletions src/async/deserializeAsync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,16 @@ type AnyTsonTransformerSerializeDeserialize =
| TsonAsyncType<any, any>
| TsonTransformerSerializeDeserialize<any, any>;

export interface TsonParseAsyncOptions {
/**
* On stream error
*/
onStreamError?: (err: TsonStreamInterruptedError) => void;
}

type TsonParseAsync = <TValue>(
string: AsyncIterable<string> | TsonAsyncStringifierIterable<TValue>,
opts?: TsonParseAsyncOptions,
) => Promise<TValue>;

export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {
Expand All @@ -43,7 +51,10 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {
}
}

return async (iterable: AsyncIterable<string>) => {
return async (
iterable: AsyncIterable<string>,
parseOptions: TsonParseAsyncOptions,
) => {
// this is an awful hack to get around making a some sort of pipeline
const cache = new Map<
TsonAsyncIndex,
Expand Down Expand Up @@ -139,6 +150,7 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {
do {
lines.forEach(readLine);
lines.length = 0;

const nextValue = await iterator.next();
if (!nextValue.done) {
accumulator += nextValue.value;
Expand Down Expand Up @@ -205,7 +217,7 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {
controller.enqueue(err);
}

opts.onStreamError?.(err);
parseOptions.onStreamError?.(err);
});
}
}
Expand All @@ -220,8 +232,8 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {
export function createTsonParseAsync(opts: TsonAsyncOptions): TsonParseAsync {
const instance = createTsonParseAsyncInner(opts);

return (async (iterable) => {
const [result] = await instance(iterable);
return (async (iterable, opts) => {
const [result] = await instance(iterable, opts ?? {});

return result;
}) as TsonParseAsync;
Expand Down
2 changes: 1 addition & 1 deletion src/async/handlers/tsonPromise.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ test("does not crash node when it receives a promise rejection", async () => {
};
const iterator = stringify(original);

await parse(iterator);
await parse(iterator, {});

await sleep(10);
});
Expand Down
Loading

0 comments on commit 9d5db3c

Please sign in to comment.