From 4224a94c06c74c24216b74235d95dbe9cbb71661 Mon Sep 17 00:00:00 2001 From: KATT Date: Fri, 6 Oct 2023 10:51:10 +0200 Subject: [PATCH] use a stream --- src/async/deserializeAsync.test.ts | 20 +++--- src/async/deserializeAsync.ts | 110 ++++++++++------------------- 2 files changed, 46 insertions(+), 84 deletions(-) diff --git a/src/async/deserializeAsync.test.ts b/src/async/deserializeAsync.test.ts index 9ac7725e..d3ddb8c4 100644 --- a/src/async/deserializeAsync.test.ts +++ b/src/async/deserializeAsync.test.ts @@ -14,24 +14,24 @@ import { import { createTestServer } from "../internals/testUtils.js"; import { TsonAsyncOptions } from "./asyncTypes.js"; -test("deserialize async iterable", async () => { +test("deserialize promise", async () => { const tson = createTsonAsync({ nonce: () => "__tson", types: [tsonAsyncIterator, tsonPromise, tsonBigint], }); - { - // plain obj - const obj = { - foo: "bar", - }; + // { + // // plain obj + // const obj = { + // foo: "bar", + // }; - const strIterable = tson.stringify(obj); + // const strIterable = tson.stringify(obj); - const result = await tson.parse(strIterable); + // const result = await tson.parse(strIterable); - expect(result).toEqual(obj); - } + // expect(result).toEqual(obj); + // } { // promise diff --git a/src/async/deserializeAsync.ts b/src/async/deserializeAsync.ts index 1a051874..8abd837d 100644 --- a/src/async/deserializeAsync.ts +++ b/src/async/deserializeAsync.ts @@ -3,6 +3,7 @@ import { TsonError } from "../errors.js"; import { assert } from "../internals/assert.js"; import { isTsonTuple } from "../internals/isTsonTuple.js"; +import { readableStreamToAsyncIterable } from "../internals/iterableUtils.js"; import { mapOrReturn } from "../internals/mapOrReturn.js"; import { TsonNonce, @@ -28,32 +29,6 @@ type TsonParseAsync = ( string: AsyncIterable | TsonAsyncStringifierIterator, ) => Promise; -function createDeferred() { - type PromiseResolve = (value: T) => void; - type PromiseReject = (reason: unknown) => void; - const deferred = {} as { - promise: Promise; - reject: PromiseReject; - resolve: PromiseResolve; - }; - deferred.promise = new Promise((resolve, reject) => { - deferred.resolve = resolve; - deferred.reject = reject; - }); - return deferred; -} - -type Deferred = ReturnType>; - -function createSafeDeferred() { - const deferred = createDeferred(); - - deferred.promise.catch(() => { - // prevent unhandled promise rejection - }); - return deferred as Deferred; -} - export function createTsonParseAsyncInner(opts: TsonAsyncOptions) { const typeByKey: Record = {}; @@ -70,14 +45,15 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) { return async (iterator: AsyncIterable) => { // this is an awful hack to get around making a some sort of pipeline - const cache = new Map< + const instance = iterator[Symbol.asyncIterator](); + + const streamByIndex = new Map< TsonAsyncIndex, { - next: Deferred; - values: unknown[]; + controller: ReadableStreamController; + stream: ReadableStream; } >(); - const instance = iterator[Symbol.asyncIterator](); const walker: WalkerFactory = (nonce) => { const walk: WalkFn = (value) => { @@ -94,41 +70,33 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) { const idx = serializedValue as TsonAsyncIndex; - const self = { - next: createSafeDeferred(), - values: [], - }; - cache.set(idx, self); + // create a new stream for this index if one doesn't exist + assert( + !streamByIndex.has(idx), + `Stream already exists for index ${idx}`, + ); + let controller: ReadableStreamDefaultController = + null as unknown as ReadableStreamDefaultController; + const stream = new ReadableStream({ + start(c) { + controller = c; + }, + }); + assert(controller, "Controller not set"); + streamByIndex.set(idx, { + controller, + stream, + }); + + assert(controller as any, "No controller found"); return transformer.deserialize({ // abortSignal onDone() { - cache.delete(idx); - }, - stream: { - [Symbol.asyncIterator]: () => { - let index = 0; - return { - next: async () => { - const idx = index++; - - if (self.values.length > idx) { - return { - done: false, - value: self.values[idx], - }; - } - - await self.next.promise; - - return { - done: false, - value: self.values[idx], - }; - }, - }; - }, + controller.close(); + // cache.delete(idx); }, + stream: readableStreamToAsyncIterable(stream), }); } @@ -158,16 +126,14 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) { const [index, result] = JSON.parse(str) as TsonAsyncValueTuple; - const item = cache.get(index); + const item = streamByIndex.get(index); const walkedResult = walk(result); - assert(item, `No deferred found for index ${index}`); + assert(item, `No stream found for index ${index}`); - // resolving deferred - item.values.push(walkedResult); - item.next.resolve(walkedResult); - item.next = createSafeDeferred(); + // FIXME: I don't know why this requires array buffer + item.controller.enqueue(walkedResult as any); } buffer.forEach(readLine); @@ -179,8 +145,6 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) { nextValue = await instance.next(); } - - assert(!cache.size, `Stream ended with ${cache.size} pending promises`); } async function init() { @@ -227,13 +191,11 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) { { cause }, ); - // cancel all pending promises - for (const deferred of cache.values()) { - deferred.next.reject(err); + // cancel all pending streams + for (const { controller } of streamByIndex.values()) { + controller.error(err); } - cache.clear(); - opts.onStreamError?.(err); }); } @@ -242,7 +204,7 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) { const result = await init().catch((cause: unknown) => { throw new TsonError("Failed to initialize TSON stream", { cause }); }); - return [result, cache] as const; + return [result, streamByIndex] as const; }; }