Skip to content
This repository has been archived by the owner on Jul 5, 2024. It is now read-only.

Commit

Permalink
actually works
Browse files Browse the repository at this point in the history
  • Loading branch information
KATT committed Oct 5, 2023
1 parent 93bcc43 commit eda62c9
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 49 deletions.
22 changes: 11 additions & 11 deletions src/async/deserializeAsync.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ test("stringify async iterable + promise", async () => {
yield 1n;
await new Promise((resolve) => setTimeout(resolve, 1));
yield 2n;
await new Promise((resolve) => setTimeout(resolve, 30));
yield 3n;

await new Promise((resolve) => setTimeout(resolve, 1));
await new Promise((resolve) => setTimeout(resolve, 2));
yield 4n;
yield 5n;
}

const input = {
Expand All @@ -87,23 +87,22 @@ test("stringify async iterable + promise", async () => {
result.push(value);
}

expect(result).toEqual([1n, 2n, 3n, 4n]);
expect(result).toEqual([1n, 2n, 3n, 4n, 5n]);
});

test("e2e: stringify and parse promise with a promise over a network connection", async () => {
function createMockObj() {
async function* generator() {
await new Promise((resolve) => setTimeout(resolve, 1));
yield 1n;

await new Promise((resolve) => setTimeout(resolve, 1));
yield 2n;
for (const number of [1, 2, 3, 4, 5]) {
await new Promise((resolve) => setTimeout(resolve, 1));
yield BigInt(number);
}
}

return {
foo: "bar",
iterable: generator(),
promise: Promise.resolve(42),
// promise: Promise.resolve(42),
};
}

Expand Down Expand Up @@ -156,15 +155,16 @@ test("e2e: stringify and parse promise with a promise over a network connection"
const parsedRaw = await tson.parse(stringIterator);
const parsed = parsedRaw as MockObj;

expect(await parsed.promise).toEqual(42);
expect(parsed.foo).toEqual("bar");
// expect(await parsed.promise).toEqual(42);

const results = [];

for await (const value of parsed.iterable) {
results.push(value);
}

expect(results).toEqual([1n, 2n]);
expect(results).toEqual([1n, 2n, 3n, 4n, 5n]);

server.close();
});
70 changes: 33 additions & 37 deletions src/async/deserializeAsync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,14 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {
}

return async (iterator: AsyncIterable<string>) => {
const deferreds = new Map<TsonAsyncIndex, Deferred<unknown>>();
// this is an awful hack to get around making a some sort of pipeline
const cache = new Map<
TsonAsyncIndex,
{
next: Deferred<unknown>;
values: unknown[];
}
>();
const instance = iterator[Symbol.asyncIterator]();

const walker: WalkerFactory = (nonce) => {
Expand All @@ -87,37 +94,36 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {

const idx = serializedValue as TsonAsyncIndex;

deferreds.set(idx, createSafeDeferred());
// console.log("creating deferred for", idx, "with value", walkedValue);
const self = {
next: createSafeDeferred(),
values: [],
};
cache.set(idx, self);

return transformer.deserialize({
// abortSignal
onDone() {
deferreds.delete(idx);
cache.delete(idx);
},
stream: {
[Symbol.asyncIterator]: () => {
// console.log("checking next", idx);
let index = 0;
return {
next: async () => {
const def = deferreds.get(idx);

if (def) {
// console.log("waiting for deferred", idx, def.promise);

const value = await def.promise;

deferreds.set(idx, createSafeDeferred());
const idx = index++;

if (self.values.length > idx) {
return {
done: false,
value,
value: self.values[idx],
};
}

await self.next.promise;

return {
done: true,
value: undefined,
done: false,
value: self.values[idx],
};
},
};
Expand Down Expand Up @@ -150,53 +156,43 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {
return;
}

// console.log("got something that looks like a value", str);

const [index, result] = JSON.parse(str) as TsonAsyncValueTuple;

const deferred = deferreds.get(index);
// console.log("got deferred", index, deferred);
// console.log("got value", index, status, result, deferred);
const item = cache.get(index);

const walkedResult = walk(result);

assert(deferred, `No deferred found for index ${index}`);
assert(item, `No deferred found for index ${index}`);

// resolving deferred
deferred.resolve(walkedResult);

deferreds.delete(index);
item.values.push(walkedResult);
item.next.resolve(walkedResult);
item.next = createSafeDeferred();
}

buffer.forEach(readLine);

let nextValue = await instance.next();

while (!nextValue.done) {
// console.log("got next value", nextValue);
nextValue.value.split("\n").forEach(readLine);

nextValue = await instance.next();
}

assert(
!deferreds.size,
`Stream ended with ${deferreds.size} pending promises`,
);
assert(!cache.size, `Stream ended with ${cache.size} pending promises`);
}

async function init() {
const lines: string[] = [];

// get the head of the JSON

// console.log("getting head of JSON");
let lastResult: IteratorResult<string>;
do {
lastResult = await instance.next();

lines.push(...(lastResult.value as string).split("\n").filter(Boolean));

// console.log("got line", lines);
} while (lines.length < 2);

const [
Expand Down Expand Up @@ -232,11 +228,11 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {
);

// cancel all pending promises
for (const deferred of deferreds.values()) {
deferred.reject(err);
for (const deferred of cache.values()) {
deferred.next.reject(err);
}

deferreds.clear();
cache.clear();

opts.onStreamError?.(err);
});
Expand All @@ -246,7 +242,7 @@ export function createTsonParseAsyncInner(opts: TsonAsyncOptions) {
const result = await init().catch((cause: unknown) => {
throw new TsonError("Failed to initialize TSON stream", { cause });
});
return [result, deferreds] as const;
return [result, cache] as const;
};
}

Expand Down
2 changes: 1 addition & 1 deletion src/handlers/tsonPromise.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ export const tsonPromise: TsonAsyncType<MyPromise, SerializedPromiseValue> = {
.then((value): SerializedPromiseValue => [PROMISE_RESOLVED, value])
.catch((err): SerializedPromiseValue => [PROMISE_REJECTED, err]);
return (async function* generator() {
// console.log("serializing", opts.value);
yield await value;
})();
},
Expand Down Expand Up @@ -100,6 +99,7 @@ export const tsonAsyncIterator: TsonAsyncType<
}
}
} finally {
// `onDone` is a hack and shouldn't be needed
opts.onDone();
}
})();
Expand Down
1 change: 1 addition & 0 deletions src/internals/iterableUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export async function* readableStreamToAsyncIterable<T>(
while (true) {
// Read from the stream
const result = await reader.read();

// Exit if we're done
if (result.done) {
return;
Expand Down

0 comments on commit eda62c9

Please sign in to comment.