Skip to content
This repository has been archived by the owner on Jul 5, 2024. It is now read-only.

Commit

Permalink
serialization kinda works
Browse files Browse the repository at this point in the history
  • Loading branch information
KATT committed Oct 5, 2023
1 parent f6df800 commit 18833c1
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 40 deletions.
113 changes: 112 additions & 1 deletion src/async/serializeAsync.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { expect, test } from "vitest";

import { tsonPromise } from "../index.js";
import { tsonAsyncIterator, tsonPromise } from "../index.js";
import { createAsyncTsonSerialize } from "./serializeAsync.js";

test("serialize promise", async () => {
Expand Down Expand Up @@ -41,3 +41,114 @@ test("serialize promise", async () => {
]
`);
});

test("serialize 2 promises", async () => {
const serialize = createAsyncTsonSerialize({
nonce: () => "__tson",
types: [tsonPromise],
});

const promise = [Promise.resolve(42), Promise.resolve(43)];

const [head, iterator] = serialize(promise);

expect(head).toMatchInlineSnapshot(`
{
"json": [
[
"Promise",
0,
"__tson",
],
[
"Promise",
1,
"__tson",
],
],
"nonce": "__tson",
}
`);

const values = [];
for await (const value of iterator) {
values.push(value);
}

expect(values.length).toBe(2);
expect(values).toMatchInlineSnapshot(`
[
[
0,
[
0,
42,
],
],
[
1,
[
0,
43,
],
],
]
`);
});

test("serialize async iterable", async () => {
const serialize = createAsyncTsonSerialize({
nonce: () => "__tson",
types: [tsonAsyncIterator],
});

async function* iterable() {
await new Promise((resolve) => setTimeout(resolve, 1));
yield 42;
await new Promise((resolve) => setTimeout(resolve, 1));
yield 43;
}

const [head, iterator] = serialize(iterable());

expect(head).toMatchInlineSnapshot(`
{
"json": [
"AsyncIterator",
0,
"__tson",
],
"nonce": "__tson",
}
`);

const values = [];
for await (const value of iterator) {
values.push(value);
}

expect(values).toMatchInlineSnapshot(`
[
[
0,
[
0,
42,
],
],
[
0,
[
0,
43,
],
],
[
0,
[
2,
],
],
]
`);
});
69 changes: 30 additions & 39 deletions src/async/serializeAsync.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
import {
TsonCircularReferenceError,
TsonPromiseRejectionError,
} from "../errors.js";
import { TsonCircularReferenceError } from "../errors.js";
import { assert } from "../internals/assert.js";
import { getNonce } from "../internals/getNonce.js";
import { mapOrReturn } from "../internals/mapOrReturn.js";
Expand All @@ -15,25 +12,23 @@ import {
TsonTypeTesterCustom,
TsonTypeTesterPrimitive,
} from "../types.js";
import { TsonAsyncOptions } from "./asyncTypes.js";
import { TsonAsyncIndex } from "./asyncTypes.js";
import { TsonAsyncStringifier } from "./asyncTypes.js";
import {
TsonAsyncIndex,
TsonAsyncOptions,
TsonAsyncStringifier,
} from "./asyncTypes.js";

type WalkFn = (value: unknown) => unknown;

export const PROMISE_RESOLVED = 0 as const;
const PROMISE_REJECTED = 1 as const;

type TsonAsyncValueTuple = [TsonAsyncIndex, unknown];

function walkerFactory(nonce: TsonNonce, types: TsonAsyncOptions["types"]) {
// instance variables
let asyncIndex = 0;
const promises = new Map<TsonAsyncIndex, Promise<TsonAsyncValueTuple>>();
const seen = new WeakSet();
const cache = new WeakMap<object, unknown>();

const iterators = new Map<TsonAsyncIndex, AsyncIterable<unknown>>();
const iterators = new Map<TsonAsyncIndex, AsyncIterator<unknown>>();

const iterator = {
async *[Symbol.asyncIterator]() {
Expand All @@ -42,53 +37,49 @@ function walkerFactory(nonce: TsonNonce, types: TsonAsyncOptions["types"]) {

// when all iterators are done, we're done

const cursors = new Map<TsonAsyncIndex, AsyncIterator<unknown>>();
const nextAsyncIteratorValue = new Map<
TsonAsyncIndex,
Promise<[TsonAsyncIndex, IteratorResult<unknown>]>
>();

let _tmp = 0;
while (iterators.size > 0) {

do {
if (_tmp++ > 10) {
throw new Error("too many iterations");
}

// set next cursor
for (const [idx, iterator] of iterators) {
// initialize cursor
if (cursors.has(idx)) {
continue;
if (!nextAsyncIteratorValue.has(idx)) {
nextAsyncIteratorValue.set(
idx,
iterator.next().then((result) => [idx, result]),
);
}

const cursor = iterator[Symbol.asyncIterator]();
// ^?
cursors.set(idx, cursor);
}

// it's a race!
for (const [idx, cursor] of cursors) {
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (!cursor.next) {
cursors.delete(idx);
iterators.delete(idx);
}
}

const nextValues = Array.from(cursors.entries()).map(
async ([idx, cursor]) => {
const result = await cursor.next();
return [idx, result] as const;
},
);
const nextValues = Array.from(nextAsyncIteratorValue.values());

const [idx, result] = await Promise.race(nextValues);

if (result.done) {
cursors.delete(idx);
nextAsyncIteratorValue.delete(idx);
iterators.delete(idx);
continue;
} else {
const iterator = iterators.get(idx);

assert(iterator, `iterator ${idx} not found`);
nextAsyncIteratorValue.set(
idx,
iterator.next().then((result) => [idx, result]),
);
}

const valueTuple: TsonAsyncValueTuple = [idx, result.value];
yield valueTuple;
}
} while (iterators.size > 0);
},
};

Expand All @@ -108,7 +99,7 @@ function walkerFactory(nonce: TsonNonce, types: TsonAsyncOptions["types"]) {
// abortSignal: new AbortSignal(),
value,
});
iterators.set(idx, iterator);
iterators.set(idx, iterator[Symbol.asyncIterator]());

return [handler.key as TsonTypeHandlerKey, idx, nonce];
}
Expand Down

0 comments on commit 18833c1

Please sign in to comment.