Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement dynamic behavior for halfOpenAfter #96

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions src/CircuitBreakerPolicy.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { abortedSignal } from './common/abort';
import { BrokenCircuitError, TaskCancelledError } from './errors/Errors';
import { IsolatedCircuitError } from './errors/IsolatedCircuitError';
import { circuitBreaker, handleAll, handleType } from './Policy';
import { IterableBackoff } from './backoff/IterableBackoff';

class MyException extends Error {}

Expand Down Expand Up @@ -79,6 +80,71 @@ describe('CircuitBreakerPolicy', () => {
expect(onReset).calledOnce;
});

it('uses the given backof factory to decide whether to enter the half open state', async () => {
p = circuitBreaker(handleType(MyException), {
halfOpenAfter: new IterableBackoff([1000, 2000]),
breaker: new ConsecutiveBreaker(2),
});
p.onReset(onReset);
p.onHalfOpen(onHalfOpen);

await openBreaker();

clock.tick(1000);

const failedAttempt = p.execute(stub().throws(new MyException()));
expect(p.state).to.equal(CircuitState.HalfOpen);
expect(onHalfOpen).calledOnce;
await expect(failedAttempt).to.be.rejectedWith(MyException);
expect(p.state).to.equal(CircuitState.Open);

clock.tick(1000);

await expect(p.execute(stub().throws(new MyException()))).to.be.rejectedWith(
BrokenCircuitError,
);

clock.tick(1000);

const result = p.execute(stub().resolves(42));
expect(p.state).to.equal(CircuitState.HalfOpen);
expect(onHalfOpen).calledTwice;
expect(await result).to.equal(42);
expect(p.state).to.equal(CircuitState.Closed);
expect(onReset).calledOnce;
});

it('resets the backoff when closing the circuit', async () => {
p = circuitBreaker(handleType(MyException), {
halfOpenAfter: new IterableBackoff([1000, 2000]),
breaker: new ConsecutiveBreaker(2),
});
p.onReset(onReset);
p.onHalfOpen(onHalfOpen);

await openBreaker();

clock.tick(1000);

const halfOpenTest1 = p.execute(stub().resolves(42));
expect(p.state).to.equal(CircuitState.HalfOpen);
expect(onHalfOpen).calledOnce;
expect(await halfOpenTest1).to.equal(42);
expect(p.state).to.equal(CircuitState.Closed);
expect(onReset).calledOnce;

await openBreaker();

clock.tick(1000);

const halfOpenTest2 = p.execute(stub().resolves(42));
expect(p.state).to.equal(CircuitState.HalfOpen);
expect(onHalfOpen).calledTwice;
expect(await halfOpenTest2).to.equal(42);
expect(p.state).to.equal(CircuitState.Closed);
expect(onReset).calledTwice;
});

