Skip to content
This repository has been archived by the owner on Jul 5, 2024. It is now read-only.

fix: deserializeAsync string chunks can split anywhere #30

Merged
merged 3 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/async/asyncTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ import { TsonType } from "../types.js";
import { TsonBranded, TsonTypeTesterCustom } from "../types.js";
import { serialized } from "../types.js";

export type TsonAsyncStringifierIterator<TValue> = AsyncIterable<string> & {
export type TsonAsyncStringifierIterable<TValue> = AsyncIterable<string> & {
[serialized]: TValue;
};

export type TsonAsyncStringifier = <TValue>(
value: TValue,
space?: number,
) => TsonAsyncStringifierIterator<TValue>;
) => TsonAsyncStringifierIterable<TValue>;
export type TsonAsyncIndex = TsonBranded<number, "AsyncRegistered">;

export interface TsonTransformerSerializeDeserializeAsync<
Expand Down
38 changes: 38 additions & 0 deletions src/async/deserializeAsync.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,44 @@ import {
import { createTestServer } from "../internals/testUtils.js";
import { TsonAsyncOptions } from "./asyncTypes.js";

test("deserialize variable chunk length", async () => {
const tson = createTsonAsync({
nonce: () => "__tson",
types: [tsonAsyncIterator, tsonPromise, tsonBigint],
});
{
const iterable = (async function* () {
await new Promise((resolve) => setTimeout(resolve, 1));
yield '[\n{"json":{"foo":"bar"},"nonce":"__tson"}';
yield "\n,\n[\n]\n]";
})();
const result = await tson.parse(iterable);
expect(result).toEqual({ foo: "bar" });
}

{
const iterable = (async function* () {
await new Promise((resolve) => setTimeout(resolve, 1));
yield '[\n{"json":{"foo":"bar"},"nonce":"__tson"}\n,\n[\n]\n]';
})();
const result = await tson.parse(iterable);
expect(result).toEqual({ foo: "bar" });
}

{
const iterable = (async function* () {
await new Promise((resolve) => setTimeout(resolve, 1));
yield '[\n{"json"';
yield ':{"foo":"b';
yield 'ar"},"nonce":"__tson"}\n,\n';
yield "[\n]\n";
yield "]";
})();
const result = await tson.parse(iterable);
expect(result).toEqual({ foo: "bar" });
}
});

test("deserialize async iterable", async () => {
const tson = createTsonAsync({
nonce: () => "__tson",
Expand Down
55 changes: 33 additions & 22 deletions src/async/deserializeAsync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
import {
TsonAsyncIndex,
TsonAsyncOptions,
TsonAsyncStringifierIterator,
TsonAsyncStringifierIterable,
TsonAsyncType,
} from "./asyncTypes.js";
import { TsonAsyncValueTuple } from "./serializeAsync.js";
Expand All @@ -25,7 +25,7 @@ type AnyTsonTransformerSerializeDeserialize =
| TsonTransformerSerializeDeserialize<any, any>;

type TsonParseAsync = <TValue>(
string: AsyncIterable<string> | TsonAsyncStringifierIterator<TValue>,
string: AsyncIterable<string> | TsonAsyncStringifierIterable<TValue>,
) => Promise<TValue>;

function createDeferred<T>() {
Expand Down Expand Up @@ -68,7 +68,7 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {
}
}

return async (iterator: AsyncIterable<string>) => {
return async (iterable: AsyncIterable<string>) => {
// this is an awful hack to get around making a some sort of pipeline
const cache = new Map<
TsonAsyncIndex,
Expand All @@ -77,7 +77,7 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {
values: unknown[];
}
>();
const instance = iterator[Symbol.asyncIterator]();
const iterator = iterable[Symbol.asyncIterator]();

const walker: WalkerFactory = (nonce) => {
const walk: WalkFn = (value) => {
Expand Down Expand Up @@ -139,8 +139,8 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {
};

async function getStreamedValues(
buffer: string[],

lines: string[],
accumulator: string,
walk: WalkFn,
) {
function readLine(str: string) {
Expand Down Expand Up @@ -170,29 +170,40 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {
item.next = createSafeDeferred();
}

buffer.forEach(readLine);

let nextValue = await instance.next();

while (!nextValue.done) {
nextValue.value.split("\n").forEach(readLine);

nextValue = await instance.next();
}
do {
lines.forEach(readLine);
lines.length = 0;
const nextValue = await iterator.next();
if (!nextValue.done) {
accumulator += nextValue.value;
const parts = accumulator.split("\n");
accumulator = parts.pop() ?? "";
lines.push(...parts);
} else if (accumulator) {
readLine(accumulator);
}
} while (lines.length);

assert(!cache.size, `Stream ended with ${cache.size} pending promises`);
}

async function init() {
const lines: string[] = [];
let accumulator = "";

// get the head of the JSON

let lastResult: IteratorResult<string>;
const lines: string[] = [];
do {
lastResult = await instance.next();
const nextValue = await iterator.next();
if (nextValue.done) {
throw new TsonError("Unexpected end of stream before head");
}

accumulator += nextValue.value;

lines.push(...(lastResult.value as string).split("\n").filter(Boolean));
const parts = accumulator.split("\n");
accumulator = parts.pop() ?? "";
lines.push(...parts);
} while (lines.length < 2);

const [
Expand All @@ -218,7 +229,7 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {
try {
return walk(head.json);
} finally {
getStreamedValues(buffer, walk).catch((cause) => {
getStreamedValues(buffer, accumulator, walk).catch((cause) => {
// Something went wrong while getting the streamed values

const err = new TsonError(
Expand Down Expand Up @@ -249,8 +260,8 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {
export function createTsonParseAsync(opts: TsonAsyncOptions): TsonParseAsync {
const instance = createTsonParseAsyncInner(opts);

return (async (iterator) => {
const [result] = await instance(iterator);
return (async (iterable) => {
const [result] = await instance(iterable);

return result;
}) as TsonParseAsync;
Expand Down
Loading