From 701daaec6003b8bfc93541106ab31e722d5446d7 Mon Sep 17 00:00:00 2001 From: Johannes Loher Date: Fri, 19 Jul 2024 23:34:56 +0200 Subject: [PATCH] feat: implement dynamic behavior for halfOpenAfter Closes #68 --- src/CircuitBreakerPolicy.test.ts | 35 +++++++++++++++++++++++ src/CircuitBreakerPolicy.ts | 48 ++++++++++++++++++++++++++++---- src/Policy.ts | 10 +++++-- 3 files changed, 85 insertions(+), 8 deletions(-) diff --git a/src/CircuitBreakerPolicy.test.ts b/src/CircuitBreakerPolicy.test.ts index ec908d8..3c4ffd3 100644 --- a/src/CircuitBreakerPolicy.test.ts +++ b/src/CircuitBreakerPolicy.test.ts @@ -7,6 +7,7 @@ import { abortedSignal } from './common/abort'; import { BrokenCircuitError, TaskCancelledError } from './errors/Errors'; import { IsolatedCircuitError } from './errors/IsolatedCircuitError'; import { circuitBreaker, handleAll, handleType } from './Policy'; +import { IterableBackoff } from './backoff/IterableBackoff'; class MyException extends Error {} @@ -79,6 +80,40 @@ describe('CircuitBreakerPolicy', () => { expect(onReset).calledOnce; }); + it('uses the given backof factory to decide whether to enter the half open state', async () => { + p = circuitBreaker(handleType(MyException), { + halfOpenAfter: new IterableBackoff([1000, 2000]), + breaker: new ConsecutiveBreaker(2), + }); + p.onReset(onReset); + p.onHalfOpen(onHalfOpen); + + await openBreaker(); + + clock.tick(1000); + + const failedAttempt = p.execute(stub().throws(new MyException())); + expect(p.state).to.equal(CircuitState.HalfOpen); + expect(onHalfOpen).calledOnce; + await expect(failedAttempt).to.be.rejectedWith(MyException); + expect(p.state).to.equal(CircuitState.Open); + + clock.tick(1000); + + await expect(p.execute(stub().throws(new MyException()))).to.be.rejectedWith( + BrokenCircuitError, + ); + + clock.tick(1000); + + const result = p.execute(stub().resolves(42)); + expect(p.state).to.equal(CircuitState.HalfOpen); + expect(onHalfOpen).calledTwice; + expect(await result).to.equal(42); + expect(p.state).to.equal(CircuitState.Closed); + expect(onReset).calledOnce; + }); + it('dedupes half-open tests', async () => { await openBreaker(); clock.tick(1000); diff --git a/src/CircuitBreakerPolicy.ts b/src/CircuitBreakerPolicy.ts index 83ff373..ef12cde 100644 --- a/src/CircuitBreakerPolicy.ts +++ b/src/CircuitBreakerPolicy.ts @@ -1,3 +1,4 @@ +import { ConstantBackoff, IBackoff, IBackoffFactory } from './backoff/Backoff'; import { IBreaker } from './breaker/Breaker'; import { neverAbortedSignal } from './common/abort'; import { EventEmitter } from './common/Event'; @@ -30,9 +31,31 @@ export enum CircuitState { Isolated, } +/** + * Context passed into halfOpenAfter backoff delegate. + */ +export interface IHalfOpenAfterBackoffContext extends IDefaultPolicyContext { + /** + * The consecutive number of times the circuit has entered the + * {@link CircuitState.Open} state. + */ + attempt: number; + /** + * The result of the last method call that caused the curcuit to enter the + * {@link CircuitState.Open}. Either a thrown error, or a value that we + * determined should open the circuit. + */ + result: FailureReason; +} + export interface ICircuitBreakerOptions { breaker: IBreaker; - halfOpenAfter: number; + /** + * When to (potentially) enter the {@link CircuitState.HalfOpen} state from + * the {@link CircuitState.Open} state. Either a duration in milliseconds or a + * backoff factory. + */ + halfOpenAfter: number | IBackoffFactory; } type InnerState = @@ -48,8 +71,11 @@ export class CircuitBreakerPolicy implements IPolicy { private readonly resetEmitter = new EventEmitter(); private readonly halfOpenEmitter = new EventEmitter(); private readonly stateChangeEmitter = new EventEmitter(); + private readonly halfOpenAfterBackoffFactory: IBackoffFactory; private innerLastFailure?: FailureReason; private innerState: InnerState = { value: CircuitState.Closed }; + private openEnteredCount = 0; + private halfOpenAfterBackof: IBackoff | undefined; /** * Event emitted when the circuit breaker opens. @@ -99,7 +125,12 @@ export class CircuitBreakerPolicy implements IPolicy { constructor( private readonly options: ICircuitBreakerOptions, private readonly executor: ExecuteWrapper, - ) {} + ) { + this.halfOpenAfterBackoffFactory = + typeof options.halfOpenAfter === 'number' + ? new ConstantBackoff(options.halfOpenAfter) + : options.halfOpenAfter; + } /** * Manually holds open the circuit breaker. @@ -152,7 +183,7 @@ export class CircuitBreakerPolicy implements IPolicy { } else { this.innerLastFailure = result; if (this.options.breaker.failure(state.value)) { - this.open(result); + this.open(result, signal); } } @@ -167,7 +198,7 @@ export class CircuitBreakerPolicy implements IPolicy { return this.execute(fn); case CircuitState.Open: - if (Date.now() - state.openedAt < this.options.halfOpenAfter) { + if (Date.now() - state.openedAt < (this.halfOpenAfterBackof?.duration ?? 0)) { throw new BrokenCircuitError(); } const test = this.halfOpen(fn, signal); @@ -197,7 +228,7 @@ export class CircuitBreakerPolicy implements IPolicy { } else { this.innerLastFailure = result; this.options.breaker.failure(CircuitState.HalfOpen); - this.open(result); + this.open(result, signal); } return returnOrThrow(result); @@ -209,7 +240,7 @@ export class CircuitBreakerPolicy implements IPolicy { } } - private open(reason: FailureReason) { + private open(reason: FailureReason, signal: AbortSignal) { if (this.state === CircuitState.Isolated || this.state === CircuitState.Open) { return; } @@ -217,6 +248,9 @@ export class CircuitBreakerPolicy implements IPolicy { this.innerState = { value: CircuitState.Open, openedAt: Date.now() }; this.breakEmitter.emit(reason); this.stateChangeEmitter.emit(CircuitState.Open); + const context = { attempt: ++this.openEnteredCount, result: reason, signal }; + this.halfOpenAfterBackof = + this.halfOpenAfterBackof?.next(context) ?? this.halfOpenAfterBackoffFactory.next(context); } private close() { @@ -224,6 +258,8 @@ export class CircuitBreakerPolicy implements IPolicy { this.innerState = { value: CircuitState.Closed }; this.resetEmitter.emit(); this.stateChangeEmitter.emit(CircuitState.Closed); + this.halfOpenAfterBackof = undefined; + this.openEnteredCount = 0; } } } diff --git a/src/Policy.ts b/src/Policy.ts index 6404d59..a8a7f6b 100644 --- a/src/Policy.ts +++ b/src/Policy.ts @@ -1,7 +1,7 @@ import { ConstantBackoff, IBackoffFactory } from './backoff/Backoff'; import { IBreaker } from './breaker/Breaker'; import { BulkheadPolicy } from './BulkheadPolicy'; -import { CircuitBreakerPolicy } from './CircuitBreakerPolicy'; +import { CircuitBreakerPolicy, IHalfOpenAfterBackoffContext } from './CircuitBreakerPolicy'; import { Event } from './common/Event'; import { ExecuteWrapper } from './common/Executor'; import { FallbackPolicy } from './FallbackPolicy'; @@ -450,7 +450,13 @@ export function retry( * @param breaker The circuit breaker to use. This package exports * ConsecutiveBreaker and SamplingBreakers for you to use. */ -export function circuitBreaker(policy: Policy, opts: { halfOpenAfter: number; breaker: IBreaker }) { +export function circuitBreaker( + policy: Policy, + opts: { + halfOpenAfter: number | IBackoffFactory; + breaker: IBreaker; + }, +) { return new CircuitBreakerPolicy( opts, new ExecuteWrapper(policy.options.errorFilter, policy.options.resultFilter),