From e03def6d2ce5d53607fb9d693a34ea81ad87851f Mon Sep 17 00:00:00 2001 From: Ulad Kasach Date: Sat, 6 Jul 2024 10:09:00 -0400 Subject: [PATCH] feat(requeue): add native sqs requeue capability to prevent error logs on retryLater --- package-lock.json | 52 +++++++++++- package.json | 2 +- src/domain/constants.ts | 15 +++- src/index.ts | 2 +- ...nt.ts => extractTaskParcelFromSqsEvent.ts} | 30 +++++-- .../withAsyncTaskExecutionLifecycleEnqueue.ts | 72 ++++++++++++++--- .../withAsyncTaskExecutionLifecycleExecute.ts | 79 +++++++++++++++---- 7 files changed, 213 insertions(+), 39 deletions(-) rename src/logic/{extractTaskFromSqsEvent.ts => extractTaskParcelFromSqsEvent.ts} (50%) diff --git a/package-lock.json b/package-lock.json index e14b24f..2322798 100644 --- a/package-lock.json +++ b/package-lock.json @@ -8,7 +8,7 @@ "name": "simple-async-tasks", "version": "1.4.4", "dependencies": { - "@ehmpathy/error-fns": "1.0.2", + "@ehmpathy/error-fns": "1.3.0", "date-fns": "2.30.0", "simple-in-memory-queue": "1.1.7", "uuid": "9.0.0" @@ -1081,9 +1081,9 @@ } }, "node_modules/@ehmpathy/error-fns": { - "version": "1.0.2", - "resolved": "https://registry.npmjs.org/@ehmpathy/error-fns/-/error-fns-1.0.2.tgz", - "integrity": "sha512-v3aJIqUvD9a3drx1pyS8La+9u9WTTvNE35NksiD4Oo3VanNe8Rmue/atRHPg4nNYQ/xPv4+RoqC+OBj6cAY8VA==", + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/@ehmpathy/error-fns/-/error-fns-1.3.0.tgz", + "integrity": "sha512-qkfbevoy8NP8u1ohj7iMlQMgMK9XFUV6HEkkVjIOikbcYWIkxteBZF+nuu6vwpzyoXi9E4ILuv9AJUl/Sy2j5Q==", "hasInstallScript": true, "dependencies": { "type-fns": "0.9.0" @@ -18662,6 +18662,28 @@ "node": ">=8.0.0" } }, + "node_modules/test-fns/node_modules/@ehmpathy/error-fns": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/@ehmpathy/error-fns/-/error-fns-1.0.2.tgz", + "integrity": "sha512-v3aJIqUvD9a3drx1pyS8La+9u9WTTvNE35NksiD4Oo3VanNe8Rmue/atRHPg4nNYQ/xPv4+RoqC+OBj6cAY8VA==", + "dev": true, + "hasInstallScript": true, + "dependencies": { + "type-fns": "0.9.0" + }, + "engines": { + "node": ">=8.0.0" + } + }, + "node_modules/test-fns/node_modules/type-fns": { + "version": "0.9.0", + "resolved": "https://registry.npmjs.org/type-fns/-/type-fns-0.9.0.tgz", + "integrity": "sha512-ndhY4JBIbKix0LuGA5smh/XhFFnbeudnih++WxVoGTfdrITsZe/s3qje9GZNdWwsO+YWGyQkNXwAjnWyM/dipw==", + "dev": true, + "engines": { + "node": ">=8.0.0" + } + }, "node_modules/text-extensions": { "version": "1.9.0", "resolved": "https://registry.npmjs.org/text-extensions/-/text-extensions-1.9.0.tgz", @@ -19295,6 +19317,28 @@ "node": ">=8.0.0" } }, + "node_modules/type-fns/node_modules/@ehmpathy/error-fns": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/@ehmpathy/error-fns/-/error-fns-1.0.2.tgz", + "integrity": "sha512-v3aJIqUvD9a3drx1pyS8La+9u9WTTvNE35NksiD4Oo3VanNe8Rmue/atRHPg4nNYQ/xPv4+RoqC+OBj6cAY8VA==", + "hasInstallScript": true, + "peer": true, + "dependencies": { + "type-fns": "0.9.0" + }, + "engines": { + "node": ">=8.0.0" + } + }, + "node_modules/type-fns/node_modules/type-fns": { + "version": "0.9.0", + "resolved": "https://registry.npmjs.org/type-fns/-/type-fns-0.9.0.tgz", + "integrity": "sha512-ndhY4JBIbKix0LuGA5smh/XhFFnbeudnih++WxVoGTfdrITsZe/s3qje9GZNdWwsO+YWGyQkNXwAjnWyM/dipw==", + "peer": true, + "engines": { + "node": ">=8.0.0" + } + }, "node_modules/typed-array-byte-offset": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/typed-array-byte-offset/-/typed-array-byte-offset-1.0.0.tgz", diff --git a/package.json b/package.json index afead88..f80c285 100644 --- a/package.json +++ b/package.json @@ -51,7 +51,7 @@ "postversion": "git push origin HEAD --tags --no-verify" }, "dependencies": { - "@ehmpathy/error-fns": "1.0.2", + "@ehmpathy/error-fns": "1.3.0", "date-fns": "2.30.0", "simple-in-memory-queue": "1.1.7", "uuid": "9.0.0" diff --git a/src/domain/constants.ts b/src/domain/constants.ts index 260f8c5..3b555ce 100644 --- a/src/domain/constants.ts +++ b/src/domain/constants.ts @@ -14,13 +14,24 @@ export type AsyncTaskDaoContext = Record | void; export interface AsyncTaskDao< T extends AsyncTask, U extends Partial, - M extends Partial, + M extends Partial & { status: AsyncTaskStatus }, C extends AsyncTaskDaoContext, > { findByMutex?: ( - input: M & { status: AsyncTaskStatus.ATTEMPTED }, // needs to be able to search by mutex keys + status + input: M & { status: AsyncTaskStatus }, // needs to be able to search by mutex keys + status context: C, ) => Promise[]>; findByUnique: (input: U, context: C) => Promise | null>; upsert: (input: { task: T }, context: C) => Promise>; } + +/** + * the shape of a simple aws sqs api + */ +export type SimpleAwsSqsApi = { + sendMessage: (input: { + queueUrl: string; + messageBody: string; + delaySeconds?: number; + }) => Promise; +}; diff --git a/src/index.ts b/src/index.ts index 8d1d275..7cb9474 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,4 +1,4 @@ -export { extractTaskFromSqsEvent } from './logic/extractTaskFromSqsEvent'; +export { extractTaskParcelFromSqsEvent } from './logic/extractTaskParcelFromSqsEvent'; export { withAsyncTaskExecutionLifecycleEnqueue, SimpleAsyncTaskSqsQueueContract, diff --git a/src/logic/extractTaskFromSqsEvent.ts b/src/logic/extractTaskParcelFromSqsEvent.ts similarity index 50% rename from src/logic/extractTaskFromSqsEvent.ts rename to src/logic/extractTaskParcelFromSqsEvent.ts index 8deadb8..b0713e9 100644 --- a/src/logic/extractTaskFromSqsEvent.ts +++ b/src/logic/extractTaskParcelFromSqsEvent.ts @@ -1,10 +1,30 @@ import { UnexpectedCodePathError } from '@ehmpathy/error-fns'; import type { SQSEvent } from 'aws-lambda'; +import { HasMetadata } from 'type-fns'; + +import { SimpleAsyncTaskEnqueueMetadata } from './withAsyncTaskExecutionLifecycleEnqueue'; + +/** + * a queue parcel contains both the task and meta about how the task was enqueued + */ +export interface AsyncTaskQueueParcel> { + /** + * the task contained within the parcel + */ + task: HasMetadata; + + /** + * metadata about how this parcel was enqueued + */ + meta?: SimpleAsyncTaskEnqueueMetadata; +} /** - * method to extract an async task from an sqs event sent to an aws-lambda + * method to extract an async task parcel from an sqs event sent to an aws-lambda */ -export const extractTaskFromSqsEvent = (event: SQSEvent) => { +export const extractTaskParcelFromSqsEvent = >( + event: SQSEvent, +) => { // extract the body if (event.Records.length > 1) throw new UnexpectedCodePathError( @@ -19,9 +39,9 @@ export const extractTaskFromSqsEvent = (event: SQSEvent) => { // }); // if (requeued) return { delayed: true }; // stop here if we requeued the message due to sqs scheduling - // parse the body into task + // parse the body const message = event.Records[0]!.body; - const body = JSON.parse(message) as { task: T }; + const body = JSON.parse(message) as AsyncTaskQueueParcel; if (!body.task) throw new UnexpectedCodePathError( 'could not find task on sqs message body', @@ -29,5 +49,5 @@ export const extractTaskFromSqsEvent = (event: SQSEvent) => { body, }, ); - return body.task; + return { task: body.task, meta: body.meta }; }; diff --git a/src/logic/withAsyncTaskExecutionLifecycleEnqueue.ts b/src/logic/withAsyncTaskExecutionLifecycleEnqueue.ts index 16eefc4..ee3ab1b 100644 --- a/src/logic/withAsyncTaskExecutionLifecycleEnqueue.ts +++ b/src/logic/withAsyncTaskExecutionLifecycleEnqueue.ts @@ -2,7 +2,12 @@ import { UnexpectedCodePathError } from '@ehmpathy/error-fns'; import type { LogMethods } from 'simple-leveled-log-methods'; import { HasMetadata, isAFunction } from 'type-fns'; -import { AsyncTaskDao, AsyncTaskDaoContext } from '../domain/constants'; +import { uuid } from '../deps'; +import { + AsyncTaskDao, + AsyncTaskDaoContext, + SimpleAwsSqsApi, +} from '../domain/constants'; import { AsyncTask, AsyncTaskStatus } from '../domain/objects/AsyncTask'; /** @@ -14,15 +19,49 @@ import { AsyncTask, AsyncTaskStatus } from '../domain/objects/AsyncTask'; */ export type SimpleAsyncTaskSqsQueueContract = { type: 'SQS'; - api: { - sendMessage: (input: { - queueUrl: string; - messageBody: string; - }) => Promise; - }; + api: SimpleAwsSqsApi; url: string | (() => Promise); }; +/** + * simple-async-task metadata embedded in the queue message + */ +export type SimpleAsyncTaskSqsEnqueueMetadata = { + /** + * the type of queue this task was enqueued to + * + * usecase + * - type narrow + */ + queueType: 'SQS'; + + /** + * the queue that the message was originally enqueued to + */ + queueUrl: string; + + /** + * a uuid assigned to this message upon enqueue + * + * usecase + * - traceability + */ + enqueueUuid: string; + + /** + * the number of times this sqs.message was requeued, post enqueue + * + * usecase + * - infiniloop prevention + */ + requeueDepth: number; +}; + +/** + * the different types of metadata that are available enqueue + */ +export type SimpleAsyncTaskEnqueueMetadata = SimpleAsyncTaskSqsEnqueueMetadata; + /** * a simple, generic, contract for async-tasks queued via any queue * @@ -56,7 +95,7 @@ export type SimpleAsyncTaskAnyQueueContract = { export const withAsyncTaskExecutionLifecycleEnqueue = < T extends AsyncTask, U extends Partial, - M extends Partial, + M extends Partial & { status: AsyncTaskStatus }, C extends AsyncTaskDaoContext, I extends U, >({ @@ -131,11 +170,22 @@ export const withAsyncTaskExecutionLifecycleEnqueue = < }); await (async () => { // support sqs queues natively - if (queue.type === 'SQS') + if (queue.type === 'SQS') { + const queueUrl = isAFunction(queue.url) ? await queue.url() : queue.url; + const meta: SimpleAsyncTaskSqsEnqueueMetadata = { + queueType: 'SQS', + queueUrl, + enqueueUuid: uuid(), + requeueDepth: 0, + }; return await queue.api.sendMessage({ - queueUrl: isAFunction(queue.url) ? await queue.url() : queue.url, - messageBody: JSON.stringify({ task: taskToQueue }), + queueUrl, + messageBody: JSON.stringify({ + task: taskToQueue, + meta, + }), }); + } // otherwise, assume it has a generic queue contract if (queue.push) return await queue.push(taskToQueue); diff --git a/src/logic/withAsyncTaskExecutionLifecycleExecute.ts b/src/logic/withAsyncTaskExecutionLifecycleExecute.ts index 4dba5bc..72f1e44 100644 --- a/src/logic/withAsyncTaskExecutionLifecycleExecute.ts +++ b/src/logic/withAsyncTaskExecutionLifecycleExecute.ts @@ -3,12 +3,17 @@ import { HelpfulError, UnexpectedCodePathError, } from '@ehmpathy/error-fns'; +import { HelpfulErrorMetadata } from '@ehmpathy/error-fns/dist/HelpfulError'; import { addSeconds, isBefore, parseISO } from 'date-fns'; import type { LogMethods } from 'simple-leveled-log-methods'; -import { HasMetadata } from 'type-fns'; -import { AsyncTaskDao, AsyncTaskDaoContext } from '../domain/constants'; +import { + AsyncTaskDao, + AsyncTaskDaoContext, + SimpleAwsSqsApi, +} from '../domain/constants'; import { AsyncTask, AsyncTaskStatus } from '../domain/objects/AsyncTask'; +import { AsyncTaskQueueParcel } from './extractTaskParcelFromSqsEvent'; export class SimpleAsyncTaskRetryLaterError extends HelpfulError { constructor(reason: string, metadata?: Record) { @@ -44,21 +49,23 @@ export class SimpleAsyncTaskRetryLaterError extends HelpfulError { export const withAsyncTaskExecutionLifecycleExecute = < T extends AsyncTask, U extends Partial, - M extends Partial, - I extends { - task: T; - }, + M extends Partial & { status: AsyncTaskStatus }, + I extends AsyncTaskQueueParcel, C extends AsyncTaskDaoContext, O extends Record, >( - logic: (input: I & { task: HasMetadata }, context: C) => O | Promise, + logic: (input: I & AsyncTaskQueueParcel, context: C) => O | Promise, { dao, log, + api, options, }: { dao: AsyncTaskDao; log: LogMethods; + api?: { + sqs?: SimpleAwsSqsApi; + }; options?: { attempt?: { /** @@ -70,11 +77,14 @@ export const withAsyncTaskExecutionLifecycleExecute = < }; }; }, -): ((input: I, context: C) => Promise<(O & { task: T }) | { task: T }>) => { +): (( + input: I, + context: C, +) => Promise<(O & { task: T }) | (Partial & { task: T })>) => { return async ( input: I, context: C, - ): Promise<(O & { task: T }) | { task: T }> => { + ): Promise<(O & { task: T }) | (Partial & { task: T })> => { // try to find the task by unique; it must be defined in db by now const foundTask = await dao.findByUnique( { @@ -87,11 +97,49 @@ export const withAsyncTaskExecutionLifecycleExecute = < `task not found by unique: '${JSON.stringify(input.task)}'`, ); + // define the timeout in seconds; this is how long each attempt could take, max + const attemptTimeoutSeconds = options?.attempt?.timeout.seconds ?? 15 * 60; // default to 15min, a conservative estimate + + // define how to retry later + const retryLater = (() => { + // if this is an sqs driven task and the meta is available, requeue and resolve success + if (input.meta?.queueType === 'SQS') { + const queueUrl = input.meta.queueUrl; + return async (message: string, metadata: HelpfulErrorMetadata) => { + // log what we're up to + log.debug(`executeTask.progress: requeueing task to retry later`, { + message, + metadata, + }); + + // get the sqs api + const sqs = + api?.sqs ?? + UnexpectedCodePathError.throw( + 'executeTask.api.sqs was not declared, yet task queue.type is sqs and found metadata', + { + meta: input.meta, + api, + }, + ); + + // requeue the task + await sqs.sendMessage({ + messageBody: JSON.stringify({ task: input.task, meta: input.meta }), + queueUrl, + delaySeconds: attemptTimeoutSeconds, + }); + }; + } + + // otherwise, just throw the error; that's the base case + return async (message: string, metadata: HelpfulErrorMetadata) => + SimpleAsyncTaskRetryLaterError.throw(message, metadata); + })(); + // check that the task is not already being attempted if (foundTask.status === AsyncTaskStatus.ATTEMPTED) { // if the task was updated less than 15 minutes ago, then it may still be being attempted, so throw an error so this message will get retried eventually - const attemptTimeoutSeconds = - options?.attempt?.timeout.seconds ?? 15 * 60; const updatedAtLast = typeof foundTask.updatedAt === 'string' ? parseISO(foundTask.updatedAt) @@ -107,7 +155,7 @@ export const withAsyncTaskExecutionLifecycleExecute = < ); const now = new Date(); if (isBefore(now, attemptTimeoutAt)) - throw new SimpleAsyncTaskRetryLaterError( + retryLater( 'this task may still be being attempted by a different invocation, last attempt started less than the timeout', { attemptTimeoutSeconds, @@ -117,19 +165,20 @@ export const withAsyncTaskExecutionLifecycleExecute = < } // check that the task has not already been fulfilled and is not canceled; if either are true, warn and exit + const emptyResult: Partial = {}; if (foundTask.status === AsyncTaskStatus.FULFILLED) { log.warn( 'executeTask.progress: attempted to execute a task that has already been fulfilled. skipping', { task: foundTask }, ); - return { task: foundTask }; + return { ...emptyResult, task: foundTask }; } if (foundTask.status === AsyncTaskStatus.CANCELED) { log.warn( 'executeTask.progress: attempted to execute a task that has already been canceled. skipping', { task: foundTask }, ); - return { task: foundTask }; + return { ...emptyResult, task: foundTask }; } // determine whether we need to check for mutually exclusive tasks @@ -148,7 +197,7 @@ export const withAsyncTaskExecutionLifecycleExecute = < context, ); if (mutexActiveTasks.length) - throw new SimpleAsyncTaskRetryLaterError( + retryLater( `this task's mutex lock is reserved by at least one other task currently being attempted by a different invocation`, { mutexKeys,