Skip to content
This repository has been archived by the owner on Jul 5, 2024. It is now read-only.

Commit

Permalink
add SSE support
Browse files Browse the repository at this point in the history
  • Loading branch information
KATT committed Oct 13, 2023
1 parent 513737b commit e84ef14
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 24 deletions.
7 changes: 7 additions & 0 deletions src/async/asyncErrors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,10 @@ export class TsonStreamInterruptedError extends TsonError {
this.name = "TsonStreamInterruptedError";
}
}

export class TsonAbortError extends TsonError {
constructor(cause: unknown) {
super("Aborted", { cause });
this.name = "TsonAbortError";
}
}
37 changes: 16 additions & 21 deletions src/async/deserializeAsync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
TsonSerialized,
TsonTransformerSerializeDeserialize,
} from "../sync/syncTypes.js";
import { TsonStreamInterruptedError } from "./asyncErrors.js";
import { TsonAbortError, TsonStreamInterruptedError } from "./asyncErrors.js";
import {
TsonAsyncIndex,
TsonAsyncOptions,
Expand All @@ -18,7 +18,6 @@ import {
} from "./asyncTypes.js";
import {
createReadableStream,
mapIterable,
readableStreamToAsyncIterable,
} from "./iterableUtils.js";
import { TsonAsyncValueTuple } from "./serializeAsync.js";
Expand All @@ -42,9 +41,8 @@ type TsonParseAsync = <TValue>(
opts?: TsonParseAsyncOptions,
) => Promise<TValue>;

type TsonDeserializeIterable = AsyncIterable<
TsonAsyncValueTuple | TsonSerialized
>;
type TsonDeserializeIterableValue = TsonAsyncValueTuple | TsonSerialized;
type TsonDeserializeIterable = AsyncIterable<TsonDeserializeIterableValue>;
function createTsonDeserializer(opts: TsonAsyncOptions) {
const typeByKey: Record<string, AnyTsonTransformerSerializeDeserialize> = {};

Expand Down Expand Up @@ -116,7 +114,9 @@ function createTsonDeserializer(opts: TsonAsyncOptions) {
break;
}

const [index, result] = nextValue.value as TsonAsyncValueTuple;
const { value } = nextValue;

const [index, result] = value as TsonAsyncValueTuple;

const controller = cache.get(index);

Expand Down Expand Up @@ -269,32 +269,27 @@ export function createTsonParseEventSource(opts: TsonAsyncOptions) {
signal?: AbortSignal;
} = {},
) => {
const [stream, controller] = createReadableStream<string>();
const [stream, controller] =
createReadableStream<TsonDeserializeIterableValue>();
const eventSource = new EventSource(url);

const { signal } = parseOpts;
const onAbort = () => {
assert(signal);
eventSource.close();
controller.close();
parseOpts.signal?.removeEventListener("abort", onAbort);
controller.error(new TsonAbortError("Stream aborted by user"));

signal.removeEventListener("abort", onAbort);
};

parseOpts.signal?.addEventListener("abort", onAbort);
signal?.addEventListener("abort", onAbort);

eventSource.onmessage = (msg) => {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
controller.enqueue(msg.data);
controller.enqueue(JSON.parse(msg.data));
};

const iterable = mapIterable(
readableStreamToAsyncIterable(stream),
(msg) => {
const parsed = JSON.parse(msg) as TsonAsyncValueTuple | TsonSerialized;

console.log({ parsed });
return parsed;
},
);

const iterable = readableStreamToAsyncIterable(stream);
return (await instance(iterable, parseOpts)) as TValue;
};
}
6 changes: 6 additions & 0 deletions src/async/handlers/tsonAsyncIterable.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
TsonAbortError,
TsonPromiseRejectionError,
TsonStreamInterruptedError,
} from "../asyncErrors.js";
Expand Down Expand Up @@ -31,6 +32,11 @@ export const tsonAsyncIterable: TsonAsyncType<
while (((next = await opts.reader.read()), !next.done)) {
const { value } = next;
if (value instanceof TsonStreamInterruptedError) {
if (value.cause instanceof TsonAbortError) {
opts.close();
return;
}

throw value;
}

Expand Down
12 changes: 9 additions & 3 deletions src/async/sse.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,14 @@ test("SSE response test", async () => {
}
}

expect(messages).toMatchInlineSnapshot();
expect(messages).toMatchInlineSnapshot(`
[
0,
1,
2,
3,
4,
]
`);
}
});

test.todo("parse SSE response");

0 comments on commit e84ef14

Please sign in to comment.