diff --git a/src/logic/withAsyncTaskExecutionLifecycleExecute.test.ts b/src/logic/withAsyncTaskExecutionLifecycleExecute.test.ts index 7975b35..f1ebc26 100644 --- a/src/logic/withAsyncTaskExecutionLifecycleExecute.test.ts +++ b/src/logic/withAsyncTaskExecutionLifecycleExecute.test.ts @@ -135,6 +135,38 @@ describe('withAsyncTaskExecutionLifecycleExecute', () => { ); }, ); + + when( + 'asked to execute on a task which is already in ATTEMPTED mode, with sqs metadata, with input.meta.requeueDepth of 7', + () => { + const promiseTask = daoAsyncTaskEnrichProduct.upsert({ + task: new AsyncTaskEnrichProduct({ + productUuid: uuid(), + status: AsyncTaskStatus.ATTEMPTED, + }), + }); + + then( + 'it should throw an error to prevent infiloop due to requeue depth', + async () => { + const error = await getError( + execute({ + task: await promiseTask, + meta: { + queueUrl: '__queue_url__', + enqueueUuid: uuid(), + queueType: 'SQS', + requeueDepth: 7, + }, + }), + ); + expect(error.message).toContain( + 'attempted to retry a task more than limit times. blocked this attempt to prevent infiloop', + ); + }, + ); + }, + ); }); given( diff --git a/src/logic/withAsyncTaskExecutionLifecycleExecute.ts b/src/logic/withAsyncTaskExecutionLifecycleExecute.ts index 0960d04..19dc34b 100644 --- a/src/logic/withAsyncTaskExecutionLifecycleExecute.ts +++ b/src/logic/withAsyncTaskExecutionLifecycleExecute.ts @@ -135,9 +135,24 @@ export const withAsyncTaskExecutionLifecycleExecute = < }, ); + // increment the requeue depth and prevent infiloops + const requeueDepthLimit = 3; // todo: pull from sqs subscription config + const requeueDepthAfter = (input.meta!.requeueDepth ?? 0) + 1; + if (requeueDepthAfter > requeueDepthLimit) + throw new UnexpectedCodePathError( + 'attempted to retry a task more than limit times. blocked this attempt to prevent infiloop', + { requeueDepthLimit, requeueDepthAfter, input }, + ); + // requeue the task await sqs.sendMessage({ - messageBody: JSON.stringify({ task: input.task, meta: input.meta }), + messageBody: JSON.stringify({ + task: input.task, + meta: { + ...input.meta, + requeueDepth: requeueDepthAfter, + }, + }), queueUrl, delaySeconds: attemptTimeoutSeconds, });