From 34fb4913e5eae4053e269bf803d8ee749eb4281a Mon Sep 17 00:00:00 2001 From: Ulad Kasach Date: Mon, 19 Aug 2024 03:17:02 -0400 Subject: [PATCH] fix(sqs): ensure sqs meta driven retries prevent infiloops --- ...AsyncTaskExecutionLifecycleExecute.test.ts | 32 +++++++++++++++++++ .../withAsyncTaskExecutionLifecycleExecute.ts | 17 +++++++++- 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/src/logic/withAsyncTaskExecutionLifecycleExecute.test.ts b/src/logic/withAsyncTaskExecutionLifecycleExecute.test.ts index 7975b35..f1ebc26 100644 --- a/src/logic/withAsyncTaskExecutionLifecycleExecute.test.ts +++ b/src/logic/withAsyncTaskExecutionLifecycleExecute.test.ts @@ -135,6 +135,38 @@ describe('withAsyncTaskExecutionLifecycleExecute', () => { ); }, ); + + when( + 'asked to execute on a task which is already in ATTEMPTED mode, with sqs metadata, with input.meta.requeueDepth of 7', + () => { + const promiseTask = daoAsyncTaskEnrichProduct.upsert({ + task: new AsyncTaskEnrichProduct({ + productUuid: uuid(), + status: AsyncTaskStatus.ATTEMPTED, + }), + }); + + then( + 'it should throw an error to prevent infiloop due to requeue depth', + async () => { + const error = await getError( + execute({ + task: await promiseTask, + meta: { + queueUrl: '__queue_url__', + enqueueUuid: uuid(), + queueType: 'SQS', + requeueDepth: 7, + }, + }), + ); + expect(error.message).toContain( + 'attempted to retry a task more than limit times. blocked this attempt to prevent infiloop', + ); + }, + ); + }, + ); }); given( diff --git a/src/logic/withAsyncTaskExecutionLifecycleExecute.ts b/src/logic/withAsyncTaskExecutionLifecycleExecute.ts index 0960d04..19dc34b 100644 --- a/src/logic/withAsyncTaskExecutionLifecycleExecute.ts +++ b/src/logic/withAsyncTaskExecutionLifecycleExecute.ts @@ -135,9 +135,24 @@ export const withAsyncTaskExecutionLifecycleExecute = < }, ); + // increment the requeue depth and prevent infiloops + const requeueDepthLimit = 3; // todo: pull from sqs subscription config + const requeueDepthAfter = (input.meta!.requeueDepth ?? 0) + 1; + if (requeueDepthAfter > requeueDepthLimit) + throw new UnexpectedCodePathError( + 'attempted to retry a task more than limit times. blocked this attempt to prevent infiloop', + { requeueDepthLimit, requeueDepthAfter, input }, + ); + // requeue the task await sqs.sendMessage({ - messageBody: JSON.stringify({ task: input.task, meta: input.meta }), + messageBody: JSON.stringify({ + task: input.task, + meta: { + ...input.meta, + requeueDepth: requeueDepthAfter, + }, + }), queueUrl, delaySeconds: attemptTimeoutSeconds, });