Skip to content
This repository has been archived by the owner on Jul 5, 2024. It is now read-only.

Commit

Permalink
chore: Revert "chore: use ReadableStreams in deserialization" (#33)
Browse files Browse the repository at this point in the history
Reverts #31
  • Loading branch information
KATT authored Oct 6, 2023
1 parent ccddead commit 3c4c867
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 121 deletions.
2 changes: 1 addition & 1 deletion src/async/deserializeAsync.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
import { createTestServer } from "../internals/testUtils.js";
import { TsonAsyncOptions } from "./asyncTypes.js";

test("deserialize promise", async () => {
test("deserialize async iterable", async () => {
const tson = createTsonAsync({
nonce: () => "__tson",
types: [tsonAsyncIterator, tsonPromise, tsonBigint],
Expand Down
126 changes: 74 additions & 52 deletions src/async/deserializeAsync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import { TsonError } from "../errors.js";
import { assert } from "../internals/assert.js";
import { isTsonTuple } from "../internals/isTsonTuple.js";
import { readableStreamToAsyncIterable } from "../internals/iterableUtils.js";
import { mapOrReturn } from "../internals/mapOrReturn.js";
import {
TsonNonce,
Expand All @@ -29,6 +28,32 @@ type TsonParseAsync = <TValue>(
string: AsyncIterable<string> | TsonAsyncStringifierIterator<TValue>,
) => Promise<TValue>;

function createDeferred<T>() {
type PromiseResolve = (value: T) => void;
type PromiseReject = (reason: unknown) => void;
const deferred = {} as {
promise: Promise<T>;
reject: PromiseReject;
resolve: PromiseResolve;
};
deferred.promise = new Promise<T>((resolve, reject) => {
deferred.resolve = resolve;
deferred.reject = reject;
});
return deferred;
}

type Deferred<T> = ReturnType<typeof createDeferred<T>>;

function createSafeDeferred<T>() {
const deferred = createDeferred();

deferred.promise.catch(() => {
// prevent unhandled promise rejection
});
return deferred as Deferred<T>;
}

export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {
const typeByKey: Record<string, AnyTsonTransformerSerializeDeserialize> = {};

Expand All @@ -45,15 +70,14 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {

return async (iterator: AsyncIterable<string>) => {
// this is an awful hack to get around making a some sort of pipeline
const instance = iterator[Symbol.asyncIterator]();

const streamByIndex = new Map<
const cache = new Map<
TsonAsyncIndex,
{
controller: ReadableStreamController<unknown>;
stream: ReadableStream<unknown>;
next: Deferred<unknown>;
values: unknown[];
}
>();
const instance = iterator[Symbol.asyncIterator]();

const walker: WalkerFactory = (nonce) => {
const walk: WalkFn = (value) => {
Expand All @@ -70,36 +94,41 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {

const idx = serializedValue as TsonAsyncIndex;

// create a new stream for this index if one doesn't exist
assert(
!streamByIndex.has(idx),
`Stream already exists for index ${idx}`,
);
let controller: ReadableStreamDefaultController<unknown> =
null as unknown as ReadableStreamDefaultController<unknown>;
const stream = new ReadableStream({
start(c) {
controller = c;
},
});
assert(controller, "Controller not set");

streamByIndex.set(idx, {
controller,
stream,
});

assert(controller as any, "No controller found");
const self = {
next: createSafeDeferred(),
values: [],
};
cache.set(idx, self);

return transformer.deserialize({
// abortSignal
onDone() {
try {
controller.close();
} catch {
// ignore
}
cache.delete(idx);
},
stream: {
[Symbol.asyncIterator]: () => {
let index = 0;
return {
next: async () => {
const idx = index++;

if (self.values.length > idx) {
return {
done: false,
value: self.values[idx],
};
}

await self.next.promise;

return {
done: false,
value: self.values[idx],
};
},
};
},
},
stream: readableStreamToAsyncIterable(stream),
});
}

Expand Down Expand Up @@ -129,15 +158,16 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {

const [index, result] = JSON.parse(str) as TsonAsyncValueTuple;

const item = streamByIndex.get(index);
const item = cache.get(index);

const walkedResult = walk(result);

assert(item, `No stream found for index ${index}`);
assert(item, `No deferred found for index ${index}`);

// FIXME: I don't know why this requires array buffer
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
item.controller.enqueue(walkedResult as any);
// resolving deferred
item.values.push(walkedResult);
item.next.resolve(walkedResult);
item.next = createSafeDeferred();
}

buffer.forEach(readLine);
Expand All @@ -150,13 +180,7 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {
nextValue = await instance.next();
}

for (const item of streamByIndex.values()) {
try {
item.controller.close();
} catch {
// ignore
}
}
assert(!cache.size, `Stream ended with ${cache.size} pending promises`);
}

async function init() {
Expand Down Expand Up @@ -203,15 +227,13 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {
{ cause },
);

// close all pending streams
for (const item of streamByIndex.values()) {
try {
item.controller.close();
} catch {
// ignore
}
// cancel all pending promises
for (const deferred of cache.values()) {
deferred.next.reject(err);
}

cache.clear();

opts.onStreamError?.(err);
});
}
Expand All @@ -220,7 +242,7 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {
const result = await init().catch((cause: unknown) => {
throw new TsonError("Failed to initialize TSON stream", { cause });
});
return [result, streamByIndex] as const;
return [result, cache] as const;
};
}

Expand Down
1 change: 1 addition & 0 deletions src/handlers/tsonAsyncIterable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export const tsonAsyncIterator: TsonAsyncType<

case ITERATOR_VALUE: {
yield value[1];
break;
}
}
}
Expand Down
8 changes: 7 additions & 1 deletion src/handlers/tsonPromise.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -488,13 +488,19 @@ test("does not crash node when it receives a promise rejection", async () => {
assert(deferreds.size === 1);
});

await waitFor(() => {
assert(deferreds.size === 0);
});

expect(result).toMatchInlineSnapshot(`
{
"foo": Promise {},
}
`);

await expect(result.foo).rejects.toMatchInlineSnapshot(
const err = await waitError(result.foo);

expect(err).toMatchInlineSnapshot(
"[TsonPromiseRejectionError: Promise rejected]",
);
});
Expand Down
4 changes: 1 addition & 3 deletions src/handlers/tsonPromise.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ export const tsonPromise: TsonAsyncType<MyPromise, SerializedPromiseValue> = {
const value = await opts.stream[Symbol.asyncIterator]().next();

if (value.done) {
throw new TsonPromiseRejectionError(
"Expected promise value, got done - was the stream interrupted?",
);
throw new Error("Expected promise value, got done");
}

const [status, result] = value.value;
Expand Down
71 changes: 7 additions & 64 deletions src/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
createTsonAsync,
tsonPromise,
} from "./index.js";
import { expectError, waitError } from "./internals/testUtils.js";
import { expectError, waitError, waitFor } from "./internals/testUtils.js";
import { TsonSerialized } from "./types.js";

test("multiple handlers for primitive string found", () => {
Expand Down Expand Up @@ -168,73 +168,16 @@ test("async: bad values", async () => {
}

const onErrorSpy = vitest.fn();
const value = await createTsonAsync({
await createTsonAsync({
onStreamError: onErrorSpy,
types: [tsonPromise],
}).parse(generator());

const typedValue = value as {
foo: Promise<unknown>;
};
await waitFor(() => {
expect(onErrorSpy).toHaveBeenCalledTimes(1);
});

expect(typedValue.foo).toBeInstanceOf(Promise);
await expect(typedValue.foo).rejects.toMatchInlineSnapshot(
"[TsonPromiseRejectionError: Expected promise value, got done - was the stream interrupted?]",
expect(onErrorSpy.mock.calls[0][0]).toMatchInlineSnapshot(
"[TsonError: Stream interrupted: Stream ended with 1 pending promises]",
);
});

test.todo("async: chunked response values", async () => {
async function* generator() {
await Promise.resolve();

yield "[" + "\n";

{
const obj = {
json: {
foo: ["Promise", 0, "__tson"],
},
nonce: "__tson",
} as TsonSerialized<any>;

const out = JSON.stringify(obj) + "\n";

const half = Math.ceil(out.length / 2);

yield out.slice(0, half);
yield out.slice(half);
}

await Promise.resolve();

yield " ," + "\n";
yield " [" + "\n";

// this value is chunked in half
{
const value = ' [0, [0, "bar"]]' + "\n";
const half = Math.ceil(value.length / 2);
yield value.slice(0, half);
yield value.slice(half);
}

yield ' [0, [0, "bar"]]' + "\n";
yield " ]" + "\n";
yield "]";
}

const onErrorSpy = vitest.fn();
const value = await createTsonAsync({
onStreamError: onErrorSpy,
types: [tsonPromise],
}).parse(generator());

const typedValue = value as {
foo: Promise<unknown>;
};

expect(typedValue.foo).toBeInstanceOf(Promise);
expect(await typedValue.foo).toBe("bar");

expect(onErrorSpy).toHaveBeenCalledTimes(0);
});

0 comments on commit 3c4c867

Please sign in to comment.