From 3bd97824d903ce334a0ba448e9d9b41bad921241 Mon Sep 17 00:00:00 2001 From: Alex / KATT Date: Fri, 13 Oct 2023 17:30:55 +0200 Subject: [PATCH] feat: split up deserialization and start on support for SSE (#67) --- README.md | 2 +- examples/async/package.json | 2 +- examples/async/src/server.ts | 4 +- package.json | 2 + pnpm-lock.yaml | 14 ++ src/async/asyncTypes.ts | 4 + src/async/createTsonAsync.ts | 14 +- src/async/deserializeAsync.test.ts | 56 +++--- src/async/deserializeAsync.ts | 245 +++++++++++++++---------- src/async/handlers/tsonPromise.test.ts | 29 ++- src/async/iterableUtils.ts | 16 ++ src/async/serializeAsync.test.ts | 4 +- src/async/serializeAsync.ts | 40 +++- src/async/sse.test.ts | 86 +++++++++ src/extend/openai.test.ts | 7 +- src/index.test.ts | 9 +- src/index.ts | 4 +- src/internals/testUtils.ts | 1 - 18 files changed, 383 insertions(+), 156 deletions(-) create mode 100644 src/async/sse.test.ts diff --git a/README.md b/README.md index 70dc1430..c1d19b6a 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,7 @@ Serialize almost[^1] anything! - 🌊 Serialize & stream things like `Promise`s or async iterators > [!IMPORTANT] -> _Though well-tested, this package might undergo big changes, stay tuned!_ +> _Though well-tested, this package might undergo big changes and **does not** follow semver whilst on `0.x.y`-version, stay tuned!_ ### 👀 Example diff --git a/examples/async/package.json b/examples/async/package.json index 9d6ea04e..4b6914ed 100644 --- a/examples/async/package.json +++ b/examples/async/package.json @@ -1,5 +1,5 @@ { - "name": "@examples/minimal", + "name": "@examples/json-stream", "version": "10.38.5", "private": true, "description": "An example project for tupleson", diff --git a/examples/async/src/server.ts b/examples/async/src/server.ts index fd1fe70e..c40bb773 100644 --- a/examples/async/src/server.ts +++ b/examples/async/src/server.ts @@ -1,9 +1,9 @@ import http from "node:http"; -import { createTsonStringifyAsync } from "tupleson"; +import { createTsonStreamAsync } from "tupleson"; import { tsonOptions } from "./shared.js"; -const tsonStringifyAsync = createTsonStringifyAsync(tsonOptions); +const tsonStringifyAsync = createTsonStreamAsync(tsonOptions); const randomNumber = (min: number, max: number) => Math.floor(Math.random() * (max - min + 1) + min); diff --git a/package.json b/package.json index 580b61bb..ae8f1e95 100644 --- a/package.json +++ b/package.json @@ -43,6 +43,7 @@ "@release-it/conventional-changelog": "^7.0.2", "@tsconfig/strictest": "^2.0.2", "@types/eslint": "^8.44.3", + "@types/event-source-polyfill": "^1.0.2", "@typescript-eslint/eslint-plugin": "^6.7.3", "@typescript-eslint/parser": "^6.7.3", "@vitest/coverage-v8": "^0.34.6", @@ -61,6 +62,7 @@ "eslint-plugin-regexp": "^1.15.0", "eslint-plugin-vitest": "^0.3.1", "eslint-plugin-yml": "^1.9.0", + "event-source-polyfill": "^1.0.31", "jsonc-eslint-parser": "^2.3.0", "knip": "^2.31.0", "markdownlint": "^0.31.1", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 213768c3..b049df88 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -23,6 +23,9 @@ importers: '@types/eslint': specifier: ^8.44.3 version: 8.44.3 + '@types/event-source-polyfill': + specifier: ^1.0.2 + version: 1.0.2 '@typescript-eslint/eslint-plugin': specifier: ^6.7.3 version: 6.7.3(@typescript-eslint/parser@6.7.3)(eslint@8.50.0)(typescript@5.2.2) @@ -77,6 +80,9 @@ importers: eslint-plugin-yml: specifier: ^1.9.0 version: 1.9.0(eslint@8.50.0) + event-source-polyfill: + specifier: ^1.0.31 + version: 1.0.31 jsonc-eslint-parser: specifier: ^2.3.0 version: 2.3.0 @@ -1214,6 +1220,10 @@ packages: resolution: {integrity: sha512-VeiPZ9MMwXjO32/Xu7+OwflfmeoRwkE/qzndw42gGtgJwZopBnzy2gD//NN1+go1mADzkDcqf/KnFRSjTJ8xJA==} dev: true + /@types/event-source-polyfill@1.0.2: + resolution: {integrity: sha512-qE5zrFd73BRs5oSjVys6g/5GboqOMbzLRTUFPAhfULvvvbRAOXw9m4Wk+p1BtoZm4JgW7TljGGfVabBqvi3eig==} + dev: true + /@types/http-cache-semantics@4.0.2: resolution: {integrity: sha512-FD+nQWA2zJjh4L9+pFXqWOi0Hs1ryBCfI+985NjluQ1p8EYtoLvjLOKidXBtZ4/IcxDX4o8/E8qDS3540tNliw==} dev: true @@ -3117,6 +3127,10 @@ packages: engines: {node: '>=0.10.0'} dev: true + /event-source-polyfill@1.0.31: + resolution: {integrity: sha512-4IJSItgS/41IxN5UVAVuAyczwZF7ZIEsM1XAoUzIHA6A+xzusEZUutdXz2Nr+MQPLxfTiCvqE79/C8HT8fKFvA==} + dev: true + /event-target-shim@5.0.1: resolution: {integrity: sha512-i/2XbnSz/uxRCU6+NdVJgKWDTM427+MqYbkQzD321DuCQJUqOuJKIA0IM2+W2xtYHdKOmZ4dR6fExsd4SXL+WQ==} engines: {node: '>=6'} diff --git a/src/async/asyncTypes.ts b/src/async/asyncTypes.ts index 5ed6bb2a..830e0931 100644 --- a/src/async/asyncTypes.ts +++ b/src/async/asyncTypes.ts @@ -10,6 +10,10 @@ export type TsonAsyncStringifierIterable = AsyncIterable & { [serialized]: TValue; }; +export type BrandSerialized = TType & { + [serialized]: TValue; +}; + export type TsonAsyncStringifier = ( value: TValue, space?: number, diff --git a/src/async/createTsonAsync.ts b/src/async/createTsonAsync.ts index 6bbd33ad..7009ccac 100644 --- a/src/async/createTsonAsync.ts +++ b/src/async/createTsonAsync.ts @@ -1,8 +1,16 @@ import { TsonAsyncOptions } from "./asyncTypes.js"; import { createTsonParseAsync } from "./deserializeAsync.js"; -import { createTsonStringifyAsync } from "./serializeAsync.js"; +import { + createTsonSSEResponse, + createTsonStreamAsync, +} from "./serializeAsync.js"; +/** + * Only used for testing - when using the async you gotta pick which one you want + * @internal + */ export const createTsonAsync = (opts: TsonAsyncOptions) => ({ - parse: createTsonParseAsync(opts), - stringify: createTsonStringifyAsync(opts), + parseJsonStream: createTsonParseAsync(opts), + stringifyJsonStream: createTsonStreamAsync(opts), + toSSEResponse: createTsonSSEResponse(opts), }); diff --git a/src/async/deserializeAsync.test.ts b/src/async/deserializeAsync.test.ts index 166a78ea..28581aa6 100644 --- a/src/async/deserializeAsync.test.ts +++ b/src/async/deserializeAsync.test.ts @@ -4,7 +4,6 @@ import { TsonAsyncOptions, TsonParseAsyncOptions, TsonType, - createTsonAsync, createTsonParseAsync, tsonAsyncIterable, tsonBigint, @@ -19,6 +18,7 @@ import { waitFor, } from "../internals/testUtils.js"; import { TsonSerialized } from "../sync/syncTypes.js"; +import { createTsonAsync } from "./createTsonAsync.js"; import { mapIterable, readableStreamToAsyncIterable } from "./iterableUtils.js"; test("deserialize variable chunk length", async () => { @@ -32,7 +32,7 @@ test("deserialize variable chunk length", async () => { yield '[\n{"json":{"foo":"bar"},"nonce":"__tson"}'; yield "\n,\n[\n]\n]"; })(); - const result = await tson.parse(iterable); + const result = await tson.parseJsonStream(iterable); expect(result).toEqual({ foo: "bar" }); } @@ -41,7 +41,7 @@ test("deserialize variable chunk length", async () => { await sleep(1); yield '[\n{"json":{"foo":"bar"},"nonce":"__tson"}\n,\n[\n]\n]'; })(); - const result = await tson.parse(iterable); + const result = await tson.parseJsonStream(iterable); expect(result).toEqual({ foo: "bar" }); } @@ -54,7 +54,7 @@ test("deserialize variable chunk length", async () => { yield "[\n]\n"; yield "]"; })(); - const result = await tson.parse(iterable); + const result = await tson.parseJsonStream(iterable); expect(result).toEqual({ foo: "bar" }); } }); @@ -71,9 +71,9 @@ test("deserialize async iterable", async () => { foo: "bar", }; - const strIterable = tson.stringify(obj); + const strIterable = tson.stringifyJsonStream(obj); - const result = await tson.parse(strIterable); + const result = await tson.parseJsonStream(strIterable); expect(result).toEqual(obj); } @@ -84,9 +84,9 @@ test("deserialize async iterable", async () => { foo: Promise.resolve("bar"), }; - const strIterable = tson.stringify(obj); + const strIterable = tson.stringifyJsonStream(obj); - const result = await tson.parse(strIterable); + const result = await tson.parseJsonStream(strIterable); expect(await result.foo).toEqual("bar"); } @@ -120,9 +120,9 @@ test("stringify async iterable + promise", async () => { promise: Promise.resolve(42), }; - const strIterable = tson.stringify(input); + const strIterable = tson.stringifyJsonStream(input); - const output = await tson.parse(strIterable, parseOptions); + const output = await tson.parseJsonStream(strIterable, parseOptions); expect(output.foo).toEqual("bar"); @@ -166,7 +166,7 @@ test("e2e: stringify async iterable and promise over the network", async () => { const tson = createTsonAsync(opts); const obj = createMockObj(); - const strIterarable = tson.stringify(obj, 4); + const strIterarable = tson.stringifyJsonStream(obj, 4); for await (const value of strIterarable) { res.write(value); @@ -192,7 +192,7 @@ test("e2e: stringify async iterable and promise over the network", async () => { (v) => textDecoder.decode(v), ); - const parsed = await tson.parse(stringIterator); + const parsed = await tson.parseJsonStream(stringIterator); expect(parsed.foo).toEqual("bar"); @@ -267,7 +267,7 @@ test("iterator error", async () => { const tson = createTsonAsync(opts); const obj = createMockObj(); - const strIterarable = tson.stringify(obj, 4); + const strIterarable = tson.stringifyJsonStream(obj, 4); for await (const value of strIterarable) { res.write(value); @@ -291,7 +291,7 @@ test("iterator error", async () => { (v) => textDecoder.decode(v), ); - const parsed = await tson.parse(stringIterator); + const parsed = await tson.parseJsonStream(stringIterator); expect(await parsed.promise).toEqual(42); const results = []; @@ -381,7 +381,7 @@ test("values missing when stream ends", async () => { assert(err); expect(err.message).toMatchInlineSnapshot( - '"Stream interrupted: Stream ended unexpectedly"', + '"Stream interrupted: Stream ended unexpectedly (state 1)"', ); } @@ -390,7 +390,7 @@ test("values missing when stream ends", async () => { const err = await waitError(result.promise); expect(err).toMatchInlineSnapshot( - "[TsonStreamInterruptedError: Stream interrupted: Stream ended unexpectedly]", + "[TsonStreamInterruptedError: Stream interrupted: Stream ended unexpectedly (state 1)]", ); } @@ -398,7 +398,7 @@ test("values missing when stream ends", async () => { expect(parseOptions.onStreamError.mock.calls).toMatchInlineSnapshot(` [ [ - [TsonStreamInterruptedError: Stream interrupted: Stream ended unexpectedly], + [TsonStreamInterruptedError: Stream interrupted: Stream ended unexpectedly (state 1)], ], ] `); @@ -432,14 +432,14 @@ test("async: missing values of promise", async () => { await createTsonAsync({ types: [tsonPromise], - }).parse(generator(), parseOptions); + }).parseJsonStream(generator(), parseOptions); await waitFor(() => { expect(parseOptions.onStreamError).toHaveBeenCalledTimes(1); }); expect(parseOptions.onStreamError.mock.calls[0]![0]!).toMatchInlineSnapshot( - "[TsonStreamInterruptedError: Stream interrupted: Stream ended unexpectedly]", + "[TsonStreamInterruptedError: Stream interrupted: Stream ended unexpectedly (state 1)]", ); }); @@ -523,7 +523,7 @@ test("1 iterator completed but another never finishes", async () => { `); expect(err.message).toMatchInlineSnapshot( - '"Stream interrupted: Stream ended unexpectedly"', + '"Stream interrupted: Stream ended unexpectedly (state 1)"', ); } @@ -532,7 +532,7 @@ test("1 iterator completed but another never finishes", async () => { expect(parseOptions.onStreamError.mock.calls).toMatchInlineSnapshot(` [ [ - [TsonStreamInterruptedError: Stream interrupted: Stream ended unexpectedly], + [TsonStreamInterruptedError: Stream interrupted: Stream ended unexpectedly (state 1)], ], ] `); @@ -578,7 +578,7 @@ test("e2e: simulated server crash", async () => { const tson = createTsonAsync(opts); const obj = createMockObj(); - const strIterarable = tson.stringify(obj, 4); + const strIterarable = tson.stringifyJsonStream(obj, 4); void crashedDeferred.promise.then(() => { // destroy the response stream @@ -607,7 +607,10 @@ test("e2e: simulated server crash", async () => { (v) => textDecoder.decode(v), ); - const parsed = await tson.parse(stringIterator, parseOptions); + const parsed = await tson.parseJsonStream( + stringIterator, + parseOptions, + ); { // check the iterator const results = []; @@ -678,7 +681,7 @@ test("e2e: client aborted request", async () => { const tson = createTsonAsync(opts); const obj = createMockObj(); - const strIterarable = tson.stringify(obj, 4); + const strIterarable = tson.stringifyJsonStream(obj, 4); for await (const value of strIterarable) { serverSentChunks.push(value.trimEnd()); @@ -707,7 +710,10 @@ test("e2e: client aborted request", async () => { (v) => textDecoder.decode(v), ); - const parsed = await tson.parse(stringIterator, parseOptions); + const parsed = await tson.parseJsonStream( + stringIterator, + parseOptions, + ); { // check the iterator const results = []; diff --git a/src/async/deserializeAsync.ts b/src/async/deserializeAsync.ts index 251dee80..08627dbf 100644 --- a/src/async/deserializeAsync.ts +++ b/src/async/deserializeAsync.ts @@ -1,4 +1,4 @@ -/* eslint-disable @typescript-eslint/no-unused-vars */ +/* eslint-disable @typescript-eslint/no-non-null-assertion */ import { TsonError } from "../errors.js"; import { assert } from "../internals/assert.js"; @@ -16,6 +16,7 @@ import { TsonAsyncStringifierIterable, TsonAsyncType, } from "./asyncTypes.js"; +import { createReadableStream } from "./iterableUtils.js"; import { TsonAsyncValueTuple } from "./serializeAsync.js"; type WalkFn = (value: unknown) => unknown; @@ -37,7 +38,10 @@ type TsonParseAsync = ( opts?: TsonParseAsyncOptions, ) => Promise; -export function createTsonParseAsyncInner(opts: TsonAsyncOptions) { +type TsonDeserializeIterable = AsyncIterable< + TsonAsyncValueTuple | TsonSerialized +>; +function createTsonDeserializer(opts: TsonAsyncOptions) { const typeByKey: Record = {}; for (const handler of opts.types) { @@ -52,10 +56,9 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) { } return async ( - iterable: AsyncIterable, + iterable: TsonDeserializeIterable, parseOptions: TsonParseAsyncOptions, ) => { - // this is an awful hack to get around making a some sort of pipeline const cache = new Map< TsonAsyncIndex, ReadableStreamDefaultController @@ -77,13 +80,8 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) { const idx = serializedValue as TsonAsyncIndex; - let controller: ReadableStreamDefaultController = - null as unknown as ReadableStreamDefaultController; - const readable = new ReadableStream({ - start(c) { - controller = c; - }, - }); + const [readable, controller] = createReadableStream(); + // the `start` method is called "immediately when the object is constructed" // [MDN](http://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/ReadableStream) // so we're guaranteed that the controller is set in the cache @@ -106,36 +104,15 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) { return walk; }; - async function getStreamedValues( - lines: string[], - accumulator: string, - walk: WalkFn, - ) { - // - let streamEnded = false; - // - - function readLine(str: string) { - // console.log("got str", str); - str = str.trimStart(); - - if (str.startsWith(",")) { - // ignore leading comma - str = str.slice(1); - } - - if (str === "" || str === "[" || str === ",") { - // beginning of values array or empty string - return; - } - - if (str === "]]") { - // end of values and stream - streamEnded = true; - return; + async function getStreamedValues(walk: WalkFn) { + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + while (true) { + const nextValue = await iterator.next(); + if (nextValue.done) { + break; } - const [index, result] = JSON.parse(str) as TsonAsyncValueTuple; + const [index, result] = nextValue.value as TsonAsyncValueTuple; const controller = cache.get(index); @@ -146,68 +123,25 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) { // resolving deferred controller.enqueue(walkedResult); } - - do { - lines.forEach(readLine); - lines.length = 0; - - const nextValue = await iterator.next(); - if (!nextValue.done) { - accumulator += nextValue.value; - const parts = accumulator.split("\n"); - accumulator = parts.pop() ?? ""; - lines.push(...parts); - } else if (accumulator) { - readLine(accumulator); - } - } while (lines.length); - - assert(streamEnded, "Stream ended unexpectedly"); } async function init() { - let accumulator = ""; - // get the head of the JSON + const nextValue = await iterator.next(); + if (nextValue.done) { + throw new TsonError("Unexpected end of stream before head"); + } - const lines: string[] = []; - do { - const nextValue = await iterator.next(); - if (nextValue.done) { - throw new TsonError("Unexpected end of stream before head"); - } - - accumulator += nextValue.value; - - const parts = accumulator.split("\n"); - accumulator = parts.pop() ?? ""; - lines.push(...parts); - } while (lines.length < 2); - - const [ - /** - * First line is just a `[` - */ - _firstLine, - /** - * Second line is the shape of the JSON - */ - headLine, - // .. third line is a `,` - // .. fourth line is the start of the values array - ...buffer - ] = lines; - - assert(headLine, "No head line found"); - - const head = JSON.parse(headLine) as TsonSerialized; + const head = nextValue.value as TsonSerialized; const walk = walker(head.nonce); try { - return walk(head.json); + const walked = walk(head.json); + + return walked; } finally { - getStreamedValues(buffer, accumulator, walk).catch((cause) => { + getStreamedValues(walk).catch((cause) => { // Something went wrong while getting the streamed values const err = new TsonStreamInterruptedError(cause); @@ -222,19 +156,138 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) { } } - const result = await init().catch((cause: unknown) => { + return await init().catch((cause: unknown) => { throw new TsonError("Failed to initialize TSON stream", { cause }); }); - return [result, cache] as const; }; } +function lineAccumulator() { + let accumulator = ""; + const lines: string[] = []; + + return { + lines, + push(chunk: string) { + accumulator += chunk; + + const parts = accumulator.split("\n"); + accumulator = parts.pop() ?? ""; + lines.push(...parts); + }, + }; +} + +async function* stringIterableToTsonIterable( + iterable: AsyncIterable, +): TsonDeserializeIterable { + // get the head of the JSON + const acc = lineAccumulator(); + + // state of stream + const AWAITING_HEAD = 0; + const STREAMING_VALUES = 1; + const ENDED = 2; + + let state: typeof AWAITING_HEAD | typeof ENDED | typeof STREAMING_VALUES = + AWAITING_HEAD; + + // iterate values & yield them + + for await (const str of iterable) { + acc.push(str); + + if (state === AWAITING_HEAD && acc.lines.length >= 2) { + /** + * First line is just a `[` + */ + acc.lines.shift(); + + // Second line is the head + const headLine = acc.lines.shift(); + + assert(headLine, "No head line found"); + + const head = JSON.parse(headLine) as TsonSerialized; + + yield head; + + state = STREAMING_VALUES; + } + + if (state === STREAMING_VALUES) { + while (acc.lines.length) { + let str = acc.lines.shift()!; + + // console.log("got str", str); + str = str.trimStart(); + + if (str.startsWith(",")) { + // ignore leading comma + str = str.slice(1); + } + + if (str === "" || str === "[" || str === ",") { + // beginning of values array or empty string + continue; + } + + if (str === "]]") { + // end of values and stream + state = ENDED; + continue; + } + + yield JSON.parse(str) as TsonAsyncValueTuple; + } + } + } + + assert(state === ENDED, `Stream ended unexpectedly (state ${state})`); +} + export function createTsonParseAsync(opts: TsonAsyncOptions): TsonParseAsync { - const instance = createTsonParseAsyncInner(opts); + const instance = createTsonDeserializer(opts); return (async (iterable, opts) => { - const [result] = await instance(iterable, opts ?? {}); + const tsonIterable = stringIterableToTsonIterable(iterable); - return result; + return await instance(tsonIterable, opts ?? {}); }) as TsonParseAsync; } + +// export function createTsonParseEventSource(opts: TsonAsyncOptions) { +// const instance = createTsonDeserializer(opts); + +// return async ( +// url: string, +// parseOpts?: TsonParseAsyncOptions & { +// abortSignal: AbortSignal; +// }, +// ) => { +// const [stream, controller] = createReadableStream(); +// const eventSource = new EventSource(url); + +// const onAbort = () => { +// eventSource.close(); +// controller.close(); +// parseOpts?.abortSignal.removeEventListener("abort", onAbort); +// }; + +// parseOpts?.abortSignal.addEventListener("abort", onAbort); + +// eventSource.onmessage = (msg) => { +// // eslint-disable-next-line @typescript-eslint/no-unsafe-argument +// controller.enqueue(msg.data); +// }; + +// const iterable = mapIterable( +// readableStreamToAsyncIterable(stream), +// (msg) => { +// return JSON.parse(msg) as TsonAsyncValueTuple | TsonSerialized; +// }, +// ); + +// return (await instance(iterable, parseOpts ?? {})) as TValue; +// }; +// } diff --git a/src/async/handlers/tsonPromise.test.ts b/src/async/handlers/tsonPromise.test.ts index 06d18a35..fa4de76b 100644 --- a/src/async/handlers/tsonPromise.test.ts +++ b/src/async/handlers/tsonPromise.test.ts @@ -4,9 +4,8 @@ import { TsonAsyncOptions, TsonType, createAsyncTsonSerialize, - createTsonAsync, createTsonParseAsync, - createTsonStringifyAsync, + createTsonStreamAsync, tsonPromise, } from "../../index.js"; import { @@ -15,7 +14,7 @@ import { waitError, } from "../../internals/testUtils.js"; import { createPromise } from "../../internals/testUtils.js"; -import { createTsonParseAsyncInner } from "../deserializeAsync.js"; +import { createTsonAsync } from "../createTsonAsync.js"; import { mapIterable, readableStreamToAsyncIterable, @@ -194,7 +193,7 @@ test("stringifier - no promises", async () => { const buffer: string[] = []; - for await (const value of tson.stringify(obj, 4)) { + for await (const value of tson.stringifyJsonStream(obj, 4)) { buffer.push(value.trimEnd()); } @@ -231,7 +230,7 @@ test("stringifier - with promise", async () => { const buffer: string[] = []; - for await (const value of tson.stringify(obj, 4)) { + for await (const value of tson.stringifyJsonStream(obj, 4)) { buffer.push(value.trimEnd()); } @@ -265,7 +264,7 @@ test("stringifier - promise in promise", async () => { const buffer: string[] = []; - for await (const value of tson.stringify(obj, 2)) { + for await (const value of tson.stringifyJsonStream(obj, 2)) { buffer.push(value.trimEnd()); } @@ -333,9 +332,9 @@ test("pipe stringifier to parser", async () => { types: [tsonPromise], }); - const strIterarable = tson.stringify(obj, 4); + const strIterarable = tson.stringifyJsonStream(obj, 4); - const value = await tson.parse(strIterarable); + const value = await tson.parseJsonStream(strIterarable); expect(value).toHaveProperty("foo"); expect(await value.foo).toBe("bar"); @@ -356,9 +355,9 @@ test("stringify and parse promise with a promise", async () => { types: [tsonPromise], }); - const strIterarable = tson.stringify(obj, 4); + const strIterarable = tson.stringifyJsonStream(obj, 4); - const value = await tson.parse(strIterarable); + const value = await tson.parseJsonStream(strIterarable); const firstPromise = await value.promise; @@ -397,7 +396,7 @@ test("stringify and parse promise with a promise over a network connection", asy }; }, 3), }; - const strIterarable = tson.stringify(obj, 4); + const strIterarable = tson.stringifyJsonStream(obj, 4); for await (const value of strIterarable) { res.write(value); @@ -423,7 +422,7 @@ test("stringify and parse promise with a promise over a network connection", asy (v) => textDecoder.decode(v), ); - const value = await tson.parse(stringIterator); + const value = await tson.parseJsonStream(stringIterator); const asObj = value as Obj; const firstPromise = await asObj.promise; @@ -451,9 +450,9 @@ test("does not crash node when it receives a promise rejection", async () => { nonce: () => "__tson", types: [tsonPromise], }; - const stringify = createTsonStringifyAsync(opts); + const stringify = createTsonStreamAsync(opts); - const parse = createTsonParseAsyncInner(opts); + const parse = createTsonParseAsync(opts); const original = { foo: createPromise(() => { @@ -472,7 +471,7 @@ test("stringify promise rejection", async () => { nonce: () => "__tson", types: [tsonPromise, tsonError], }; - const stringify = createTsonStringifyAsync(opts); + const stringify = createTsonStreamAsync(opts); const parse = createTsonParseAsync(opts); diff --git a/src/async/iterableUtils.ts b/src/async/iterableUtils.ts index 27850c95..a820ad66 100644 --- a/src/async/iterableUtils.ts +++ b/src/async/iterableUtils.ts @@ -1,3 +1,5 @@ +import { assert } from "../internals/assert.js"; + export async function* readableStreamToAsyncIterable( stream: ReadableStream, ): AsyncIterable { @@ -31,3 +33,17 @@ export async function* mapIterable( yield fn(value); } } + +export function createReadableStream() { + let controller: ReadableStreamDefaultController = + null as unknown as ReadableStreamDefaultController; + const stream = new ReadableStream({ + start(c) { + controller = c; + }, + }); + + assert(controller, `Could not find controller - this is a bug`); + + return [stream, controller] as const; +} diff --git a/src/async/serializeAsync.test.ts b/src/async/serializeAsync.test.ts index fff5c652..7f24b02d 100644 --- a/src/async/serializeAsync.test.ts +++ b/src/async/serializeAsync.test.ts @@ -4,7 +4,7 @@ import { tsonAsyncIterable, tsonBigint, tsonPromise } from "../index.js"; import { sleep } from "../internals/testUtils.js"; import { createAsyncTsonSerialize, - createTsonStringifyAsync, + createTsonStreamAsync, } from "./serializeAsync.js"; test("serialize promise", async () => { @@ -158,7 +158,7 @@ test("serialize async iterable", async () => { }); test("stringify async iterable + promise", async () => { - const stringify = createTsonStringifyAsync({ + const stringify = createTsonStreamAsync({ nonce: () => "__tson", types: [tsonAsyncIterable, tsonPromise, tsonBigint], }); diff --git a/src/async/serializeAsync.ts b/src/async/serializeAsync.ts index 3f8e6dd8..df26b401 100644 --- a/src/async/serializeAsync.ts +++ b/src/async/serializeAsync.ts @@ -12,11 +12,14 @@ import { TsonTypeTesterCustom, TsonTypeTesterPrimitive, } from "../sync/syncTypes.js"; +import { TsonStreamInterruptedError } from "./asyncErrors.js"; import { + BrandSerialized, TsonAsyncIndex, TsonAsyncOptions, TsonAsyncStringifier, } from "./asyncTypes.js"; +import { createReadableStream } from "./iterableUtils.js"; type WalkFn = (value: unknown) => unknown; @@ -207,7 +210,7 @@ export function createAsyncTsonSerialize( }; } -export function createTsonStringifyAsync( +export function createTsonStreamAsync( opts: TsonAsyncOptions, ): TsonAsyncStringifier { const indent = (length: number) => " ".repeat(length); @@ -246,3 +249,38 @@ export function createTsonStringifyAsync( return stringifier as TsonAsyncStringifier; } + +export function createTsonSSEResponse(opts: TsonAsyncOptions) { + const serialize = createAsyncTsonSerialize(opts); + + return (value: TValue) => { + const [readable, controller] = createReadableStream(); + + async function iterate() { + const [head, iterable] = serialize(value); + + controller.enqueue(`data: ${JSON.stringify(head)}\n\n`); + for await (const chunk of iterable) { + controller.enqueue(`data: ${JSON.stringify(chunk)}\n\n`); + } + + controller.error( + new TsonStreamInterruptedError(new Error("SSE stream ended")), + ); + } + + iterate().catch((err) => { + controller.error(err); + }); + + const res = new Response(readable, { + headers: { + "Cache-Control": "no-cache", + Connection: "keep-alive", + "Content-Type": "text/event-stream", + }, + status: 200, + }); + return res as BrandSerialized; + }; +} diff --git a/src/async/sse.test.ts b/src/async/sse.test.ts new file mode 100644 index 00000000..b21336f4 --- /dev/null +++ b/src/async/sse.test.ts @@ -0,0 +1,86 @@ +/* eslint-disable @typescript-eslint/no-unnecessary-condition */ +import { EventSourcePolyfill, NativeEventSource } from "event-source-polyfill"; +import { expect, test } from "vitest"; +(global as any).EventSource = NativeEventSource || EventSourcePolyfill; + +import { TsonAsyncOptions, tsonAsyncIterable, tsonPromise } from "../index.js"; +import { createTestServer, sleep } from "../internals/testUtils.js"; +import { createTsonAsync } from "./createTsonAsync.js"; + +test("SSE response test", async () => { + function createMockObj() { + async function* generator() { + let i = 0; + while (true) { + yield i++; + await sleep(100); + } + } + + return { + foo: "bar", + iterable: generator(), + promise: Promise.resolve(42), + rejectedPromise: Promise.reject(new Error("rejected promise")), + }; + } + + // type MockObj = ReturnType; + + // ------------- server ------------------- + const opts = { + nonce: () => "__tson", + types: [tsonPromise, tsonAsyncIterable], + } satisfies TsonAsyncOptions; + + const server = await createTestServer({ + handleRequest: async (_req, res) => { + const tson = createTsonAsync(opts); + + const obj = createMockObj(); + const response = tson.toSSEResponse(obj); + + for (const [key, value] of response.headers) { + res.setHeader(key, value); + } + + for await (const value of response.body as any) { + res.write(value); + } + + res.end(); + }, + }); + + // ------------- client ------------------- + // const tson = createTsonAsync(opts); + + // do a streamed fetch request + const sse = new EventSource(server.url); + + const messages: MessageEvent["data"][] = []; + await new Promise((resolve) => { + sse.onmessage = (msg) => { + // console.log(sse.readyState); + // console.log({ msg }); + messages.push(msg.data); + + if (messages.length === 5) { + sse.close(); + resolve(); + } + }; + }); + + expect(messages).toMatchInlineSnapshot(` + [ + "{\\"json\\":{\\"foo\\":\\"bar\\",\\"iterable\\":[\\"AsyncIterable\\",0,\\"__tson\\"],\\"promise\\":[\\"Promise\\",1,\\"__tson\\"],\\"rejectedPromise\\":[\\"Promise\\",2,\\"__tson\\"]},\\"nonce\\":\\"__tson\\"}", + "[0,[0,0]]", + "[1,[0,42]]", + "[2,[1,{}]]", + "[0,[0,1]]", + ] + `); +}); + +test.todo("parse SSE response"); diff --git a/src/extend/openai.test.ts b/src/extend/openai.test.ts index 6ba0d466..3566d107 100644 --- a/src/extend/openai.test.ts +++ b/src/extend/openai.test.ts @@ -1,7 +1,8 @@ import OpenAI from "openai"; import { expect, test } from "vitest"; -import { createTsonAsync, tsonAsyncIterable, tsonPromise } from "../index.js"; +import { createTsonAsync } from "../async/createTsonAsync.js"; +import { tsonAsyncIterable, tsonPromise } from "../index.js"; import { assert } from "../internals/assert.js"; const apiKey = process.env["OPENAI_API_KEY"]; @@ -18,7 +19,7 @@ test.skipIf(!apiKey)("openai", async () => { types: [tsonAsyncIterable, tsonPromise], }); - const stringified = tson.stringify({ + const stringified = tson.stringifyJsonStream({ stream: await openai.chat.completions.create({ messages: [{ content: "Say this is a test", role: "user" }], model: "gpt-4", @@ -26,7 +27,7 @@ test.skipIf(!apiKey)("openai", async () => { }), }); - const parsed = await tson.parse(stringified); + const parsed = await tson.parseJsonStream(stringified); let buffer = ""; for await (const out of parsed.stream) { diff --git a/src/index.test.ts b/src/index.test.ts index f17c20d4..5cacd798 100644 --- a/src/index.test.ts +++ b/src/index.test.ts @@ -1,6 +1,7 @@ import { expect, test } from "vitest"; -import { TsonOptions, TsonType, createTson, createTsonAsync } from "./index.js"; +import { createTsonAsync } from "./async/createTsonAsync.js"; +import { TsonOptions, TsonType, createTson } from "./index.js"; import { expectError, waitError } from "./internals/testUtils.js"; test("multiple handlers for primitive string found", () => { @@ -88,7 +89,7 @@ test("async: duplicate keys", async () => { const gen = generator(); await createTsonAsync({ types: [stringHandler, stringHandler], - }).parse(gen); + }).parseJsonStream(gen); }); expect(err).toMatchInlineSnapshot( @@ -104,7 +105,7 @@ test("async: multiple handlers for primitive string found", async () => { const err = await waitError(async () => { const iterator = createTsonAsync({ types: [stringHandler, stringHandler], - }).stringify({}); + }).stringifyJsonStream({}); // eslint-disable-next-line @typescript-eslint/no-unused-vars for await (const _ of iterator) { @@ -129,7 +130,7 @@ test("async: bad init", async () => { const gen = generator(); await createTsonAsync({ types: [], - }).parse(gen); + }).parseJsonStream(gen); }); expect(err).toMatchInlineSnapshot( diff --git a/src/index.ts b/src/index.ts index 42cc0781..d3cd0d64 100644 --- a/src/index.ts +++ b/src/index.ts @@ -21,14 +21,14 @@ export * from "./sync/handlers/tsonSymbol.js"; // --- async -- export type { TsonAsyncOptions } from "./async/asyncTypes.js"; -export { createTsonAsync } from "./async/createTsonAsync.js"; export { type TsonParseAsyncOptions, createTsonParseAsync, } from "./async/deserializeAsync.js"; export { createAsyncTsonSerialize, - createTsonStringifyAsync, + createTsonSSEResponse, + createTsonStreamAsync, } from "./async/serializeAsync.js"; export * from "./async/asyncErrors.js"; diff --git a/src/internals/testUtils.ts b/src/internals/testUtils.ts index 2fa6c2ea..f8e07362 100644 --- a/src/internals/testUtils.ts +++ b/src/internals/testUtils.ts @@ -51,7 +51,6 @@ export async function createTestServer(opts: { const server = http.createServer((req, res) => { Promise.resolve(opts.handleRequest(req, res)).catch((err) => { console.error(err); - res.statusCode = 500; res.end(); }); });