Skip to content

Commit

Permalink
fix(enqueue): re-enable enqueue to any queue, beyond sqs
Browse files Browse the repository at this point in the history
  • Loading branch information
uladkasach committed Apr 20, 2024
1 parent 010cb00 commit 495141c
Show file tree
Hide file tree
Showing 5 changed files with 223 additions and 17 deletions.
77 changes: 76 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
"dependencies": {
"@ehmpathy/error-fns": "1.0.2",
"date-fns": "2.30.0",
"simple-in-memory-queue": "1.1.7",
"uuid": "9.0.0"
},
"peerDependencies": {
Expand Down Expand Up @@ -83,6 +84,7 @@
"jest": "29.3.1",
"prettier": "2.8.1",
"simple-leveled-log-methods": "0.3.0",
"test-fns": "1.3.0",
"ts-jest": "29.0.3",
"ts-node": "10.9.1",
"typescript": "4.9.4",
Expand Down
6 changes: 5 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
export { extractTaskFromSqsEvent } from './logic/extractTaskFromSqsEvent';
export { withAsyncTaskExecutionLifecycleQueue } from './logic/withAsyncTaskExecutionLifecycleQueue';
export {
withAsyncTaskExecutionLifecycleEnqueue,
SimpleAsyncTaskSqsQueueContract,
SimpleAsyncTaskAnyQueueContract,
} from './logic/withAsyncTaskExecutionLifecycleEnqueue';
export { withAsyncTaskExecutionLifecycleExecute } from './logic/withAsyncTaskExecutionLifecycleExecute';

export { AsyncTask, AsyncTaskStatus } from './domain/objects/AsyncTask';
Expand Down
90 changes: 90 additions & 0 deletions src/logic/withAsyncTaskExecutionLifecycleEnqueue.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import { QueueOrder, createQueue } from 'simple-in-memory-queue';
import { given, when, then } from 'test-fns';
import { HasMetadata } from 'type-fns';

import { AsyncTask, AsyncTaskStatus } from '../domain/objects/AsyncTask';
import {
SimpleAsyncTaskSqsQueueContract,
withAsyncTaskExecutionLifecycleEnqueue,
} from './withAsyncTaskExecutionLifecycleEnqueue';

/**
* an async task for emitting some data to remote persistance
*
* e.g., for @mhetrics/app-usage-events-react
*/
interface AsyncTaskEmitToRemote extends AsyncTask {
uuid?: string;
updatedAt?: string;
status: AsyncTaskStatus;
endpoint: string;
bytes: number;
payload: string;
}

const exampleDao = {
upsert: async ({ task }: { task: AsyncTaskEmitToRemote }) => {
console.log('mock.dao.upsert', { task });
return task as HasMetadata<AsyncTaskEmitToRemote>;
},
findByUnique: async () => null,
};

const exampleGetNew = (): AsyncTaskEmitToRemote => ({
status: AsyncTaskStatus.QUEUED,
payload: 'hello',
bytes: 7,
endpoint: 'yes',
});

describe('withAsyncTaskExecutionLifecycleQueue', () => {
given('a simple queue', () => {
const queue = createQueue({
order: QueueOrder.FIRST_IN_FIRST_OUT,
});

when('using it with async task lifecycle', () => {
const enqueue = withAsyncTaskExecutionLifecycleEnqueue({
getNew: exampleGetNew,
dao: exampleDao,
log: console,
queue,
});

then('it should successfully allow enqueue', async () => {
const enqueued = await enqueue({});
expect(enqueued).toHaveProperty('status');
expect(enqueued.status).toEqual(AsyncTaskStatus.QUEUED);
});
});
});

given('an sqs queue', () => {
const queue: SimpleAsyncTaskSqsQueueContract = {
type: 'SQS',
api: {
sendMessage: async (input) =>
console.log('mock: sqs.api.sendMessage', { input }),
},
url: '__url__',
};

when('using it with async task lifecycle', () => {
/**
* use the lifecycle
*/
const enqueue = withAsyncTaskExecutionLifecycleEnqueue({
getNew: exampleGetNew,
dao: exampleDao,
log: console,
queue,
});

then('it should successfully allow enqueue', async () => {
const enqueued = await enqueue({});
expect(enqueued).toHaveProperty('status');
expect(enqueued.status).toEqual(AsyncTaskStatus.QUEUED);
});
});
});
});
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { UnexpectedCodePathError } from '@ehmpathy/error-fns';
import type { LogMethods } from 'simple-leveled-log-methods';
import { HasMetadata, isAFunction } from 'type-fns';

Expand All @@ -7,6 +8,35 @@ import {
} from '../domain/constants';
import { AsyncTask, AsyncTaskStatus } from '../domain/objects/AsyncTask';

/**
* a simple, pit-of-success, contract for async-tasks queued via sqs
*
* benefits
* - guarantees a standard message body
* - guarantees a simple definition of the minimum inputs required
*/
export type SimpleAsyncTaskSqsQueueContract = {
type: 'SQS';
api: {
sendMessage: (input: {
queueUrl: string;
messageBody: string;
}) => Promise<void>;
};
url: string | (() => Promise<string>);
};

/**
* a simple, generic, contract for async-tasks queued via any queue
*
* benefits
* - enables interoperability with custom queue mechanisms not already supported
*/
export type SimpleAsyncTaskAnyQueueContract<T> = {
type?: 'ANY';
push: ((task: T) => void) | ((task: T) => Promise<void>);
};

/**
* enables creating an `queue` method that conforms to the pit-of-success execution-lifecycle of a task
*
Expand All @@ -25,10 +55,8 @@ import { AsyncTask, AsyncTaskStatus } from '../domain/objects/AsyncTask';
* 2. queues the task
* 1. writes to the sqs queue
* 2. updates the status of the task to `status.QUEUED`
*
* note: this wrapper uses BadRequestError specifically to make sure that the lambda invocation is not recorded as an error and that the request is not retried automatically
*/
export const withAsyncTaskExecutionLifecycleQueue = <
export const withAsyncTaskExecutionLifecycleEnqueue = <
T extends AsyncTask,
U extends Partial<T>,
D extends AsyncTaskDaoDatabaseConnection | undefined,
Expand All @@ -37,19 +65,12 @@ export const withAsyncTaskExecutionLifecycleQueue = <
getNew,
dao,
log,
sqs,
queue,
}: {
getNew: (args: P) => T | Promise<T>;
dao: AsyncTaskDao<T, U, D>;
log: LogMethods;
sqs: {
sendMessage: (input: {
queueUrl: string;
messageBody: string;
}) => Promise<void>;
};
queue: { url: string | (() => Promise<string>) };
queue: SimpleAsyncTaskSqsQueueContract | SimpleAsyncTaskAnyQueueContract<T>;
}) => {
return async (args: P): Promise<HasMetadata<T>> => {
// try to find the task by unique
Expand Down Expand Up @@ -107,10 +128,24 @@ export const withAsyncTaskExecutionLifecycleQueue = <
log.debug('adding task to queue', {
task: taskToQueue,
});
await sqs.sendMessage({
queueUrl: isAFunction(queue.url) ? await queue.url() : queue.url,
messageBody: JSON.stringify({ task: taskToQueue }),
});

await (async () => {
// support sqs queues natively
if (queue.type === 'SQS')
return await queue.api.sendMessage({
queueUrl: isAFunction(queue.url) ? await queue.url() : queue.url,
messageBody: JSON.stringify({ task: taskToQueue }),
});

// otherwise, assume it has a generic queue contract
if (queue.push) return await queue.push(taskToQueue);

// otherwise, this is not a supported queue mechanism
throw new UnexpectedCodePathError(
'unsupported queue mechanism specified',
{ queue },
);
})();

// and save that it has been queued
return await dao.upsert({
Expand Down

0 comments on commit 495141c

Please sign in to comment.