Skip to content
This repository has been archived by the owner on Jul 5, 2024. It is now read-only.

Commit

Permalink
use a stream
Browse files Browse the repository at this point in the history
  • Loading branch information
KATT committed Oct 6, 2023
1 parent 7ce7cbd commit 4224a94
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 84 deletions.
20 changes: 10 additions & 10 deletions src/async/deserializeAsync.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,24 @@ import {
import { createTestServer } from "../internals/testUtils.js";
import { TsonAsyncOptions } from "./asyncTypes.js";

test("deserialize async iterable", async () => {
test("deserialize promise", async () => {
const tson = createTsonAsync({
nonce: () => "__tson",
types: [tsonAsyncIterator, tsonPromise, tsonBigint],
});

{
// plain obj
const obj = {
foo: "bar",
};
// {
// // plain obj
// const obj = {
// foo: "bar",
// };

const strIterable = tson.stringify(obj);
// const strIterable = tson.stringify(obj);

const result = await tson.parse(strIterable);
// const result = await tson.parse(strIterable);

expect(result).toEqual(obj);
}
// expect(result).toEqual(obj);
// }

{
// promise
Expand Down
110 changes: 36 additions & 74 deletions src/async/deserializeAsync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import { TsonError } from "../errors.js";
import { assert } from "../internals/assert.js";
import { isTsonTuple } from "../internals/isTsonTuple.js";
import { readableStreamToAsyncIterable } from "../internals/iterableUtils.js";
import { mapOrReturn } from "../internals/mapOrReturn.js";
import {
TsonNonce,
Expand All @@ -28,32 +29,6 @@ type TsonParseAsync = <TValue>(
string: AsyncIterable<string> | TsonAsyncStringifierIterator<TValue>,
) => Promise<TValue>;

function createDeferred<T>() {
type PromiseResolve = (value: T) => void;
type PromiseReject = (reason: unknown) => void;
const deferred = {} as {
promise: Promise<T>;
reject: PromiseReject;
resolve: PromiseResolve;
};
deferred.promise = new Promise<T>((resolve, reject) => {
deferred.resolve = resolve;
deferred.reject = reject;
});
return deferred;
}

type Deferred<T> = ReturnType<typeof createDeferred<T>>;

function createSafeDeferred<T>() {
const deferred = createDeferred();

deferred.promise.catch(() => {
// prevent unhandled promise rejection
});
return deferred as Deferred<T>;
}

export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {
const typeByKey: Record<string, AnyTsonTransformerSerializeDeserialize> = {};

Expand All @@ -70,14 +45,15 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {

return async (iterator: AsyncIterable<string>) => {
// this is an awful hack to get around making a some sort of pipeline
const cache = new Map<
const instance = iterator[Symbol.asyncIterator]();

const streamByIndex = new Map<
TsonAsyncIndex,
{
next: Deferred<unknown>;
values: unknown[];
controller: ReadableStreamController<unknown>;
stream: ReadableStream<unknown>;
}
>();
const instance = iterator[Symbol.asyncIterator]();

const walker: WalkerFactory = (nonce) => {
const walk: WalkFn = (value) => {
Expand All @@ -94,41 +70,33 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {

const idx = serializedValue as TsonAsyncIndex;

const self = {
next: createSafeDeferred(),
values: [],
};
cache.set(idx, self);
// create a new stream for this index if one doesn't exist
assert(
!streamByIndex.has(idx),
`Stream already exists for index ${idx}`,
);
let controller: ReadableStreamDefaultController<unknown> =
null as unknown as ReadableStreamDefaultController<unknown>;
const stream = new ReadableStream({
start(c) {
controller = c;
},
});
assert(controller, "Controller not set");
streamByIndex.set(idx, {
controller,
stream,
});

assert(controller as any, "No controller found");

return transformer.deserialize({
// abortSignal
onDone() {
cache.delete(idx);
},
stream: {
[Symbol.asyncIterator]: () => {
let index = 0;
return {
next: async () => {
const idx = index++;

if (self.values.length > idx) {
return {
done: false,
value: self.values[idx],
};
}

await self.next.promise;

return {
done: false,
value: self.values[idx],
};
},
};
},
controller.close();
// cache.delete(idx);
},
stream: readableStreamToAsyncIterable(stream),
});
}

Expand Down Expand Up @@ -158,16 +126,14 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {

const [index, result] = JSON.parse(str) as TsonAsyncValueTuple;

const item = cache.get(index);
const item = streamByIndex.get(index);

const walkedResult = walk(result);

assert(item, `No deferred found for index ${index}`);
assert(item, `No stream found for index ${index}`);

// resolving deferred
item.values.push(walkedResult);
item.next.resolve(walkedResult);
item.next = createSafeDeferred();
// FIXME: I don't know why this requires array buffer
item.controller.enqueue(walkedResult as any);
}

buffer.forEach(readLine);
Expand All @@ -179,8 +145,6 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {

nextValue = await instance.next();
}

assert(!cache.size, `Stream ended with ${cache.size} pending promises`);
}

async function init() {
Expand Down Expand Up @@ -227,13 +191,11 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {
{ cause },
);

// cancel all pending promises
for (const deferred of cache.values()) {
deferred.next.reject(err);
// cancel all pending streams
for (const { controller } of streamByIndex.values()) {
controller.error(err);
}

cache.clear();

opts.onStreamError?.(err);
});
}
Expand All @@ -242,7 +204,7 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {
const result = await init().catch((cause: unknown) => {
throw new TsonError("Failed to initialize TSON stream", { cause });
});
return [result, cache] as const;
return [result, streamByIndex] as const;
};
}

Expand Down

0 comments on commit 4224a94

Please sign in to comment.