Skip to content
This repository has been archived by the owner on Jul 5, 2024. It is now read-only.

Commit

Permalink
fix: make onStreamError always be of type `TsonStreamInterruptedErr…
Browse files Browse the repository at this point in the history
…or` (#41)



Co-authored-by: KATT <[email protected]>
  • Loading branch information
Sheraff and KATT authored Oct 8, 2023
1 parent 0652176 commit 2614a46
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 2 deletions.
3 changes: 1 addition & 2 deletions src/async/asyncTypes.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { TsonError } from "../errors.js";
import {
TsonBranded,
TsonType,
Expand Down Expand Up @@ -66,7 +65,7 @@ export interface TsonAsyncOptions {
/**
* On stream error
*/
onStreamError?: (err: TsonError) => void;
onStreamError?: (err: TsonStreamInterruptedError) => void;

/**
* The list of types to use
Expand Down
105 changes: 105 additions & 0 deletions src/async/deserializeAsync.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -527,3 +527,108 @@ test("1 iterator completed but another never finishes", async () => {
]
`);
});

test("e2e: simulated server crash", async () => {
const crashedDeferred = createDeferred<null>();
function createMockObj() {
async function* generator() {
for (let i = 0; i < 10; i++) {
yield i;
await sleep(1);
if (i === 5) {
// crash the server after 5 iterations
crashedDeferred.resolve(null);
}

await sleep(1);
}
}

return {
foo: "bar",
iterable: generator(),
promise: Promise.resolve(42),
rejectedPromise: Promise.reject(new Error("rejected promise")),
};
}

type MockObj = ReturnType<typeof createMockObj>;

// ------------- server -------------------
const opts = {
onStreamError: vi.fn(),
types: [tsonPromise, tsonAsyncIterator],
} satisfies TsonAsyncOptions;

const server = await createTestServer({
handleRequest: async (_req, res) => {
const tson = createTsonAsync(opts);

const obj = createMockObj();
const strIterarable = tson.stringify(obj, 4);

void crashedDeferred.promise.then(() => {
// destroy the response stream
res.destroy();
});

for await (const value of strIterarable) {
res.write(value);
}

res.end();
},
});

// ------------- client -------------------
const tson = createTsonAsync(opts);

// do a streamed fetch request
const response = await fetch(server.url);

assert(response.body);

const textDecoder = new TextDecoder();
const stringIterator = mapIterable(
readableStreamToAsyncIterable(response.body),
(v) => textDecoder.decode(v),
);

const parsed = await tson.parse<MockObj>(stringIterator);
{
// check the iterator
const results = [];
let iteratorError: Error | null = null;
try {
for await (const value of parsed.iterable) {
results.push(value);
}
} catch (err) {
iteratorError = err as Error;
} finally {
server.close();
}

assert(iteratorError);
expect(iteratorError.message).toMatchInlineSnapshot(
'"Stream interrupted: terminated"',
);
expect(results).toEqual([0, 1, 2, 3, 4, 5]);
}

expect(parsed.foo).toEqual("bar");
expect(await parsed.promise).toEqual(42);
await expect(
parsed.rejectedPromise,
).rejects.toThrowErrorMatchingInlineSnapshot('"Promise rejected"');

expect(opts.onStreamError).toHaveBeenCalledTimes(1);

// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const streamError = opts.onStreamError.mock.calls[0]![0]!;
expect(streamError).toMatchInlineSnapshot(
"[TsonStreamInterruptedError: Stream interrupted: terminated]",
);

expect(streamError.cause).toMatchInlineSnapshot("[TypeError: terminated]");
});

0 comments on commit 2614a46

Please sign in to comment.