Skip to content

Commit

Permalink
fix(sqs): ensure sqs meta driven retries prevent infiloops
Browse files Browse the repository at this point in the history
  • Loading branch information
uladkasach committed Aug 19, 2024
1 parent 62dca67 commit 34fb491
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 1 deletion.
32 changes: 32 additions & 0 deletions src/logic/withAsyncTaskExecutionLifecycleExecute.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,38 @@ describe('withAsyncTaskExecutionLifecycleExecute', () => {
);
},
);

when(
'asked to execute on a task which is already in ATTEMPTED mode, with sqs metadata, with input.meta.requeueDepth of 7',
() => {
const promiseTask = daoAsyncTaskEnrichProduct.upsert({
task: new AsyncTaskEnrichProduct({
productUuid: uuid(),
status: AsyncTaskStatus.ATTEMPTED,
}),
});

then(
'it should throw an error to prevent infiloop due to requeue depth',
async () => {
const error = await getError(
execute({
task: await promiseTask,
meta: {
queueUrl: '__queue_url__',
enqueueUuid: uuid(),
queueType: 'SQS',
requeueDepth: 7,
},
}),
);
expect(error.message).toContain(
'attempted to retry a task more than limit times. blocked this attempt to prevent infiloop',
);
},
);
},
);
});

given(
Expand Down
17 changes: 16 additions & 1 deletion src/logic/withAsyncTaskExecutionLifecycleExecute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,24 @@ export const withAsyncTaskExecutionLifecycleExecute = <
},
);

// increment the requeue depth and prevent infiloops
const requeueDepthLimit = 3; // todo: pull from sqs subscription config
const requeueDepthAfter = (input.meta!.requeueDepth ?? 0) + 1;
if (requeueDepthAfter > requeueDepthLimit)
throw new UnexpectedCodePathError(
'attempted to retry a task more than limit times. blocked this attempt to prevent infiloop',
{ requeueDepthLimit, requeueDepthAfter, input },
);

// requeue the task
await sqs.sendMessage({
messageBody: JSON.stringify({ task: input.task, meta: input.meta }),
messageBody: JSON.stringify({
task: input.task,
meta: {
...input.meta,
requeueDepth: requeueDepthAfter,
},
}),
queueUrl,
delaySeconds: attemptTimeoutSeconds,
});
Expand Down

0 comments on commit 34fb491

Please sign in to comment.