it('dedupes half-open tests', async () => {
await openBreaker();
clock.tick(1000);
Expand Down
48 changes: 42 additions & 6 deletions src/CircuitBreakerPolicy.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { ConstantBackoff, IBackoff, IBackoffFactory } from './backoff/Backoff';
import { IBreaker } from './breaker/Breaker';
import { neverAbortedSignal } from './common/abort';
import { EventEmitter } from './common/Event';
Expand Down Expand Up @@ -30,9 +31,31 @@ export enum CircuitState {
Isolated,
}

/**
* Context passed into halfOpenAfter backoff delegate.
*/
export interface IHalfOpenAfterBackoffContext extends IDefaultPolicyContext {
/**
* The consecutive number of times the circuit has entered the
* {@link CircuitState.Open} state.
*/
attempt: number;
/**
* The result of the last method call that caused the circuit to enter the
* {@link CircuitState.Open} state. Either a thrown error, or a value that we
* determined should open the circuit.
*/
result: FailureReason<unknown>;
}

export interface ICircuitBreakerOptions {
breaker: IBreaker;
halfOpenAfter: number;
/**
* When to (potentially) enter the {@link CircuitState.HalfOpen} state from
* the {@link CircuitState.Open} state. Either a duration in milliseconds or a
* backoff factory.
*/
halfOpenAfter: number | IBackoffFactory<IHalfOpenAfterBackoffContext>;
}

type InnerState =
Expand All @@ -48,8 +71,11 @@ export class CircuitBreakerPolicy implements IPolicy {
private readonly resetEmitter = new EventEmitter<void>();
private readonly halfOpenEmitter = new EventEmitter<void>();
private readonly stateChangeEmitter = new EventEmitter<CircuitState>();
private readonly halfOpenAfterBackoffFactory: IBackoffFactory<IHalfOpenAfterBackoffContext>;
private innerLastFailure?: FailureReason<unknown>;
private innerState: InnerState = { value: CircuitState.Closed };
private openEnteredCount = 0;
private halfOpenAfterBackof: IBackoff<IHalfOpenAfterBackoffContext> | undefined;

/**
* Event emitted when the circuit breaker opens.
Expand Down Expand Up @@ -99,7 +125,12 @@ export class CircuitBreakerPolicy implements IPolicy {
constructor(
private readonly options: ICircuitBreakerOptions,
private readonly executor: ExecuteWrapper,
) {}
) {
this.halfOpenAfterBackoffFactory =
typeof options.halfOpenAfter === 'number'
? new ConstantBackoff(options.halfOpenAfter)
: options.halfOpenAfter;
}

/**
* Manually holds open the circuit breaker.
Expand Down Expand Up @@ -152,7 +183,7 @@ export class CircuitBreakerPolicy implements IPolicy {
} else {
this.innerLastFailure = result;
if (this.options.breaker.failure(state.value)) {
this.open(result);
this.open(result, signal);
}
}

Expand All @@ -167,7 +198,7 @@ export class CircuitBreakerPolicy implements IPolicy {
return this.execute(fn);

case CircuitState.Open:
if (Date.now() - state.openedAt < this.options.halfOpenAfter) {
if (Date.now() - state.openedAt < (this.halfOpenAfterBackof?.duration ?? 0)) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The halfOpenAfterBackoff should be stored in the state to avoid the null assertion (which should never happen)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! I didn't think of that.

throw new BrokenCircuitError();
}
const test = this.halfOpen(fn, signal);
Expand Down Expand Up @@ -197,7 +228,7 @@ export class CircuitBreakerPolicy implements IPolicy {
} else {
this.innerLastFailure = result;
this.options.breaker.failure(CircuitState.HalfOpen);
this.open(result);
this.open(result, signal);
}

return returnOrThrow(result);
Expand All @@ -209,21 +240,26 @@ export class CircuitBreakerPolicy implements IPolicy {
}
}

private open(reason: FailureReason<unknown>) {
private open(reason: FailureReason<unknown>, signal: AbortSignal) {
if (this.state === CircuitState.Isolated || this.state === CircuitState.Open) {
return;
}

this.innerState = { value: CircuitState.Open, openedAt: Date.now() };
this.breakEmitter.emit(reason);
this.stateChangeEmitter.emit(CircuitState.Open);
const context = { attempt: ++this.openEnteredCount, result: reason, signal };
this.halfOpenAfterBackof =
this.halfOpenAfterBackof?.next(context) ?? this.halfOpenAfterBackoffFactory.next(context);
}

private close() {
if (this.state === CircuitState.HalfOpen) {
this.innerState = { value: CircuitState.Closed };
this.resetEmitter.emit();
this.stateChangeEmitter.emit(CircuitState.Closed);
this.halfOpenAfterBackof = undefined;
this.openEnteredCount = 0;
}
}
}
10 changes: 8 additions & 2 deletions src/Policy.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { ConstantBackoff, IBackoffFactory } from './backoff/Backoff';
import { IBreaker } from './breaker/Breaker';
import { BulkheadPolicy } from './BulkheadPolicy';
import { CircuitBreakerPolicy } from './CircuitBreakerPolicy';
import { CircuitBreakerPolicy, IHalfOpenAfterBackoffContext } from './CircuitBreakerPolicy';
import { Event } from './common/Event';
import { ExecuteWrapper } from './common/Executor';
import { FallbackPolicy } from './FallbackPolicy';
Expand Down Expand Up @@ -450,7 +450,13 @@ export function retry(
* @param breaker The circuit breaker to use. This package exports
* ConsecutiveBreaker and SamplingBreakers for you to use.
*/
export function circuitBreaker(policy: Policy, opts: { halfOpenAfter: number; breaker: IBreaker }) {
export function circuitBreaker(
policy: Policy,
opts: {
halfOpenAfter: number | IBackoffFactory<IHalfOpenAfterBackoffContext>;
breaker: IBreaker;
},
) {
return new CircuitBreakerPolicy(
opts,
new ExecuteWrapper(policy.options.errorFilter, policy.options.resultFilter),
Expand Down