Skip to content
This repository has been archived by the owner on Jul 5, 2024. It is now read-only.

feat: deserializeAsync parse takes a ReadableStream Uint8Array as input #39

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/async/asyncTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ import { TsonType } from "../types.js";
import { TsonBranded, TsonTypeTesterCustom } from "../types.js";
import { serialized } from "../types.js";

export type TsonAsyncStringifiedStream<TValue> = ReadableStream<Uint8Array> & {
[serialized]: TValue;
};

export type TsonAsyncStringifierIterable<TValue> = AsyncIterable<string> & {
[serialized]: TValue;
};
Expand Down
124 changes: 97 additions & 27 deletions src/async/deserializeAsync.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,47 +7,44 @@
tsonPromise,
} from "../index.js";
import { assert } from "../internals/assert.js";
import {
mapIterable,
readableStreamToAsyncIterable,
} from "../internals/iterableUtils.js";
import { createTestServer } from "../internals/testUtils.js";
import { createTestServer, createBodyStream } from "../internals/testUtils.js";

Check failure on line 10 in src/async/deserializeAsync.test.ts

View workflow job for this annotation

GitHub Actions / lint

Expected "createBodyStream" to come before "createTestServer"
import { TsonAsyncOptions } from "./asyncTypes.js";


test("deserialize variable chunk length", async () => {
const tson = createTsonAsync({
nonce: () => "__tson",
types: [tsonAsyncIterator, tsonPromise, tsonBigint],
});
{
const iterable = (async function* () {
const body = createBodyStream((async function* () {
await new Promise((resolve) => setTimeout(resolve, 1));
yield '[\n{"json":{"foo":"bar"},"nonce":"__tson"}';
yield "\n,\n[\n]\n]";
})();
const result = await tson.parse(iterable);
})());
const result = await tson.parse(body);
expect(result).toEqual({ foo: "bar" });
}

{
const iterable = (async function* () {
const body = createBodyStream((async function* () {
await new Promise((resolve) => setTimeout(resolve, 1));
yield '[\n{"json":{"foo":"bar"},"nonce":"__tson"}\n,\n[\n]\n]';
})();
const result = await tson.parse(iterable);
})());
const result = await tson.parse(body);
expect(result).toEqual({ foo: "bar" });
}

{
const iterable = (async function* () {
const body = createBodyStream((async function* () {
await new Promise((resolve) => setTimeout(resolve, 1));
yield '[\n{"json"';
yield ':{"foo":"b';
yield 'ar"},"nonce":"__tson"}\n,\n';
yield "[\n]\n";
yield "]";
})();
const result = await tson.parse(iterable);
})());
const result = await tson.parse(body);
expect(result).toEqual({ foo: "bar" });
}
});
Expand All @@ -65,8 +62,8 @@
};

const strIterable = tson.stringify(obj);

const result = await tson.parse(strIterable);
const body = createBodyStream(strIterable);
const result = await tson.parse(body);

expect(result).toEqual(obj);
}
Expand All @@ -78,8 +75,9 @@
};

const strIterable = tson.stringify(obj);

const result = await tson.parse(strIterable);
const body = createBodyStream(strIterable);
const resultRaw = await tson.parse(body);
const result = resultRaw as typeof obj;

expect(await result.foo).toEqual("bar");
}
Expand Down Expand Up @@ -112,8 +110,9 @@
};

const strIterable = tson.stringify(input);

const output = await tson.parse(strIterable);
const body = createBodyStream(strIterable);
const outputRaw = await tson.parse(body);
const output = outputRaw as typeof input;

expect(output.foo).toEqual("bar");

Expand Down Expand Up @@ -175,15 +174,86 @@

assert(response.body);

const textDecoder = new TextDecoder();
const parsedRaw = await tson.parse(response.body);
const parsed = parsedRaw as MockObj;

expect(parsed.foo).toEqual("bar");

const results = [];

for await (const value of parsed.iterable) {
results.push(value);
}

expect(results).toEqual([1n, 2n, 3n, 4n, 5n]);

expect(await parsed.promise).toEqual(42);

