Skip to content
This repository has been archived by the owner on Jul 5, 2024. It is now read-only.

Commit

Permalink
sse wip
Browse files Browse the repository at this point in the history
  • Loading branch information
KATT committed Oct 13, 2023
1 parent c51856c commit 513737b
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 58 deletions.
6 changes: 5 additions & 1 deletion src/async/createTsonAsync.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { TsonAsyncOptions } from "./asyncTypes.js";
import { createTsonParseAsync } from "./deserializeAsync.js";
import {
createTsonParseAsync,
createTsonParseEventSource,
} from "./deserializeAsync.js";
import {
createTsonSSEResponse,
createTsonStreamAsync,
Expand All @@ -10,6 +13,7 @@ import {
* @internal
*/
export const createTsonAsync = (opts: TsonAsyncOptions) => ({
createEventSourceParser: createTsonParseEventSource(opts),
parseJsonStream: createTsonParseAsync(opts),
stringifyJsonStream: createTsonStreamAsync(opts),
toSSEResponse: createTsonSSEResponse(opts),
Expand Down
79 changes: 43 additions & 36 deletions src/async/deserializeAsync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ import {
TsonAsyncStringifierIterable,
TsonAsyncType,
} from "./asyncTypes.js";
import { createReadableStream } from "./iterableUtils.js";
import {
createReadableStream,
mapIterable,
readableStreamToAsyncIterable,
} from "./iterableUtils.js";
import { TsonAsyncValueTuple } from "./serializeAsync.js";

type WalkFn = (value: unknown) => unknown;
Expand Down Expand Up @@ -256,38 +260,41 @@ export function createTsonParseAsync(opts: TsonAsyncOptions): TsonParseAsync {
}) as TsonParseAsync;
}

// export function createTsonParseEventSource(opts: TsonAsyncOptions) {
// const instance = createTsonDeserializer(opts);

// return async <TValue = unknown>(
// url: string,
// parseOpts?: TsonParseAsyncOptions & {
// abortSignal: AbortSignal;
// },
// ) => {
// const [stream, controller] = createReadableStream<string>();
// const eventSource = new EventSource(url);

// const onAbort = () => {
// eventSource.close();
// controller.close();
// parseOpts?.abortSignal.removeEventListener("abort", onAbort);
// };

// parseOpts?.abortSignal.addEventListener("abort", onAbort);

// eventSource.onmessage = (msg) => {
// // eslint-disable-next-line @typescript-eslint/no-unsafe-argument
// controller.enqueue(msg.data);
// };

// const iterable = mapIterable(
// readableStreamToAsyncIterable(stream),
// (msg) => {
// return JSON.parse(msg) as TsonAsyncValueTuple | TsonSerialized;
// },
// );

// return (await instance(iterable, parseOpts ?? {})) as TValue;
// };
// }
export function createTsonParseEventSource(opts: TsonAsyncOptions) {
const instance = createTsonDeserializer(opts);

return async <TValue = unknown>(
url: string,
parseOpts: TsonParseAsyncOptions & {
signal?: AbortSignal;
} = {},
) => {
const [stream, controller] = createReadableStream<string>();
const eventSource = new EventSource(url);

const onAbort = () => {
eventSource.close();
controller.close();
parseOpts.signal?.removeEventListener("abort", onAbort);
};

parseOpts.signal?.addEventListener("abort", onAbort);

eventSource.onmessage = (msg) => {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
controller.enqueue(msg.data);
};

const iterable = mapIterable(
readableStreamToAsyncIterable(stream),
(msg) => {
const parsed = JSON.parse(msg) as TsonAsyncValueTuple | TsonSerialized;

console.log({ parsed });
return parsed;
},
);

return (await instance(iterable, parseOpts)) as TValue;
};
}
62 changes: 41 additions & 21 deletions src/async/sse.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ test("SSE response test", async () => {
};
}

// type MockObj = ReturnType<typeof createMockObj>;
type MockObj = ReturnType<typeof createMockObj>;

// ------------- server -------------------
const opts = {
Expand Down Expand Up @@ -53,26 +53,26 @@ test("SSE response test", async () => {
});

// ------------- client -------------------
// const tson = createTsonAsync(opts);

// do a streamed fetch request
const sse = new EventSource(server.url);

const messages: MessageEvent["data"][] = [];
await new Promise<void>((resolve) => {
sse.onmessage = (msg) => {
// console.log(sse.readyState);
// console.log({ msg });
messages.push(msg.data);

if (messages.length === 5) {
sse.close();
resolve();
}
};
});

expect(messages).toMatchInlineSnapshot(`
const tson = createTsonAsync(opts);

{
const sse = new EventSource(server.url);

const messages: MessageEvent["data"][] = [];
await new Promise<void>((resolve) => {
sse.onmessage = (msg) => {
// console.log(sse.readyState);
// console.log({ msg });
messages.push(msg.data);

if (messages.length === 5) {
sse.close();
resolve();
}
};
});

expect(messages).toMatchInlineSnapshot(`
[
"{\\"json\\":{\\"foo\\":\\"bar\\",\\"iterable\\":[\\"AsyncIterable\\",0,\\"__tson\\"],\\"promise\\":[\\"Promise\\",1,\\"__tson\\"],\\"rejectedPromise\\":[\\"Promise\\",2,\\"__tson\\"]},\\"nonce\\":\\"__tson\\"}",
"[0,[0,0]]",
Expand All @@ -81,6 +81,26 @@ test("SSE response test", async () => {
"[0,[0,1]]",
]
`);
}

{
// e2e
const ac = new AbortController();
const shape = await tson.createEventSourceParser<MockObj>(server.url, {
signal: ac.signal,
});

const messages: number[] = [];

for await (const value of shape.iterable) {
messages.push(value);
if (messages.length === 5) {
ac.abort();
}
}

expect(messages).toMatchInlineSnapshot();
}
});

test.todo("parse SSE response");

0 comments on commit 513737b

Please sign in to comment.