-
-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: introducing retryer middleware
- Loading branch information
Showing
10 changed files
with
445 additions
and
12 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
--- | ||
title: Retryer Middleware | ||
description: Built-in middleware to retry handling if in error. | ||
--- | ||
|
||
import { Icon, Aside } from '@astrojs/starlight/components'; | ||
|
||
The Retryer Middleware is built-in middleware that gives you capability to retry the handling of an intent. | ||
|
||
## How to use it | ||
|
||
As for any Middleware, you can use it by adding it to the `bus` instance. | ||
|
||
```typescript | ||
const retryerMiddleware = createRetryerMiddleware({ | ||
maxAttempts: 5; | ||
waitingAlgorithm: 'exponential', | ||
multiplier: 1.5; | ||
jitter: 0.5; | ||
}); | ||
const queryBus = createQueryBus<QueryHandlerRegistry>(); | ||
queryBus.use(retryerMiddleware); | ||
``` | ||
|
||
### Explanation | ||
|
||
The Retryer middleware is going to catch the exection and re-run the following middleware until the `maxAttempts` is reached. | ||
Between each attempt, the middleware is going to wait for a certain amount of time. | ||
The `waitingAlgorithm` can be `exponential`, `fibonacci`, or `none`. | ||
|
||
- `jitter` is a value between 0 and 1 that will add some randomness to the waiting time. | ||
- `multiplier` is the factor to multiply the waiting time between each attempt. (only used for `exponential`) | ||
|
||
<Aside title="Error Stamps" type="caution"> | ||
Retryer Middleware will also retry the handling of the intent if it finds more `error` stamps than before. | ||
Some handlers might not throw exceptions but add `error` stamps to the result. A good example would be a lock middleware that would | ||
add an `error` stamp if the lock is not acquired. | ||
</Aside> | ||
|
||
## Added Stamps | ||
|
||
The Retryer Middleware is going to add: | ||
|
||
- | ||
```typescript | ||
type RetriedStamp = Stamp<{ attempt: number; errorMessage: string }, 'missive:retried'>; | ||
``` | ||
> Every time the middleware retries the handling of the intent, it will add a `RetriedStamp` to the envelope. | ||
|
||
|
||
## Going further | ||
|
||
<div class='flex flex-row'> | ||
<span className='pr-2'>Look at the code of the </span> | ||
<a href="https://github.com/Missive-js/missive.js/tree/main/libs/missive.js/src/middlewares/retryer-middleware.ts" class="contents" target="_blank"><Icon name="github" class="mr-2"/>Retryer Middleware</a> | ||
</div> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,7 +18,7 @@ | |
"Sébastien Morel <[email protected]>", | ||
"Anaël Chardan" | ||
], | ||
"version": "0.0.4", | ||
"version": "0.0.5", | ||
"type": "module", | ||
"main": "./build/index.cjs", | ||
"module": "./build/index.js", | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
import { Stamp } from '../core/envelope.js'; | ||
import { GenericMiddleware } from '../core/middleware.js'; | ||
import { createExponentialSleeper, createFibonnaciSleeper } from '../utils/sleeper.js'; | ||
|
||
type Options = { | ||
maxAttempts: number; | ||
waitingAlgorithm: 'exponential' | 'fibonacci' | 'none'; | ||
multiplier: number; | ||
jitter: number; | ||
}; | ||
|
||
export type RetriedStamp = Stamp<{ attempt: number; errorMessage: string }, 'missive:retried'>; | ||
|
||
export function createRetryerMiddleware({ | ||
maxAttempts = 3, | ||
waitingAlgorithm = 'exponential', | ||
multiplier = 1.5, | ||
jitter = 0.5, | ||
}: Partial<Options> = {}): GenericMiddleware { | ||
const noneSleeper = () => ({ wait: async () => {}, reset: () => {} }); | ||
const sleeper = | ||
waitingAlgorithm === 'none' | ||
? noneSleeper() | ||
: waitingAlgorithm === 'exponential' | ||
? createExponentialSleeper(multiplier, jitter) | ||
: createFibonnaciSleeper(jitter); | ||
|
||
return async (envelope, next) => { | ||
let attempt = 1; | ||
sleeper.reset(); | ||
let lastError: unknown | null = null; | ||
while (attempt <= maxAttempts) { | ||
try { | ||
const initialErrorStampCount = envelope.stamps.filter((stamp) => stamp.type === 'error').length; | ||
await next(); | ||
const errorStampCount = envelope.stamps.filter((stamp) => stamp.type === 'error').length; | ||
const newErrorStampCount = errorStampCount - initialErrorStampCount; | ||
if (newErrorStampCount === 0) { | ||
// no new error, we are goog to go | ||
return; | ||
} | ||
envelope.addStamp<RetriedStamp>('missive:retried', { | ||
attempt, | ||
errorMessage: `New error stamp count: ${newErrorStampCount}`, | ||
}); | ||
} catch (error) { | ||
lastError = error; | ||
envelope.addStamp<RetriedStamp>('missive:retried', { | ||
attempt, | ||
errorMessage: error instanceof Error ? error.message : String(error), | ||
}); | ||
} | ||
attempt++; | ||
if (attempt > maxAttempts) { | ||
// if we have an error, we throw it | ||
if (lastError !== null) { | ||
throw lastError; | ||
} | ||
// if we don't have an error because the retries we based on an error stamp count, we just return | ||
return; | ||
} | ||
await sleeper.wait(); | ||
} | ||
}; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
const sleep = (s: number) => new Promise((r) => setTimeout(r, s * 1000)); | ||
|
||
type Deps = { | ||
sleepFn?: (s: number) => Promise<unknown>; | ||
}; | ||
export const createFibonnaciSleeper = (jitter = 0, deps?: Deps) => { | ||
const sleepFn = deps?.sleepFn || sleep; | ||
let a = 0, | ||
b = 1; | ||
return { | ||
wait: async () => { | ||
const w = a + b; | ||
a = b; | ||
b = w; | ||
const max = w * (1 + jitter); | ||
const min = w * (1 - jitter); | ||
const jitteredDelay = Math.random() * (max - min) + min; | ||
await sleepFn(jitteredDelay); | ||
}, | ||
reset: () => { | ||
a = 0; | ||
b = 1; | ||
}, | ||
}; | ||
}; | ||
|
||
export const createExponentialSleeper = (multiplier: number = 1.5, jitter: number = 0.5, deps?: Deps) => { | ||
const sleepFn = deps?.sleepFn || sleep; | ||
let currentDelay = 0.5; | ||
return { | ||
wait: async () => { | ||
const max = currentDelay * (1 + jitter); | ||
const min = currentDelay * (1 - jitter); | ||
const jitteredDelay = Math.random() * (max - min) + min; | ||
await sleepFn(jitteredDelay); | ||
currentDelay = currentDelay * multiplier; | ||
}, | ||
reset: () => { | ||
currentDelay = 0.5; | ||
}, | ||
}; | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
import { describe, it, expect, vi, beforeEach } from 'vitest'; | ||
import { createRetryerMiddleware, RetriedStamp } from '../src/middlewares/retryer-middleware'; | ||
import { createEnvelope, Envelope } from '../src/core/envelope'; | ||
|
||
describe('createRetryerMiddleware', () => { | ||
let nextMock: ReturnType<typeof vi.fn>; | ||
let envelope: Envelope<unknown>; | ||
|
||
beforeEach(() => { | ||
nextMock = vi.fn(); | ||
envelope = createEnvelope('test message'); | ||
}); | ||
|
||
it('should retry the correct number of times with none algorithm', async () => { | ||
const middleware = createRetryerMiddleware({ | ||
maxAttempts: 5, | ||
waitingAlgorithm: 'none', | ||
multiplier: 0, | ||
jitter: 0, | ||
}); | ||
nextMock.mockRejectedValueOnce(new Error('Test Error')); | ||
nextMock.mockRejectedValueOnce(new Error('Test Error')); | ||
nextMock.mockResolvedValueOnce(undefined); | ||
await middleware(envelope, nextMock); | ||
expect(nextMock).toHaveBeenCalledTimes(3); | ||
const retriedStamps = envelope.stampsOfType<RetriedStamp>('missive:retried'); | ||
expect(retriedStamps?.length || 0).toBe(2); | ||
}); | ||
|
||
it('should add retried stamp on error', async () => { | ||
const middleware = createRetryerMiddleware({ | ||
maxAttempts: 3, | ||
waitingAlgorithm: 'none', | ||
multiplier: 0, | ||
jitter: 0, | ||
}); | ||
nextMock.mockRejectedValueOnce(new Error('Test Error')); | ||
|
||
try { | ||
await middleware(envelope, nextMock); | ||
} catch { | ||
// expected to throw | ||
} | ||
const retriedStamps = envelope.stampsOfType<RetriedStamp>('missive:retried'); | ||
expect(retriedStamps?.length || 0).toBe(1); | ||
expect(retriedStamps[0]!.body!.attempt).toBe(1); | ||
expect(retriedStamps[0]!.body!.errorMessage).toBe('Test Error'); | ||
}); | ||
|
||
it('should stop retrying after max attempts', async () => { | ||
const middleware = createRetryerMiddleware({ | ||
maxAttempts: 2, | ||
waitingAlgorithm: 'none', | ||
multiplier: 0, | ||
jitter: 0, | ||
}); | ||
nextMock.mockRejectedValue(new Error('Test Error')); | ||
|
||
try { | ||
await middleware(envelope, nextMock); | ||
} catch { | ||
// expected to throw | ||
} | ||
|
||
expect(nextMock).toHaveBeenCalledTimes(2); | ||
}); | ||
|
||
it('should not retry if no error occurs', async () => { | ||
const middleware = createRetryerMiddleware({ | ||
maxAttempts: 3, | ||
waitingAlgorithm: 'none', | ||
multiplier: 0, | ||
jitter: 0, | ||
}); | ||
nextMock.mockResolvedValueOnce(undefined); | ||
|
||
await middleware(envelope, nextMock); | ||
|
||
expect(nextMock).toHaveBeenCalledTimes(1); | ||
expect(envelope.stamps).not.toContainEqual(expect.objectContaining({ type: 'missive:retried' })); | ||
}); | ||
|
||
it('should respect maxAttempts and throw last error', async () => { | ||
const middleware = createRetryerMiddleware({ | ||
maxAttempts: 2, | ||
waitingAlgorithm: 'none', | ||
multiplier: 0, | ||
jitter: 0, | ||
}); | ||
nextMock.mockRejectedValue(new Error('test error')); | ||
await expect(middleware(envelope, nextMock)).rejects.toThrow('test error'); | ||
expect(nextMock).toHaveBeenCalledTimes(2); | ||
}); | ||
|
||
it('should handle error stamps from next middleware', async () => { | ||
const middleware = createRetryerMiddleware({ | ||
maxAttempts: 3, | ||
waitingAlgorithm: 'none', | ||
multiplier: 0, | ||
jitter: 0, | ||
}); | ||
|
||
nextMock.mockImplementationOnce(() => { | ||
envelope.addStamp('error', { message: 'test error' }); | ||
return Promise.resolve(); | ||
}); | ||
nextMock.mockImplementationOnce(() => { | ||
envelope.addStamp('error', { message: 'test error' }); | ||
return Promise.resolve(); | ||
}); | ||
nextMock.mockResolvedValueOnce(undefined); | ||
await middleware(envelope, nextMock); | ||
expect(nextMock).toHaveBeenCalledTimes(3); | ||
}); | ||
}); |
Oops, something went wrong.