await expect(
parsed.rejectedPromise,
).rejects.toThrowErrorMatchingInlineSnapshot('"Promise rejected"');

server.close();
});

test("e2e: server crash", async () => {
function createMockObj() {
async function* generator() {
for (const number of [1n, 2n, 3n, 4n, 5n]) {
await new Promise((resolve) => setTimeout(resolve, 1));
yield number;
}
}

// convert the response body to an async iterable
const stringIterator = mapIterable(
readableStreamToAsyncIterable(response.body),
(v) => textDecoder.decode(v),
);
return {
foo: "bar",
iterable: generator(),
promise: Promise.resolve(42),
rejectedPromise: Promise.reject(new Error("rejected promise")),
};
}

type MockObj = ReturnType<typeof createMockObj>;

// ------------- server -------------------
const opts: TsonAsyncOptions = {
types: [tsonPromise, tsonAsyncIterator, tsonBigint],
};

const server = await createTestServer({
handleRequest: async (_req, res) => {
const tson = createTsonAsync(opts);

const obj = createMockObj();
const strIterarable = tson.stringify(obj, 4);

let closed = false
setTimeout(() => {
closed = true
server.close();
}, 2)

// res.write(strIterarable);

for await (const value of strIterarable) {
if (closed) continue

Check failure on line 239 in src/async/deserializeAsync.test.ts

View workflow job for this annotation

GitHub Actions / lint

Unnecessary conditional, value is always falsy
res.write(value);
}

res.end();
},
});

// ------------- client -------------------
const tson = createTsonAsync(opts);

// do a streamed fetch request
const response = await fetch(server.url);

assert(response.body);

const parsed = await tson.parse<MockObj>(stringIterator);
const parsedRaw = await tson.parse(response.body);
const parsed = parsedRaw as MockObj;

expect(parsed.foo).toEqual("bar");

Expand Down
21 changes: 15 additions & 6 deletions src/async/deserializeAsync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import {
TsonAsyncIndex,
TsonAsyncOptions,
TsonAsyncStringifierIterable,
TsonAsyncStringifiedStream,
TsonAsyncType,
} from "./asyncTypes.js";
import { TsonAsyncValueTuple } from "./serializeAsync.js";
Expand All @@ -25,7 +25,7 @@
| TsonTransformerSerializeDeserialize<any, any>;

type TsonParseAsync = <TValue>(
string: AsyncIterable<string> | TsonAsyncStringifierIterable<TValue>,
string: ReadableStream<Uint8Array> | TsonAsyncStringifiedStream<TValue>,
) => Promise<TValue>;

export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {
Expand All @@ -42,13 +42,22 @@
}
}

return async (iterable: AsyncIterable<string>) => {
return async (stream: ReadableStream<Uint8Array>) => {
Copy link
Member

@KATT KATT Oct 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe

	return async (stream: ReadableStream<Uint8Array> | AsyncIterable<string> | ReadableStream<Uint8Array>) => {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • we don't care that much about bundle size for this to matter
  • it'd be nicer dx?
  • they can be converted from/to each other seamlessly AFAIK

// this is an awful hack to get around making a some sort of pipeline
const cache = new Map<
TsonAsyncIndex,
ReadableStreamDefaultController<unknown>
>();
const iterator = iterable[Symbol.asyncIterator]();

const decoder = new TextDecoder();
const textStream = stream.pipeThrough(
new TransformStream<Uint8Array, string>({
async transform(chunk, controller) {

Check failure on line 55 in src/async/deserializeAsync.ts

View workflow job for this annotation

GitHub Actions / lint

Async method 'transform' has no 'await' expression
controller.enqueue(decoder.decode(chunk));
},
}),
);
const reader = textStream.getReader();

const walker: WalkerFactory = (nonce) => {
const walk: WalkFn = (value) => {
Expand Down Expand Up @@ -123,7 +132,7 @@
do {
lines.forEach(readLine);
lines.length = 0;
const nextValue = await iterator.next();
const nextValue = await reader.read();
if (!nextValue.done) {
accumulator += nextValue.value;
const parts = accumulator.split("\n");
Expand All @@ -144,7 +153,7 @@

const lines: string[] = [];
do {
const nextValue = await iterator.next();
const nextValue = await reader.read();
if (nextValue.done) {
throw new TsonError("Unexpected end of stream before head");
}
Expand Down
14 changes: 9 additions & 5 deletions src/handlers/tsonPromise.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
createTestServer,
waitError,
waitFor,
createBodyStream,

Check failure on line 22 in src/handlers/tsonPromise.test.ts

View workflow job for this annotation

GitHub Actions / lint

Expected "createBodyStream" to come before "waitFor"
} from "../internals/testUtils.js";
import { TsonSerialized, TsonType } from "../types.js";

Expand Down Expand Up @@ -354,7 +355,8 @@

const strIterarable = tson.stringify(obj, 4);

const value = await tson.parse(strIterarable);
const valueRaw = await tson.parse(createBodyStream(strIterarable));
const value = valueRaw as typeof obj;

expect(value).toHaveProperty("foo");
expect(await value.foo).toBe("bar");
Expand All @@ -377,7 +379,8 @@

const strIterarable = tson.stringify(obj, 4);

const value = await tson.parse(strIterarable);
const valueRaw = await tson.parse(createBodyStream(strIterarable));
const value = valueRaw as typeof obj;

const firstPromise = await value.promise;

Expand Down Expand Up @@ -442,7 +445,7 @@
(v) => textDecoder.decode(v),
);

const value = await tson.parse(stringIterator);
const value = await tson.parse(createBodyStream(stringIterator));
const asObj = value as Obj;

const firstPromise = await asObj.promise;
Expand Down Expand Up @@ -481,7 +484,7 @@
};
const iterator = stringify(original);

const [_result, deferreds] = await parse(iterator);
const [_result, deferreds] = await parse(createBodyStream(iterator));

const result = _result as typeof original;
await waitFor(() => {
Expand Down Expand Up @@ -549,7 +552,8 @@
// parse
const iterator = stringify(original, 2);

const result = await parse(iterator);
const resultRaw = await parse(createBodyStream(iterator));
const result = resultRaw as typeof original;

expect(result).toMatchInlineSnapshot(`
{
Expand Down
8 changes: 4 additions & 4 deletions src/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
createTsonAsync,
tsonPromise,
} from "./index.js";
import { expectError, waitError, waitFor } from "./internals/testUtils.js";
import { expectError, waitError, waitFor, createBodyStream } from "./internals/testUtils.js";

Check failure on line 10 in src/index.test.ts

View workflow job for this annotation

GitHub Actions / lint

Expected "createBodyStream" to come before "waitFor"
import { TsonSerialized } from "./types.js";

test("multiple handlers for primitive string found", () => {
Expand Down Expand Up @@ -95,7 +95,7 @@
const gen = generator();
await createTsonAsync({
types: [stringHandler, stringHandler],
}).parse(gen);
}).parse(createBodyStream(gen));
});

expect(err).toMatchInlineSnapshot(
Expand Down Expand Up @@ -136,7 +136,7 @@
const gen = generator();
await createTsonAsync({
types: [],
}).parse(gen);
}).parse(createBodyStream(gen));
});

expect(err).toMatchInlineSnapshot(
Expand Down Expand Up @@ -171,7 +171,7 @@
await createTsonAsync({
onStreamError: onErrorSpy,
types: [tsonPromise],
}).parse(generator());
}).parse(createBodyStream(generator()));

await waitFor(() => {
expect(onErrorSpy).toHaveBeenCalledTimes(1);
Expand Down
12 changes: 12 additions & 0 deletions src/internals/testUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,15 @@
url: `http://localhost:${port}`,
};
}

export function createBodyStream(iterator: AsyncIterable<string>) {
return new ReadableStream<Uint8Array>({
async start(controller) {
const encoder = new TextEncoder();
for await (const chunk of iterator) {
controller.enqueue(encoder.encode(chunk));
}
controller.close();

Check failure on line 83 in src/internals/testUtils.ts

View workflow job for this annotation

GitHub Actions / lint

Expected blank line before this statement
}
})
}
Loading