Skip to content

Commit

Permalink
fix: fix incorrect error messages for misbehaving subscription
Browse files Browse the repository at this point in the history
Also throw such messages outside instead of silent retry if `onError` is not specified

Closes #1
  • Loading branch information
aikoven committed Sep 5, 2022
1 parent 6db204b commit c720662
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 4 deletions.
67 changes: 67 additions & 0 deletions src/retryCollectionSubscription.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,70 @@ test('backoff', async () => {
]
`);
});

test('undefined value in initial emission', async () => {
type Updates = Array<CollectionSubscriptionUpdate<string, {test: string}>>;

const sink = new AsyncSink<Updates>();

const it = retryCollectionSubscription(() => sink)[Symbol.asyncIterator]();

sink.write([
{key: '1', value: {test: '1'}},
{key: '2', value: undefined},
]);

await expect(it.next()).rejects.toMatchInlineSnapshot(
`[Error: Misbehaving subscription source: unexpected 'undefined' value at key '2' in initial emission]`,
);
});

test('undefined value in initial emission after retry', async () => {
type Updates = Array<CollectionSubscriptionUpdate<string, {test: string}>>;

const sink1 = new AsyncSink<Updates>();
const sink2 = new AsyncSink<Updates>();

const subscribe = jest
.fn<AsyncIterable<Updates>, []>()
.mockImplementationOnce(() => sink1)
.mockImplementationOnce(() => sink2);

const it = retryCollectionSubscription(subscribe)[Symbol.asyncIterator]();

sink1.write([]);
sink1.error(new Error('test-error'));

sink2.write([{key: '1', value: undefined}]);

await expect(it.next()).resolves.toMatchInlineSnapshot(`
Object {
"done": false,
"value": Array [],
}
`);
await expect(it.next()).rejects.toMatchInlineSnapshot(
`[Error: Misbehaving subscription source: unexpected 'undefined' value at key '1' in initial emission]`,
);
});

test('undefined value in for key not present in the state', async () => {
type Updates = Array<CollectionSubscriptionUpdate<string, {test: string}>>;

const sink = new AsyncSink<Updates>();

const it = retryCollectionSubscription(() => sink)[Symbol.asyncIterator]();

sink.write([]);
sink.write([{key: '1', value: undefined}]);

await expect(it.next()).resolves.toMatchInlineSnapshot(`
Object {
"done": false,
"value": Array [],
}
`);
await expect(it.next()).rejects.toMatchInlineSnapshot(
`[Error: Misbehaving subscription source: unexpected 'undefined' value at key '1' which was not present in the state]`,
);
});
36 changes: 32 additions & 4 deletions src/retryCollectionSubscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,10 @@ export async function* retryCollectionSubscription<

for (const {key, value} of updates) {
if (value === undefined) {
throw new Error(
`Unexpected 'undefined' value at key '${key}' in initial emission`,
throw markErrorAsInternal(
new Error(
`Misbehaving subscription source: unexpected 'undefined' value at key '${key}' in initial emission`,
),
);
}

Expand All @@ -146,11 +148,21 @@ export async function* retryCollectionSubscription<
} else {
for (const {key, value} of updates) {
if (value === undefined) {
if (innerCount === 0) {
throw markErrorAsInternal(
new Error(
`Misbehaving subscription source: unexpected 'undefined' value at key '${key}' in initial emission`,
),
);
}

const deleted = state.delete(key);

if (!deleted) {
throw new Error(
`Unexpected 'undefined' value at key '${key}': previous value was already 'undefined'`,
throw markErrorAsInternal(
new Error(
`Misbehaving subscription source: unexpected 'undefined' value at key '${key}' which was not present in the state`,
),
);
}
} else {
Expand All @@ -166,6 +178,10 @@ export async function* retryCollectionSubscription<
} catch (error) {
rethrowAbortError(error);

if (onError == null && isInternalError(error)) {
throw error;
}

if (attempt === undefined) {
onError?.(error, undefined, undefined);

Expand All @@ -191,3 +207,15 @@ export async function* retryCollectionSubscription<
}
}
}

const internalErrors = new WeakSet<Error>();

function markErrorAsInternal(error: Error) {
internalErrors.add(error);

return error;
}

function isInternalError(error: unknown) {
return error instanceof Error && internalErrors.has(error);
}

0 comments on commit c720662

Please sign in to comment.