Skip to content
This repository has been archived by the owner on Jul 5, 2024. It is now read-only.

Commit

Permalink
feat: handle reconnects when response is interrupted (#77)
Browse files Browse the repository at this point in the history
- add option to resume interrupted streams
- we assume that any reconnect will return a result with the same shape

## Notes


- it's a bit mind-bending try to to follow exactly what happens, but the
test seems to pass
- we'll likely use SSE for subscriptions in tRPC and SSE _or_ JSON
streams for query/mutation results
- for subscriptions we'll have `reconnect:true` in our client
  • Loading branch information
KATT authored Oct 20, 2023
1 parent 831d7c4 commit aa2cbad
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 23 deletions.
1 change: 1 addition & 0 deletions examples/async/src/app/sse-infinite/StreamedTimeSSE.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export function StreamedTimeSSE() {
useEffect(() => {
const abortSignal = new AbortController();
createEventSource<ResponseShape>("/sse-infinite", {
reconnect: true,
signal: abortSignal.signal,
})
.then(async (shape) => {
Expand Down
57 changes: 49 additions & 8 deletions src/async/deserializeAsync.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
/* eslint-disable @typescript-eslint/no-non-null-assertion */

import { TsonError } from "../errors.js";
Expand Down Expand Up @@ -31,10 +32,20 @@ type AnyTsonTransformerSerializeDeserialize =
| TsonTransformerSerializeDeserialize<any, any>;

export interface TsonParseAsyncOptions {
/**
* Event handler for when the stream reconnects
* You can use this to do extra actions to ensure no messages were lost
*/
onReconnect?: () => void;
/**
* On stream error
*/
onStreamError?: (err: TsonStreamInterruptedError) => void;
/**
* Allow reconnecting to the stream if it's interrupted
* @default false
*/
reconnect?: boolean;
}

type TsonParseAsync = <TValue>(
Expand Down Expand Up @@ -62,10 +73,11 @@ function createTsonDeserializer(opts: TsonAsyncOptions) {
iterable: TsonDeserializeIterable,
parseOptions: TsonParseAsyncOptions,
) => {
const cache = new Map<
const controllers = new Map<
TsonAsyncIndex,
ReadableStreamDefaultController<unknown>
>();
const cache = new Map<TsonAsyncIndex, unknown>();
const iterator = iterable[Symbol.asyncIterator]();

const walker: WalkerFactory = (nonce) => {
Expand All @@ -83,22 +95,34 @@ function createTsonDeserializer(opts: TsonAsyncOptions) {

const idx = serializedValue as TsonAsyncIndex;

if (cache.has(idx)) {
// We already have this async value in the cache - so this is probably a reconnect
assert(
parseOptions.reconnect,
"Duplicate index found but reconnect is off",
);
return cache.get(idx);
}

const [readable, controller] = createReadableStream();

// the `start` method is called "immediately when the object is constructed"
// [MDN](http://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/ReadableStream)
// so we're guaranteed that the controller is set in the cache
assert(controller, "Controller not set - this is a bug");

cache.set(idx, controller);
controllers.set(idx, controller);

return transformer.deserialize({
const result = transformer.deserialize({
close() {
controller.close();
cache.delete(idx);
controllers.delete(idx);
},
reader: readable.getReader(),
});

cache.set(idx, result);
return result;
}

return mapOrReturn(value, walk);
Expand All @@ -117,16 +141,33 @@ function createTsonDeserializer(opts: TsonAsyncOptions) {

const { value } = nextValue;

if (!Array.isArray(value)) {
// we got the beginning of a new stream - probably because a reconnect
// we assume this new stream will have the same shape and restart the walker with the nonce

parseOptions.onReconnect?.();

assert(
parseOptions.reconnect,
"Stream got beginning of results but reconnecting is not enabled",
);

await getStreamedValues(walker(value.nonce));
return;
}

const [index, result] = value as TsonAsyncValueTuple;

const controller = cache.get(index);
const controller = controllers.get(index);

const walkedResult = walk(result);

assert(controller, `No stream found for index ${index}`);
if (!parseOptions.reconnect) {
assert(controller, `No stream found for index ${index}`);
}

// resolving deferred
controller.enqueue(walkedResult);
controller?.enqueue(walkedResult);
}
}

Expand All @@ -152,7 +193,7 @@ function createTsonDeserializer(opts: TsonAsyncOptions) {
const err = new TsonStreamInterruptedError(cause);

// enqueue the error to all the streams
for (const controller of cache.values()) {
for (const controller of controllers.values()) {
controller.enqueue(err);
}

Expand Down
126 changes: 112 additions & 14 deletions src/async/sse.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
/* eslint-disable @typescript-eslint/no-unnecessary-condition */
import { EventSourcePolyfill, NativeEventSource } from "event-source-polyfill";
import { expect, test } from "vitest";
import { expect, test, vi } from "vitest";
(global as any).EventSource = NativeEventSource || EventSourcePolyfill;

import { TsonAsyncOptions, tsonAsyncIterable, tsonPromise } from "../index.js";
import {
TsonAsyncOptions,
tsonAsyncIterable,
tsonBigint,
tsonPromise,
} from "../index.js";
import { createTestServer, sleep } from "../internals/testUtils.js";
import { createTsonAsync } from "./createTsonAsync.js";

Expand All @@ -13,15 +18,12 @@ test("SSE response test", async () => {
let i = 0;
while (true) {
yield i++;
await sleep(100);
await sleep(10);
}
}

return {
foo: "bar",
iterable: generator(),
promise: Promise.resolve(42),
rejectedPromise: Promise.reject(new Error("rejected promise")),
};
}

Expand Down Expand Up @@ -73,14 +75,14 @@ test("SSE response test", async () => {
});

expect(messages).toMatchInlineSnapshot(`
[
"{\\"json\\":{\\"foo\\":\\"bar\\",\\"iterable\\":[\\"AsyncIterable\\",0,\\"__tson\\"],\\"promise\\":[\\"Promise\\",1,\\"__tson\\"],\\"rejectedPromise\\":[\\"Promise\\",2,\\"__tson\\"]},\\"nonce\\":\\"__tson\\"}",
"[0,[0,0]]",
"[1,[0,42]]",
"[2,[1,{}]]",
"[0,[0,1]]",
]
`);
[
"{\\"json\\":{\\"iterable\\":[\\"AsyncIterable\\",0,\\"__tson\\"]},\\"nonce\\":\\"__tson\\"}",
"[0,[0,0]]",
"[0,[0,1]]",
"[0,[0,2]]",
"[0,[0,3]]",
]
`);
}

{
Expand Down Expand Up @@ -110,3 +112,99 @@ test("SSE response test", async () => {
`);
}
});

test("handle reconnects - iterator wrapped in Promise", async () => {
let i = 0;

let kill = false;
function createMockObj() {
async function* generator() {
while (true) {
await sleep(10);
yield BigInt(i);
i++;

if (i % 5 === 0) {
kill = true;
}

if (i > 11) {
// done
return;
}
}
}

return {
iterable: Promise.resolve(generator()),
};
}

type MockObj = ReturnType<typeof createMockObj>;

// ------------- server -------------------
const opts = {
nonce: () => "__tson" + i, // add index to nonce to make sure it's not cached
types: [tsonPromise, tsonAsyncIterable, tsonBigint],
} satisfies TsonAsyncOptions;

const server = await createTestServer({
handleRequest: async (_req, res) => {
const tson = createTsonAsync(opts);

const obj = createMockObj();
const response = tson.toSSEResponse(obj);

for (const [key, value] of response.headers) {
res.setHeader(key, value);
}

for await (const value of response.body as any) {
res.write(value);
if (kill) {
// interrupt the stream
res.end();
kill = false;
return;
}
}

res.end();
},
});

// ------------- client -------------------
const tson = createTsonAsync(opts);

// e2e
const ac = new AbortController();
const onReconnect = vi.fn();
const shape = await tson.createEventSource<MockObj>(server.url, {
onReconnect,
reconnect: true,
signal: ac.signal,
});

const messages: bigint[] = [];

for await (const value of await shape.iterable) {
messages.push(value);
}

expect(messages).toMatchInlineSnapshot(`
[
0n,
1n,
2n,
3n,
4n,
6n,
7n,
8n,
9n,
11n,
]
`);

expect(onReconnect).toHaveBeenCalledTimes(2);
});
5 changes: 4 additions & 1 deletion vitest.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ export default defineConfig({
reporter: ["html", "lcov"],
},
exclude: ["lib", "node_modules", "examples", "benchmark"],
setupFiles: ["console-fail-test/setup"],
setupFiles: [
// this is useful to comment out sometimes
"console-fail-test/setup",
],
},
});

0 comments on commit aa2cbad

Please sign in to comment.