diff --git a/src/async/sse.test.ts b/src/async/sse.test.ts index 3d497eb..c4ff667 100644 --- a/src/async/sse.test.ts +++ b/src/async/sse.test.ts @@ -204,3 +204,95 @@ test("handle reconnects when response is interrupted", async () => { ] `); }); + +test("handle reconnects - iterator wrapped in Promise", async () => { + let i = 0; + + let kill = false; + function createMockObj() { + async function* generator() { + while (true) { + yield BigInt(i); + i++; + await sleep(10); + + if (i === 5) { + kill = true; + } + + if (i > 10) { + // done + return; + } + } + } + + return { + iterable: Promise.resolve(generator()), + }; + } + + type MockObj = ReturnType; + + // ------------- server ------------------- + const opts = { + nonce: () => "__tson" + i, // add index to nonce to make sure it's not cached + types: [tsonPromise, tsonAsyncIterable, tsonBigint], + } satisfies TsonAsyncOptions; + + const server = await createTestServer({ + handleRequest: async (_req, res) => { + const tson = createTsonAsync(opts); + + const obj = createMockObj(); + const response = tson.toSSEResponse(obj); + + for (const [key, value] of response.headers) { + res.setHeader(key, value); + } + + for await (const value of response.body as any) { + res.write(value); + if (kill) { + // interrupt the stream + res.end(); + kill = false; + return; + } + } + + res.end(); + }, + }); + + // ------------- client ------------------- + const tson = createTsonAsync(opts); + + // e2e + const ac = new AbortController(); + const shape = await tson.createEventSource(server.url, { + reconnect: true, + signal: ac.signal, + }); + + const messages: bigint[] = []; + + for await (const value of await shape.iterable) { + messages.push(value); + } + + expect(messages).toMatchInlineSnapshot(` + [ + 0n, + 1n, + 2n, + 3n, + 4n, + 5n, + 7n, + 8n, + 9n, + 10n, + ] + `); +});