Skip to content

Commit

Permalink
fix(retry): ensure to exit without attempt on retry later via requeue
Browse files Browse the repository at this point in the history
  • Loading branch information
uladkasach committed Aug 6, 2024
1 parent 2b8c832 commit 653ec01
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 2 deletions.
138 changes: 138 additions & 0 deletions src/logic/withAsyncTaskExecutionLifecycleExecute.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import { DomainEntity, getUniqueIdentifier, serialize } from 'domain-objects';
import { getError, given, then, when } from 'test-fns';
import { HasMetadata } from 'type-fns';

import { uuid } from '../deps';
import { AsyncTaskStatus } from '../domain/objects/AsyncTask';
import { withAsyncTaskExecutionLifecycleEnqueue } from './withAsyncTaskExecutionLifecycleEnqueue';
import { withAsyncTaskExecutionLifecycleExecute } from './withAsyncTaskExecutionLifecycleExecute';

describe('withAsyncTaskExecutionLifecycleExecute', () => {
given('a task class with an in memory dao and sqs driver', () => {
interface AsyncTaskEnrichProduct {
updatedAt?: Date;
productUuid: string;
status: AsyncTaskStatus;
}
class AsyncTaskEnrichProduct
extends DomainEntity<AsyncTaskEnrichProduct>
implements AsyncTaskEnrichProduct
{
public static unique = ['productUuid'] as const;
}
const database: Record<string, HasMetadata<AsyncTaskEnrichProduct>> = {};
const daoAsyncTaskEnrichProduct = {
upsert: jest.fn(
async (input: {
task: AsyncTaskEnrichProduct;
}): Promise<HasMetadata<AsyncTaskEnrichProduct>> => {
const withMetadata = new AsyncTaskEnrichProduct({
...input.task,
updatedAt: new Date(),
}) as HasMetadata<AsyncTaskEnrichProduct>;
database[serialize(getUniqueIdentifier(withMetadata))] = withMetadata;
return withMetadata;
},
),
findByUnique: jest.fn(
async (input: {
productUuid: string;
}): Promise<null | HasMetadata<AsyncTaskEnrichProduct>> =>
database[serialize({ productUuid: input.productUuid })] ?? null,
),
};
const executeInnerLogicMock = jest.fn();
const sqsSendMessageMock = jest.fn();
const execute = withAsyncTaskExecutionLifecycleExecute(
async ({ task }) => {
executeInnerLogicMock({ on: task });
return {
task: await daoAsyncTaskEnrichProduct.upsert({
task: { ...task, status: AsyncTaskStatus.FULFILLED },
}),
};
},
{
dao: daoAsyncTaskEnrichProduct,
log: console,
api: {
sqs: {
sendMessage: sqsSendMessageMock,
},
},
},
);

beforeEach(() => {
executeInnerLogicMock.mockReset();
sqsSendMessageMock.mockReset();
});

when('asked to execute on a queued task', () => {
const promiseTask = daoAsyncTaskEnrichProduct.upsert({
task: new AsyncTaskEnrichProduct({
productUuid: uuid(),
status: AsyncTaskStatus.QUEUED,
}),
});

then('it should successfully attempt to execute', async () => {
const result = await execute({ task: await promiseTask });
expect(executeInnerLogicMock).toHaveBeenCalledTimes(1);
expect(result.task.status).toEqual(AsyncTaskStatus.FULFILLED);
});
});

when(
'asked to execute on a task which is already in ATTEMPTED mode, without sqs metadata',
() => {
const promiseTask = daoAsyncTaskEnrichProduct.upsert({
task: new AsyncTaskEnrichProduct({
productUuid: uuid(),
status: AsyncTaskStatus.ATTEMPTED,
}),
});

then(
'it should throw an error, to trigger a retry later, without invoking the inner execution logic',
async () => {
const error = await getError(execute({ task: await promiseTask }));
expect(error.message).toContain(
'this error was thrown to ensure this task is retried later',
);
},
);
},
);

when(
'asked to execute on a task which is already in ATTEMPTED mode, with sqs metadata',
() => {
const promiseTask = daoAsyncTaskEnrichProduct.upsert({
task: new AsyncTaskEnrichProduct({
productUuid: uuid(),
status: AsyncTaskStatus.ATTEMPTED,
}),
});

then(
'it should sqs.sendMessage on the task, to try again later, without invoking the inner execution logic',
async () => {
const result = await execute({
task: await promiseTask,
meta: {
queueUrl: '__queue_url__',
enqueueUuid: uuid(),
queueType: 'SQS',
requeueDepth: 1,
},
});
console.log(result);
expect(sqsSendMessageMock).toHaveBeenCalledTimes(1);
expect(executeInnerLogicMock).toHaveBeenCalledTimes(0);
},
);
},
);
});
});
9 changes: 7 additions & 2 deletions src/logic/withAsyncTaskExecutionLifecycleExecute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ export const withAsyncTaskExecutionLifecycleExecute = <
queueUrl,
delaySeconds: attemptTimeoutSeconds,
});

// and return the retained state of the task
return {
task: foundTask,
} as any as Partial<O> & { task: T };
};
}

Expand All @@ -167,7 +172,7 @@ export const withAsyncTaskExecutionLifecycleExecute = <
);
const now = new Date();
if (isBefore(now, attemptTimeoutAt))
await retryLater(
return await retryLater(
'this task may still be being attempted by a different invocation, last attempt started less than the timeout',
{
attemptTimeoutSeconds,
Expand Down Expand Up @@ -209,7 +214,7 @@ export const withAsyncTaskExecutionLifecycleExecute = <
context,
);
if (mutexActiveTasks.length)
await retryLater(
return await retryLater(
`this task's mutex lock is reserved by at least one other task currently being attempted by a different invocation`,
{
mutexKeys,
Expand Down

0 comments on commit 653ec01

Please sign in to comment.