From d6636af9af828a01625bd02d3bfb3f7fd114a25f Mon Sep 17 00:00:00 2001 From: Sheraff Date: Fri, 6 Oct 2023 10:26:13 +0200 Subject: [PATCH 1/2] [deserializeAsync] Fix: string chunks can split anywhere --- src/async/asyncTypes.ts | 4 +- src/async/deserializeAsync.test.ts | 33 +++++ src/async/deserializeAsync.ts | 214 +++++++++++++++-------------- 3 files changed, 147 insertions(+), 104 deletions(-) diff --git a/src/async/asyncTypes.ts b/src/async/asyncTypes.ts index 41657fc9..e9aca628 100644 --- a/src/async/asyncTypes.ts +++ b/src/async/asyncTypes.ts @@ -3,14 +3,14 @@ import { TsonType } from "../types.js"; import { TsonBranded, TsonTypeTesterCustom } from "../types.js"; import { serialized } from "../types.js"; -export type TsonAsyncStringifierIterator = AsyncIterable & { +export type TsonAsyncStringifierIterable = AsyncIterable & { [serialized]: TValue; }; export type TsonAsyncStringifier = ( value: TValue, space?: number, -) => TsonAsyncStringifierIterator; +) => TsonAsyncStringifierIterable; export type TsonAsyncIndex = TsonBranded; export interface TsonTransformerSerializeDeserializeAsync< diff --git a/src/async/deserializeAsync.test.ts b/src/async/deserializeAsync.test.ts index 9ac7725e..43d85fc6 100644 --- a/src/async/deserializeAsync.test.ts +++ b/src/async/deserializeAsync.test.ts @@ -14,6 +14,39 @@ import { import { createTestServer } from "../internals/testUtils.js"; import { TsonAsyncOptions } from "./asyncTypes.js"; +test("deserialize variable chunk length", async () => { + const tson = createTsonAsync({ + nonce: () => "__tson", + types: [tsonAsyncIterator, tsonPromise, tsonBigint], + }); + { + const iterable = (async function* () { + yield '[\n{"json":{"foo":"bar"},"nonce":"__tson"}' + yield '\n,\n[\n]\n]' + })(); + const result = await tson.parse(iterable); + expect(result).toEqual({foo: "bar"}); + } + { + const iterable = (async function* () { + yield '[\n{"json":{"foo":"bar"},"nonce":"__tson"}\n,\n[\n]\n]' + })(); + const result = await tson.parse(iterable); + expect(result).toEqual({foo: "bar"}); + } + { + const iterable = (async function* () { + yield '[\n{"json"' + yield ':{"foo":"b' + yield 'ar"},"nonce":"__tson"}\n,\n' + yield '[\n]\n' + yield ']' + })(); + const result = await tson.parse(iterable); + expect(result).toEqual({foo: "bar"}); + } +}) + test("deserialize async iterable", async () => { const tson = createTsonAsync({ nonce: () => "__tson", diff --git a/src/async/deserializeAsync.ts b/src/async/deserializeAsync.ts index 1a051874..34510f6a 100644 --- a/src/async/deserializeAsync.ts +++ b/src/async/deserializeAsync.ts @@ -1,199 +1,209 @@ /* eslint-disable @typescript-eslint/no-unused-vars */ -import { TsonError } from "../errors.js"; -import { assert } from "../internals/assert.js"; -import { isTsonTuple } from "../internals/isTsonTuple.js"; -import { mapOrReturn } from "../internals/mapOrReturn.js"; +import { TsonError } from "../errors.js" +import { assert } from "../internals/assert.js" +import { isTsonTuple } from "../internals/isTsonTuple.js" +import { mapOrReturn } from "../internals/mapOrReturn.js" import { TsonNonce, TsonSerialized, TsonTransformerSerializeDeserialize, -} from "../types.js"; +} from "../types.js" import { TsonAsyncIndex, TsonAsyncOptions, - TsonAsyncStringifierIterator, + TsonAsyncStringifierIterable, TsonAsyncType, -} from "./asyncTypes.js"; -import { TsonAsyncValueTuple } from "./serializeAsync.js"; +} from "./asyncTypes.js" +import { TsonAsyncValueTuple } from "./serializeAsync.js" -type WalkFn = (value: unknown) => unknown; -type WalkerFactory = (nonce: TsonNonce) => WalkFn; +type WalkFn = (value: unknown) => unknown +type WalkerFactory = (nonce: TsonNonce) => WalkFn type AnyTsonTransformerSerializeDeserialize = | TsonAsyncType - | TsonTransformerSerializeDeserialize; + | TsonTransformerSerializeDeserialize type TsonParseAsync = ( - string: AsyncIterable | TsonAsyncStringifierIterator, -) => Promise; + string: AsyncIterable | TsonAsyncStringifierIterable, +) => Promise function createDeferred() { - type PromiseResolve = (value: T) => void; - type PromiseReject = (reason: unknown) => void; + type PromiseResolve = (value: T) => void + type PromiseReject = (reason: unknown) => void const deferred = {} as { - promise: Promise; - reject: PromiseReject; - resolve: PromiseResolve; - }; + promise: Promise + reject: PromiseReject + resolve: PromiseResolve + } deferred.promise = new Promise((resolve, reject) => { - deferred.resolve = resolve; - deferred.reject = reject; - }); - return deferred; + deferred.resolve = resolve + deferred.reject = reject + }) + return deferred } -type Deferred = ReturnType>; +type Deferred = ReturnType> function createSafeDeferred() { - const deferred = createDeferred(); + const deferred = createDeferred() deferred.promise.catch(() => { // prevent unhandled promise rejection - }); - return deferred as Deferred; + }) + return deferred as Deferred } export function createTsonParseAsyncInner(opts: TsonAsyncOptions) { - const typeByKey: Record = {}; + const typeByKey: Record = {} for (const handler of opts.types) { if (handler.key) { if (typeByKey[handler.key]) { - throw new Error(`Multiple handlers for key ${handler.key} found`); + throw new Error(`Multiple handlers for key ${handler.key} found`) } typeByKey[handler.key] = - handler as AnyTsonTransformerSerializeDeserialize; + handler as AnyTsonTransformerSerializeDeserialize } } - return async (iterator: AsyncIterable) => { + return async (iterable: AsyncIterable) => { // this is an awful hack to get around making a some sort of pipeline const cache = new Map< TsonAsyncIndex, { - next: Deferred; - values: unknown[]; + next: Deferred + values: unknown[] } - >(); - const instance = iterator[Symbol.asyncIterator](); + >() + const iterator = iterable[Symbol.asyncIterator]() const walker: WalkerFactory = (nonce) => { const walk: WalkFn = (value) => { if (isTsonTuple(value, nonce)) { - const [type, serializedValue] = value; - const transformer = typeByKey[type]; + const [type, serializedValue] = value + const transformer = typeByKey[type] - assert(transformer, `No transformer found for type ${type}`); + assert(transformer, `No transformer found for type ${type}`) - const walkedValue = walk(serializedValue); + const walkedValue = walk(serializedValue) if (!transformer.async) { - return transformer.deserialize(walk(walkedValue)); + return transformer.deserialize(walk(walkedValue)) } - const idx = serializedValue as TsonAsyncIndex; + const idx = serializedValue as TsonAsyncIndex const self = { next: createSafeDeferred(), values: [], - }; - cache.set(idx, self); + } + cache.set(idx, self) return transformer.deserialize({ // abortSignal onDone() { - cache.delete(idx); + cache.delete(idx) }, stream: { [Symbol.asyncIterator]: () => { - let index = 0; + let index = 0 return { next: async () => { - const idx = index++; + const idx = index++ if (self.values.length > idx) { return { done: false, value: self.values[idx], - }; + } } - await self.next.promise; + await self.next.promise return { done: false, value: self.values[idx], - }; + } }, - }; + } }, }, - }); + }) } - return mapOrReturn(value, walk); - }; + return mapOrReturn(value, walk) + } - return walk; - }; + return walk + } async function getStreamedValues( - buffer: string[], - + lines: string[], + accumulator: string, walk: WalkFn, ) { function readLine(str: string) { - str = str.trimStart(); + str = str.trimStart() if (str.startsWith(",")) { // ignore leading comma - str = str.slice(1); + str = str.slice(1) } if (str.length < 2) { // minimum length is 2: '[]' - return; + return } - const [index, result] = JSON.parse(str) as TsonAsyncValueTuple; + const [index, result] = JSON.parse(str) as TsonAsyncValueTuple - const item = cache.get(index); + const item = cache.get(index) - const walkedResult = walk(result); + const walkedResult = walk(result) - assert(item, `No deferred found for index ${index}`); + assert(item, `No deferred found for index ${index}`) // resolving deferred - item.values.push(walkedResult); - item.next.resolve(walkedResult); - item.next = createSafeDeferred(); + item.values.push(walkedResult) + item.next.resolve(walkedResult) + item.next = createSafeDeferred() } - buffer.forEach(readLine); - - let nextValue = await instance.next(); - - while (!nextValue.done) { - nextValue.value.split("\n").forEach(readLine); - - nextValue = await instance.next(); - } + do { + lines.forEach(readLine) + lines.length = 0 + const nextValue = await iterator.next() + if (!nextValue.done) { + accumulator += nextValue.value + const parts = accumulator.split("\n") + accumulator = parts.pop() ?? "" + lines.push(...parts) + } else if (accumulator) { + readLine(accumulator) + } + } while (lines.length) - assert(!cache.size, `Stream ended with ${cache.size} pending promises`); + assert(!cache.size, `Stream ended with ${cache.size} pending promises`) } async function init() { - const lines: string[] = []; + let accumulator = "" // get the head of the JSON - let lastResult: IteratorResult; + let lines: string[] = [] do { - lastResult = await instance.next(); + const nextValue = await iterator.next() + if (nextValue.done) { + throw new TsonError("Unexpected end of stream before head") + } + accumulator += nextValue.value - lines.push(...(lastResult.value as string).split("\n").filter(Boolean)); - } while (lines.length < 2); + const parts = accumulator.split("\n") + accumulator = parts.pop() ?? "" + lines.push(...parts) + } while (lines.length < 2) const [ /** @@ -207,51 +217,51 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) { // .. third line is a `,` // .. fourth line is the start of the values array ...buffer - ] = lines; + ] = lines - assert(headLine, "No head line found"); + assert(headLine, "No head line found") - const head = JSON.parse(headLine) as TsonSerialized; + const head = JSON.parse(headLine) as TsonSerialized - const walk = walker(head.nonce); + const walk = walker(head.nonce) try { - return walk(head.json); + return walk(head.json) } finally { - getStreamedValues(buffer, walk).catch((cause) => { + getStreamedValues(buffer, accumulator, walk).catch((cause) => { // Something went wrong while getting the streamed values const err = new TsonError( `Stream interrupted: ${(cause as Error).message}`, // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment { cause }, - ); + ) // cancel all pending promises for (const deferred of cache.values()) { - deferred.next.reject(err); + deferred.next.reject(err) } - cache.clear(); + cache.clear() - opts.onStreamError?.(err); - }); + opts.onStreamError?.(err) + }) } } const result = await init().catch((cause: unknown) => { - throw new TsonError("Failed to initialize TSON stream", { cause }); - }); - return [result, cache] as const; - }; + throw new TsonError("Failed to initialize TSON stream", { cause }) + }) + return [result, cache] as const + } } export function createTsonParseAsync(opts: TsonAsyncOptions): TsonParseAsync { - const instance = createTsonParseAsyncInner(opts); + const instance = createTsonParseAsyncInner(opts) - return (async (iterator) => { - const [result] = await instance(iterator); + return (async (iterable) => { + const [result] = await instance(iterable) - return result; - }) as TsonParseAsync; + return result + }) as TsonParseAsync } From 506e19089c2d3bff7ebaf59ac708e303fe0dc2a0 Mon Sep 17 00:00:00 2001 From: Sheraff Date: Fri, 6 Oct 2023 10:37:20 +0200 Subject: [PATCH 2/2] lint --- src/async/deserializeAsync.test.ts | 29 +++-- src/async/deserializeAsync.ts | 199 +++++++++++++++-------------- 2 files changed, 117 insertions(+), 111 deletions(-) diff --git a/src/async/deserializeAsync.test.ts b/src/async/deserializeAsync.test.ts index 43d85fc6..acd6d6af 100644 --- a/src/async/deserializeAsync.test.ts +++ b/src/async/deserializeAsync.test.ts @@ -21,31 +21,36 @@ test("deserialize variable chunk length", async () => { }); { const iterable = (async function* () { - yield '[\n{"json":{"foo":"bar"},"nonce":"__tson"}' - yield '\n,\n[\n]\n]' + await new Promise((resolve) => setTimeout(resolve, 1)); + yield '[\n{"json":{"foo":"bar"},"nonce":"__tson"}'; + yield "\n,\n[\n]\n]"; })(); const result = await tson.parse(iterable); - expect(result).toEqual({foo: "bar"}); + expect(result).toEqual({ foo: "bar" }); } + { const iterable = (async function* () { - yield '[\n{"json":{"foo":"bar"},"nonce":"__tson"}\n,\n[\n]\n]' + await new Promise((resolve) => setTimeout(resolve, 1)); + yield '[\n{"json":{"foo":"bar"},"nonce":"__tson"}\n,\n[\n]\n]'; })(); const result = await tson.parse(iterable); - expect(result).toEqual({foo: "bar"}); + expect(result).toEqual({ foo: "bar" }); } + { const iterable = (async function* () { - yield '[\n{"json"' - yield ':{"foo":"b' - yield 'ar"},"nonce":"__tson"}\n,\n' - yield '[\n]\n' - yield ']' + await new Promise((resolve) => setTimeout(resolve, 1)); + yield '[\n{"json"'; + yield ':{"foo":"b'; + yield 'ar"},"nonce":"__tson"}\n,\n'; + yield "[\n]\n"; + yield "]"; })(); const result = await tson.parse(iterable); - expect(result).toEqual({foo: "bar"}); + expect(result).toEqual({ foo: "bar" }); } -}) +}); test("deserialize async iterable", async () => { const tson = createTsonAsync({ diff --git a/src/async/deserializeAsync.ts b/src/async/deserializeAsync.ts index 34510f6a..b272214f 100644 --- a/src/async/deserializeAsync.ts +++ b/src/async/deserializeAsync.ts @@ -1,70 +1,70 @@ /* eslint-disable @typescript-eslint/no-unused-vars */ -import { TsonError } from "../errors.js" -import { assert } from "../internals/assert.js" -import { isTsonTuple } from "../internals/isTsonTuple.js" -import { mapOrReturn } from "../internals/mapOrReturn.js" +import { TsonError } from "../errors.js"; +import { assert } from "../internals/assert.js"; +import { isTsonTuple } from "../internals/isTsonTuple.js"; +import { mapOrReturn } from "../internals/mapOrReturn.js"; import { TsonNonce, TsonSerialized, TsonTransformerSerializeDeserialize, -} from "../types.js" +} from "../types.js"; import { TsonAsyncIndex, TsonAsyncOptions, TsonAsyncStringifierIterable, TsonAsyncType, -} from "./asyncTypes.js" -import { TsonAsyncValueTuple } from "./serializeAsync.js" +} from "./asyncTypes.js"; +import { TsonAsyncValueTuple } from "./serializeAsync.js"; -type WalkFn = (value: unknown) => unknown -type WalkerFactory = (nonce: TsonNonce) => WalkFn +type WalkFn = (value: unknown) => unknown; +type WalkerFactory = (nonce: TsonNonce) => WalkFn; type AnyTsonTransformerSerializeDeserialize = | TsonAsyncType - | TsonTransformerSerializeDeserialize + | TsonTransformerSerializeDeserialize; type TsonParseAsync = ( string: AsyncIterable | TsonAsyncStringifierIterable, -) => Promise +) => Promise; function createDeferred() { - type PromiseResolve = (value: T) => void - type PromiseReject = (reason: unknown) => void + type PromiseResolve = (value: T) => void; + type PromiseReject = (reason: unknown) => void; const deferred = {} as { - promise: Promise - reject: PromiseReject - resolve: PromiseResolve - } + promise: Promise; + reject: PromiseReject; + resolve: PromiseResolve; + }; deferred.promise = new Promise((resolve, reject) => { - deferred.resolve = resolve - deferred.reject = reject - }) - return deferred + deferred.resolve = resolve; + deferred.reject = reject; + }); + return deferred; } -type Deferred = ReturnType> +type Deferred = ReturnType>; function createSafeDeferred() { - const deferred = createDeferred() + const deferred = createDeferred(); deferred.promise.catch(() => { // prevent unhandled promise rejection - }) - return deferred as Deferred + }); + return deferred as Deferred; } export function createTsonParseAsyncInner(opts: TsonAsyncOptions) { - const typeByKey: Record = {} + const typeByKey: Record = {}; for (const handler of opts.types) { if (handler.key) { if (typeByKey[handler.key]) { - throw new Error(`Multiple handlers for key ${handler.key} found`) + throw new Error(`Multiple handlers for key ${handler.key} found`); } typeByKey[handler.key] = - handler as AnyTsonTransformerSerializeDeserialize + handler as AnyTsonTransformerSerializeDeserialize; } } @@ -73,70 +73,70 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) { const cache = new Map< TsonAsyncIndex, { - next: Deferred - values: unknown[] + next: Deferred; + values: unknown[]; } - >() - const iterator = iterable[Symbol.asyncIterator]() + >(); + const iterator = iterable[Symbol.asyncIterator](); const walker: WalkerFactory = (nonce) => { const walk: WalkFn = (value) => { if (isTsonTuple(value, nonce)) { - const [type, serializedValue] = value - const transformer = typeByKey[type] + const [type, serializedValue] = value; + const transformer = typeByKey[type]; - assert(transformer, `No transformer found for type ${type}`) + assert(transformer, `No transformer found for type ${type}`); - const walkedValue = walk(serializedValue) + const walkedValue = walk(serializedValue); if (!transformer.async) { - return transformer.deserialize(walk(walkedValue)) + return transformer.deserialize(walk(walkedValue)); } - const idx = serializedValue as TsonAsyncIndex + const idx = serializedValue as TsonAsyncIndex; const self = { next: createSafeDeferred(), values: [], - } - cache.set(idx, self) + }; + cache.set(idx, self); return transformer.deserialize({ // abortSignal onDone() { - cache.delete(idx) + cache.delete(idx); }, stream: { [Symbol.asyncIterator]: () => { - let index = 0 + let index = 0; return { next: async () => { - const idx = index++ + const idx = index++; if (self.values.length > idx) { return { done: false, value: self.values[idx], - } + }; } - await self.next.promise + await self.next.promise; return { done: false, value: self.values[idx], - } + }; }, - } + }; }, }, - }) + }); } - return mapOrReturn(value, walk) - } + return mapOrReturn(value, walk); + }; - return walk - } + return walk; + }; async function getStreamedValues( lines: string[], @@ -144,66 +144,67 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) { walk: WalkFn, ) { function readLine(str: string) { - str = str.trimStart() + str = str.trimStart(); if (str.startsWith(",")) { // ignore leading comma - str = str.slice(1) + str = str.slice(1); } if (str.length < 2) { // minimum length is 2: '[]' - return + return; } - const [index, result] = JSON.parse(str) as TsonAsyncValueTuple + const [index, result] = JSON.parse(str) as TsonAsyncValueTuple; - const item = cache.get(index) + const item = cache.get(index); - const walkedResult = walk(result) + const walkedResult = walk(result); - assert(item, `No deferred found for index ${index}`) + assert(item, `No deferred found for index ${index}`); // resolving deferred - item.values.push(walkedResult) - item.next.resolve(walkedResult) - item.next = createSafeDeferred() + item.values.push(walkedResult); + item.next.resolve(walkedResult); + item.next = createSafeDeferred(); } do { - lines.forEach(readLine) - lines.length = 0 - const nextValue = await iterator.next() + lines.forEach(readLine); + lines.length = 0; + const nextValue = await iterator.next(); if (!nextValue.done) { - accumulator += nextValue.value - const parts = accumulator.split("\n") - accumulator = parts.pop() ?? "" - lines.push(...parts) + accumulator += nextValue.value; + const parts = accumulator.split("\n"); + accumulator = parts.pop() ?? ""; + lines.push(...parts); } else if (accumulator) { - readLine(accumulator) + readLine(accumulator); } - } while (lines.length) + } while (lines.length); - assert(!cache.size, `Stream ended with ${cache.size} pending promises`) + assert(!cache.size, `Stream ended with ${cache.size} pending promises`); } async function init() { - let accumulator = "" + let accumulator = ""; // get the head of the JSON - let lines: string[] = [] + const lines: string[] = []; do { - const nextValue = await iterator.next() + const nextValue = await iterator.next(); if (nextValue.done) { - throw new TsonError("Unexpected end of stream before head") + throw new TsonError("Unexpected end of stream before head"); } - accumulator += nextValue.value - const parts = accumulator.split("\n") - accumulator = parts.pop() ?? "" - lines.push(...parts) - } while (lines.length < 2) + accumulator += nextValue.value; + + const parts = accumulator.split("\n"); + accumulator = parts.pop() ?? ""; + lines.push(...parts); + } while (lines.length < 2); const [ /** @@ -217,16 +218,16 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) { // .. third line is a `,` // .. fourth line is the start of the values array ...buffer - ] = lines + ] = lines; - assert(headLine, "No head line found") + assert(headLine, "No head line found"); - const head = JSON.parse(headLine) as TsonSerialized + const head = JSON.parse(headLine) as TsonSerialized; - const walk = walker(head.nonce) + const walk = walker(head.nonce); try { - return walk(head.json) + return walk(head.json); } finally { getStreamedValues(buffer, accumulator, walk).catch((cause) => { // Something went wrong while getting the streamed values @@ -235,33 +236,33 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) { `Stream interrupted: ${(cause as Error).message}`, // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment { cause }, - ) + ); // cancel all pending promises for (const deferred of cache.values()) { - deferred.next.reject(err) + deferred.next.reject(err); } - cache.clear() + cache.clear(); - opts.onStreamError?.(err) - }) + opts.onStreamError?.(err); + }); } } const result = await init().catch((cause: unknown) => { - throw new TsonError("Failed to initialize TSON stream", { cause }) - }) - return [result, cache] as const - } + throw new TsonError("Failed to initialize TSON stream", { cause }); + }); + return [result, cache] as const; + }; } export function createTsonParseAsync(opts: TsonAsyncOptions): TsonParseAsync { - const instance = createTsonParseAsyncInner(opts) + const instance = createTsonParseAsyncInner(opts); return (async (iterable) => { - const [result] = await instance(iterable) + const [result] = await instance(iterable); - return result - }) as TsonParseAsync + return result; + }) as TsonParseAsync; }