diff --git a/docs/astro.config.mjs b/docs/astro.config.mjs index 43db0ad..5cc7939 100644 --- a/docs/astro.config.mjs +++ b/docs/astro.config.mjs @@ -8,6 +8,19 @@ const github = 'https://github.com/missive-js/missive.js'; const githubURL = new URL(github); const githubPathParts = githubURL.pathname.split('/'); const title = 'Missive.js'; + +const stamps = [ + 'IdentityStamp', + 'HandledStamp', + 'ReprocessedStamp', + 'AsyncStamp', + 'FromCacheStamp', + 'FeatureFlagFallbackStamp', + 'TimingsStamp', + 'RetriedStamp', + 'WebhookCalledStamp', +]; + export default defineConfig({ site: `https://${githubPathParts[1]}.github.io/${githubPathParts[2]}`, base: `${githubPathParts[2]}`, @@ -59,6 +72,14 @@ export default defineConfig({ label: 'Built-in Middlewares', autogenerate: { directory: 'built-in-middlewares' }, }, + { + label: 'Built-in Stamps', + collapsed: true, + items: stamps.map((stamp) => ({ + label: stamp, + link: `built-in-stamps#${stamp.toLowerCase()}`, + })), + }, { label: 'Contributing', slug: 'contributing', diff --git a/docs/src/content/docs/built-in-middlewares/async.mdx b/docs/src/content/docs/built-in-middlewares/async.mdx new file mode 100644 index 0000000..f25abbe --- /dev/null +++ b/docs/src/content/docs/built-in-middlewares/async.mdx @@ -0,0 +1,107 @@ +--- +title: Async Middleware +description: Built-in middleware to defer handling to a consumer. +--- + +import { Icon, Aside, Steps } from '@astrojs/starlight/components'; + +The Async Middleware is built-in middleware that gives you capability to defer the handling to an consumer to achieve real asynchronousity. +It's only available for _CommandBus_ and _EventBus_ + +## How to use it + +As for any Middleware, you can use it by adding it to the `bus` instance. + +```typescript +const commandBus = createCommandBus(); +commandBus.useAsyncMiddleware({ + consume: false, // KEY POINT + produce: async (envelope) => { + // use your favorite queue system here + console.log('Generic Push to Queue', envelope); + }, + async: true,// default is true + intents: { + createUser: { + async: true, + produce: async (envelope) => { + // use your favorite queue system here + console.log('createUser Push to Queue', envelope); + }, + }, + }, +}); +``` + +> Remember built-in middlewares are _intent_ aware, therefore you can customize the behavior per intent using the key `intents`. + +Next, you need to have a consumer that will consume it. The way to do that with Missive.js is to create another bus with this middlware with `consume: true`. + +```typescript +commandBus.useAsyncMiddleware({ + consume: true, // KEY POINT +}); +``` + +The worker script that consumes the queue can dispatch the message it receives directly to the dispatch method: + +```typescript +// Consumer script +onMessage: async (message) => { + const envelope = JSON.parse(message); + await commandBus.dispatch(intent); +} +``` + + +### Explanation + +The flow is the following: + + + +1. Your application (web node for instance) will have a bus on which this middleware is added with `consume: false`. + +2. When you dispatch an intent, the middleware will push the intent to the queue system (via the `produce` method that you provide) instead of handling it. + +3. You have another application (worker node for instance) that will have a bus on which this middleware is added with `consume: true`. + +4. This worker will consume the intent from the queue system and handle it. + + + + + + + + +## Added Stamps + +The Async Middleware is going to add: + + - + ```typescript + type AsyncStamp = Stamp; + ``` + > When the intent is pushed to the queue. + + - + ```typescript + type ReprocessedStamp = Stamp<{ stamps: Stamp[] }, 'missive:reprocessed'>; + ``` + > When the envelope is dispatched. + +## Going further + +
+ Look at the code of the + Async Middleware +
diff --git a/docs/src/content/docs/built-in-stamps.mdx b/docs/src/content/docs/built-in-stamps.mdx new file mode 100644 index 0000000..e80ceee --- /dev/null +++ b/docs/src/content/docs/built-in-stamps.mdx @@ -0,0 +1,106 @@ +--- +title: Built-in Stamps +description: All the built-in Stamps that Missive.js provides. +--- + +import { Aside } from '@astrojs/starlight/components'; + + +This page lists all the built-in Stamps that Missive.js provides. +Stamps are a way to handle cross-cutting concerns in your application. They are key to keeping your code clean and maintainable. +Most of all, they are easy to write and use, and they can be generic! + +## Added by the Bus + +### IdentityStamp + +```typescript +type AsyncStamp = Stamp; +``` +Added on `bus.dispatch(intent|envelope)`. + + + +### HandledStamp + +```typescript +type HandledStamp = Stamp; +``` +Added when the intent is handled by the handler. + + + + + + +### ReprocessedStamp + +```typescript +type ReprocessedStamp = Stamp<{ stamps: Stamp[] }, 'missive:reprocessed'>; +``` + +Added when the `envelope` is dispatched through the bus instead of an `intent`. When this happens, +the bus will save the original `stamps` in the `ReprocessedStamp` stamp. + + +## Added by the Middlewares + +### AsyncStamp + +```typescript +type AsyncStamp = Stamp; +``` + +Added when the envelope is sent to a queue via the [Async middleware](/missive.js/built-in-middlewares/async). + +### FromCacheStamp + +```typescript +type FromCacheStamp = Stamp; +``` + +Added when the [Cacher middleware](/missive.js/built-in-middlewares/cacher) finds the result in the cache. + +### FeatureFlagFallbackStamp + +```typescript +type FeatureFlagFallbackStamp = Stamp; +``` + +When the [Feature Flag Middleware](/missive.js/built-in-middlewares/feature-flag) uses a fallbackHandler. + +### TimingsStamp + + ```typescript +type TimingsStamp = Stamp<{ total: number }, 'missive:timings'> +``` + +Add by the [Logger middleware](/missive.js/built-in-middlewares/logger) when the message is handled or errored with the total time elapsed in nanoseconds. + +### RetriedStamp + +```typescript +type RetriedStamp = Stamp<{ attempt: number; errorMessage: string }, 'missive:retried'>; +``` + +Added by the [Retryer middleware](/missive.js/built-in-middlewares/retryer) when the middleware retries the handling of the intent. + + + +### WebhookCalledStamp + +```typescript +type WebhookCalledStamp = Stamp<{ attempt: number; text?: string, status?: number }, 'missive:webhook-called'>; +``` + +Add by the [Webhook middleware](/missive.js/built-in-middlewares/webhook) when the middleware succeed to call the webhook(s) or ultimately at the end of the retries. + + diff --git a/docs/src/content/docs/guides/middlewares.mdx b/docs/src/content/docs/guides/middlewares.mdx index 37f2457..c9f8213 100644 --- a/docs/src/content/docs/guides/middlewares.mdx +++ b/docs/src/content/docs/guides/middlewares.mdx @@ -127,4 +127,5 @@ For this reason the bus will never handle an intent that has been handled alread Within the built-in middlewares, here is the list of the ones where you have the option to break the chain. _(default: yes)_: - [**CacherMiddleware**](/missive.js/built-in-middlewares/cacher): if the intent has been cached. -- [**FeatureFlagiddleware**](/missive.js/built-in-middlewares/feature-flag): if the intent has been handled by a _fallbackHandler_. +- [**FeatureFlagMiddleware**](/missive.js/built-in-middlewares/feature-flag): if the intent has been handled by a _fallbackHandler_. +- [**AsyncMiddleware**](/missive.js/built-in-middlewares/async): when the envelope is sent to a queue. diff --git a/docs/src/content/docs/index.mdx b/docs/src/content/docs/index.mdx index cf7524b..9f4c9dd 100644 --- a/docs/src/content/docs/index.mdx +++ b/docs/src/content/docs/index.mdx @@ -50,6 +50,7 @@ We love clean architecture and CQRS, we didn't find any simple Service Bus that [Lock](/missive.js/built-in-middlewares/lock), [FeatureFlag](/missive.js/built-in-middlewares/feature-flag), [Mock](/missive.js/built-in-middlewares/mocker), + [Async](/missive.js/built-in-middlewares/async), or [Webhook](/missive.js/built-in-middlewares/webhook). @@ -147,9 +148,8 @@ That's the power of Missive.js! Here are the built-in middlewares: -

You can already do async if you don't _await_ in your handler, but this is next level. Push the intent to a queue and handle it asynchronously.

-

Missive.js will provide the consumer which is going to be smart enough to handle the async processing.

- +

You can already do async if you don't _await_ in your handler, but this is next level to defer handling to a consumer. Push the intent to a queue and handle it asynchronously.

+ Read the doc
diff --git a/docs/src/content/docs/why.mdx b/docs/src/content/docs/why.mdx index 1f57a94..612f430 100644 --- a/docs/src/content/docs/why.mdx +++ b/docs/src/content/docs/why.mdx @@ -55,6 +55,7 @@ And on top of what you can do on your own, Missive.js provides a set of built-in - [**Logger**](/missive.js/built-in-middlewares/logger): Log the messages and the full Envelope (with the Stamps) before and once handled (or errored) in the backend of your choice. - [**FeatureFlag**](/missive.js/built-in-middlewares/feature-flag): Control the activation of specific features and manage dynamic feature management, safer rollouts, and efficient A/B testing. - [**Mocker**](/missive.js/built-in-middlewares/mocker): Mock the result of a specific intent to bypass the handler. +- [**Async**](/missive.js/built-in-middlewares/async): Defer handling to a consumer via a queue. This middleware architecture is familiar if you've used libraries like Express, Fastify, Koa, etc. making it intuitive for both backend and frontend developers. diff --git a/examples/shared/src/core/buses.server.ts b/examples/shared/src/core/buses.server.ts index 9126cd5..c1683e5 100644 --- a/examples/shared/src/core/buses.server.ts +++ b/examples/shared/src/core/buses.server.ts @@ -83,20 +83,20 @@ const commandBus: CommandBus = createCommandBus({ { messageName: 'removeUser', schema: removeUserCommandSchema, handler: createRemoveUserHandler({}) }, ], }); -commandBus.useMockerMiddleware({ - intents: { - createUser: async (envelope) => ({ - success: true, - userId: '1234', - }), - removeUser: async (envelope) => { - return { - success: true, - removeCount: 42, - }; - }, - }, -}); +// commandBus.useMockerMiddleware({ +// intents: { +// createUser: async (envelope) => ({ +// success: true, +// userId: '1234', +// }), +// removeUser: async (envelope) => { +// return { +// success: true, +// removeCount: 42, +// }; +// }, +// }, +// }); commandBus.useLockMiddleware({ adapter: { acquire: async () => true, @@ -114,6 +114,22 @@ commandBus.useLockMiddleware({ }, }); +commandBus.useAsyncMiddleware({ + consume: false, + produce: async (envelope) => { + console.log('Generic Push to Queue', envelope); + }, + async: false, + intents: { + createUser: { + async: true, + produce: async (envelope) => { + console.log('createUser Push to Queue', envelope); + }, + }, + }, +}); + commandBus.useWebhookMiddleware({ async: true, parallel: true, diff --git a/examples/shared/src/domain/use-cases/get-orders.ts b/examples/shared/src/domain/use-cases/get-orders.ts index a16dd10..6a8ed9c 100644 --- a/examples/shared/src/domain/use-cases/get-orders.ts +++ b/examples/shared/src/domain/use-cases/get-orders.ts @@ -1,4 +1,3 @@ -import { stat } from 'fs'; import { Envelope, QueryHandlerDefinition } from 'missive.js'; import { CacheableStamp } from 'missive.js'; import { z } from 'zod'; diff --git a/libs/missive.js/package.json b/libs/missive.js/package.json index 54dd2ce..1edaa8d 100644 --- a/libs/missive.js/package.json +++ b/libs/missive.js/package.json @@ -18,7 +18,7 @@ "Sébastien Morel ", "Anaël Chardan" ], - "version": "0.2.0", + "version": "0.3.0", "type": "module", "main": "./build/index.cjs", "module": "./build/index.js", diff --git a/libs/missive.js/src/core/bus.ts b/libs/missive.js/src/core/bus.ts index c0e964d..766bfbb 100644 --- a/libs/missive.js/src/core/bus.ts +++ b/libs/missive.js/src/core/bus.ts @@ -1,5 +1,5 @@ import type { Schema as ZodSchema } from 'zod'; -import { createEnvelope, HandledStamp, IdentityStamp, type Envelope } from './envelope.js'; +import { createEnvelope, HandledStamp, IdentityStamp, ReprocessedStamp, type Envelope } from './envelope.js'; import type { Prettify, ReplaceKeys } from '../utils/types.js'; import type { Middleware } from './middleware.js'; import { nanoid } from 'nanoid'; @@ -10,6 +10,7 @@ import { createWebhookMiddleware } from '../middlewares/webhook-middleware.js'; import { createLockMiddleware } from '../middlewares/lock-middleware.js'; import { createFeatureFlagMiddleware } from '../middlewares/feature-flag-middleware.js'; import { createMockerMiddleware } from '../middlewares/mocker-middleware.js'; +import { createAsyncMiddleware } from '../middlewares/async-middleware.js'; export type BusKinds = 'query' | 'command' | 'event'; export type MessageRegistryType = Record>; @@ -63,12 +64,20 @@ type MissiveBus, handler: MessageHandler, ) => void; - dispatch: ( - intent: TypedMessage, - ) => Promise<{ - envelope: Envelope; - result: HandlerDefinitions[MessageName]['result'] | undefined; - }>; + dispatch: { + ( + intent: TypedMessage, + ): Promise<{ + envelope: Envelope; + result: HandlerDefinitions[MessageName]['result'] | undefined; + }>; + ( + envelope: Envelope>, + ): Promise<{ + envelope: Envelope; + result: HandlerDefinitions[MessageName]['result'] | undefined; + }>; + }; createIntent: ( type: MessageName, intent: HandlerDefinitions[MessageName][BusKind], @@ -87,6 +96,7 @@ type MissiveCommandBus = ...props: Parameters> ) => void; useMockerMiddleware: (...props: Parameters>) => void; + useAsyncMiddleware: (...props: Parameters>) => void; }; export type CommandBus = Prettify< @@ -123,6 +133,7 @@ type MissiveEventBus = Repl ...props: Parameters> ) => void; useMockerMiddleware: (...props: Parameters>) => void; + useAsyncMiddleware: (...props: Parameters>) => void; }; export type EventBus = Prettify< MissiveEventBus @@ -182,11 +193,8 @@ const createBus = ( handlers: MessageHandler[], ) => { - return async (message: HandlerDefinitions[MessageName][BusKind]) => { - const envelope: Envelope = createEnvelope(message); - envelope.addStamp('missive:identity', { id: nanoid() }); + return async (envelope: Envelope) => { let index = 0; - const next = async () => { if (index < middlewares.length) { const middleware = middlewares[index++]; @@ -213,26 +221,53 @@ const createBus = ( + payload: + | ExtractedMessage + | Envelope>, + ): payload is Envelope> { + return payload && 'stamps' in payload && 'message' in payload; + } + const dispatch = async ( - message: ExtractedMessage, + payload: + | ExtractedMessage + | Envelope>, ): Promise<{ envelope: Envelope; result: HandlerDefinitions[MessageName]['result'] | undefined; results: (HandlerDefinitions[MessageName]['result'] | undefined)[]; }> => { - const entry = registry[message.__type]; + const isEnveloped = isEnvelope(payload); + const type = isEnveloped ? payload.message.__type : payload.__type; + const entry = registry[type]; if (!entry) { - throw new Error(`No handler found for type: ${message.__type}`); + throw new Error(`No handler found for type: ${type}`); } const { handlers } = entry; const chain = createMiddlewareChain(handlers); - const envelope = await chain(message); + // if we dispatch an envelope we do not need to create a new one and backup the original stamps + // while keeping the identity stamp + const envelope = (() => { + if (!isEnveloped) { + const envelope = createEnvelope(payload); + envelope.addStamp('missive:identity', { id: nanoid() }); + return envelope; + } + const identity = payload.firstStamp('missive:identity'); + const stamps = payload.stamps.filter((stamp) => stamp.type !== 'missive:identity'); + const envelope = createEnvelope(payload.message); + envelope.addStamp('missive:identity', { id: identity?.body?.id || nanoid() }); + envelope.addStamp('missive:reprocessed', { + stamps, + }); + return envelope; + })(); + await chain(envelope); return { envelope, result: envelope.lastStamp>('missive:handled') @@ -295,6 +330,9 @@ export function createCommandBus>) => { commandBus.use(createMockerMiddleware(...props)); }, + useAsyncMiddleware: (...props: Parameters>) => { + commandBus.use(createAsyncMiddleware(...props)); + }, register: commandBus.register, dispatch: commandBus.dispatch, createCommand: commandBus.createIntent, @@ -365,6 +403,9 @@ export function createEventBus>) => { eventBus.use(createMockerMiddleware(...props)); }, + useAsyncMiddleware: (...props: Parameters>) => { + eventBus.use(createAsyncMiddleware(...props)); + }, register: eventBus.register, dispatch: eventBus.dispatch, createEvent: eventBus.createIntent, diff --git a/libs/missive.js/src/core/envelope.ts b/libs/missive.js/src/core/envelope.ts index b6c579e..227529b 100644 --- a/libs/missive.js/src/core/envelope.ts +++ b/libs/missive.js/src/core/envelope.ts @@ -14,6 +14,7 @@ export type Stamp = { }; export type IdentityStamp = Stamp<{ id: string }, 'missive:identity'>; +export type ReprocessedStamp = Stamp<{ stamps: Stamp[] }, 'missive:reprocessed'>; export type HandledStamp = Stamp; export function createEnvelope(message: T): Envelope { diff --git a/libs/missive.js/src/middlewares/async-middleware.ts b/libs/missive.js/src/middlewares/async-middleware.ts new file mode 100644 index 0000000..9f8e182 --- /dev/null +++ b/libs/missive.js/src/middlewares/async-middleware.ts @@ -0,0 +1,48 @@ +import { BusKinds, MessageRegistry, MessageRegistryType, TypedMessage } from '../core/bus.js'; +import { Envelope, Stamp } from '../core/envelope.js'; +import { Middleware } from '../core/middleware.js'; + +type Options> = + | { + consume: false; + async?: boolean; + produce: (envelope: Envelope>>) => Promise; + intents?: { + [K in keyof T]?: { + async?: boolean; + produce?: (envelope: NarrowedEnvelope) => Promise; + }; + }; + } + | { + consume: true; + async?: never; + produce?: never; + intents?: never; + }; + +type NarrowedEnvelope, K extends keyof T> = Envelope< + TypedMessage>> +>; + +export type AsyncStamp = Stamp; + +export function createAsyncMiddleware>({ + consume, + intents, + produce, + async = true, +}: Options): Middleware { + return async (envelope, next) => { + const type = envelope.message.__type as keyof T; + const isAsync = intents?.[type]?.async ?? async; + // we need to push to the queue here and add a stamp for reference + if (isAsync && consume === false) { + envelope.addStamp('missive:async'); + const producer = intents?.[type]?.produce ?? produce; + await producer(envelope); + return; + } + await next(); + }; +} diff --git a/libs/missive.js/src/middlewares/cacher-middleware.ts b/libs/missive.js/src/middlewares/cacher-middleware.ts index 42950ec..0956b18 100644 --- a/libs/missive.js/src/middlewares/cacher-middleware.ts +++ b/libs/missive.js/src/middlewares/cacher-middleware.ts @@ -42,7 +42,7 @@ export function createCacherMiddleware({ adapter = createMemoryCacheAdapter(); } return async (envelope, next) => { - const type = envelope.message.__type; + const type = envelope.message.__type as keyof T; const key = await hashKey(JSON.stringify(envelope.message)); const cached = await adapter.get(key); if (cached) { diff --git a/libs/missive.js/tests/async-middleware.test.ts b/libs/missive.js/tests/async-middleware.test.ts new file mode 100644 index 0000000..55cc1f0 --- /dev/null +++ b/libs/missive.js/tests/async-middleware.test.ts @@ -0,0 +1,110 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { Envelope } from '../src/core/envelope'; +import { TypedMessage } from '../src/core/bus'; +import { createAsyncMiddleware } from '../src/middlewares/async-middleware'; + +type MessageRegistry = { + 'test-message': { + command: { id: number }; + result: { + data: string; + }; + }; +}; + +describe('createAsyncMiddleware', () => { + let next: ReturnType; + let envelope: Envelope>; + + beforeEach(() => { + next = vi.fn(); + envelope = { + message: { __type: 'test-message', id: 1 }, + stamps: [], + stampsOfType: vi.fn(), + addStamp: vi.fn(), + firstStamp: vi.fn(), + lastStamp: vi.fn(), + }; + }); + + it('should call produce function and add async stamp when consume is false', async () => { + const produce = vi.fn().mockResolvedValue(undefined); + const middleware = createAsyncMiddleware<'command', MessageRegistry>({ + consume: false, + produce, + }); + + await middleware(envelope, next); + + expect(produce).toHaveBeenCalledWith(envelope); + expect(envelope.addStamp).toHaveBeenCalledWith('missive:async'); + expect(next).not.toHaveBeenCalled(); + }); + + it('should call next when consume is true', async () => { + const middleware = createAsyncMiddleware<'command', MessageRegistry>({ + consume: true, + }); + + await middleware(envelope, next); + + expect(next).toHaveBeenCalled(); + }); + + it('should call intent produce function if defined', async () => { + const intentProduce = vi.fn().mockResolvedValue(undefined); + const intentProdueSpecific = vi.fn().mockResolvedValue(undefined); + const middleware = createAsyncMiddleware<'command', MessageRegistry>({ + consume: false, + produce: intentProduce, + intents: { + 'test-message': { + produce: intentProdueSpecific, + }, + }, + }); + + await middleware(envelope, next); + + expect(intentProduce).not.toHaveBeenCalledWith(envelope); + expect(intentProdueSpecific).toHaveBeenCalledWith(envelope); + expect(envelope.addStamp).toHaveBeenCalledWith('missive:async'); + expect(next).not.toHaveBeenCalled(); + }); + + it('should call next if async is false', async () => { + const middleware = createAsyncMiddleware<'command', MessageRegistry>({ + consume: false, + async: false, + produce: vi.fn(), + }); + + await middleware(envelope, next); + + expect(next).toHaveBeenCalled(); + expect(envelope.addStamp).not.toHaveBeenCalled(); + }); + + it('should call next if intent async is false', async () => { + const intentProduce = vi.fn().mockResolvedValue(undefined); + const intentProdueSpecific = vi.fn().mockResolvedValue(undefined); + const middleware = createAsyncMiddleware<'command', MessageRegistry>({ + consume: false, + produce: intentProduce, + intents: { + 'test-message': { + async: false, + produce: intentProdueSpecific, + }, + }, + }); + + await middleware(envelope, next); + + expect(next).toHaveBeenCalled(); + expect(intentProduce).not.toHaveBeenCalledWith(envelope); + expect(intentProdueSpecific).not.toHaveBeenCalledWith(envelope); + expect(envelope.addStamp).not.toHaveBeenCalled(); + }); +});