From eddac735a1e2ad074789d449b291c8e31fb3bb68 Mon Sep 17 00:00:00 2001 From: "Masih H. Derkani" Date: Wed, 9 Oct 2024 15:15:28 +0100 Subject: [PATCH] Attempt publication with circuit breaker GPBFT presciently ignores broadcast and rebroadcast requests as it is beyond its boundaries of responsibility to do something about. In practice such failures may be a sign that pubsub is overwhelmed with messages. Therefore, ideally the system should avoid aggravating the situation by requesting further broadcasts. This is specially important in re-broadcast requests because it often involves batch message publication. The changes here wrap the pubsub publication calls with a circuit breaker that will open on consecutive errors (set to `5`), and will not attempt to actually publish messages until a reset timeout (set to `3s`) has passed. Fixes #632 --- host.go | 16 ++- internal/circuitbreaker/circuitbreaker.go | 109 ++++++++++++++++++ .../circuitbreaker/circuitbreaker_test.go | 102 ++++++++++++++++ 3 files changed, 225 insertions(+), 2 deletions(-) create mode 100644 internal/circuitbreaker/circuitbreaker.go create mode 100644 internal/circuitbreaker/circuitbreaker_test.go diff --git a/host.go b/host.go index ecb08c27..885a958f 100644 --- a/host.go +++ b/host.go @@ -13,6 +13,7 @@ import ( "github.com/filecoin-project/go-f3/certstore" "github.com/filecoin-project/go-f3/ec" "github.com/filecoin-project/go-f3/gpbft" + "github.com/filecoin-project/go-f3/internal/circuitbreaker" "github.com/filecoin-project/go-f3/internal/clock" "github.com/filecoin-project/go-f3/internal/psutil" "github.com/filecoin-project/go-f3/internal/writeaheadlog" @@ -41,6 +42,7 @@ type gpbftRunner struct { participant *gpbft.Participant topic *pubsub.Topic + cb *circuitbreaker.CircuitBreaker alertTimer *clock.Timer @@ -86,6 +88,7 @@ func newRunner( ctxCancel: ctxCancel, equivFilter: newEquivocationFilter(pID), selfMessages: make(map[uint64]map[roundPhase][]*gpbft.GMessage), + cb: circuitbreaker.New(5, 3*time.Second), } // create a stopped timer to facilitate alerts requested from gpbft @@ -444,7 +447,7 @@ func (h *gpbftRunner) BroadcastMessage(ctx context.Context, msg *gpbft.GMessage) return fmt.Errorf("marshalling GMessage for broadcast: %w", err) } - err = h.topic.Publish(ctx, bw.Bytes()) + err = h.publishWithCircuitBreaker(ctx, bw.Bytes()) if err != nil { return fmt.Errorf("publishing message: %w", err) } @@ -463,12 +466,21 @@ func (h *gpbftRunner) rebroadcastMessage(msg *gpbft.GMessage) error { if err := msg.MarshalCBOR(&bw); err != nil { return fmt.Errorf("marshalling GMessage for broadcast: %w", err) } - if err := h.topic.Publish(h.runningCtx, bw.Bytes()); err != nil { + if err := h.publishWithCircuitBreaker(h.runningCtx, bw.Bytes()); err != nil { return fmt.Errorf("publishing message: %w", err) } return nil } +func (h *gpbftRunner) publishWithCircuitBreaker(ctx context.Context, msg []byte) error { + if h.topic == nil { + return nil + } + return h.cb.Run(func() error { + return h.topic.Publish(ctx, msg) + }) +} + var _ pubsub.ValidatorEx = (*gpbftRunner)(nil).validatePubsubMessage func (h *gpbftRunner) validatePubsubMessage(ctx context.Context, _ peer.ID, msg *pubsub.Message) (_result pubsub.ValidationResult) { diff --git a/internal/circuitbreaker/circuitbreaker.go b/internal/circuitbreaker/circuitbreaker.go new file mode 100644 index 00000000..33452d9f --- /dev/null +++ b/internal/circuitbreaker/circuitbreaker.go @@ -0,0 +1,109 @@ +package circuitbreaker + +import ( + "errors" + "fmt" + "sync" + "time" +) + +const ( + Closed Status = iota + Open + HalfOpen +) + +// ErrOpen signals that the circuit is open. See CircuitBreaker.Run. +var ErrOpen = errors.New("circuit breaker is open") + +type Status int + +type CircuitBreaker struct { + maxFailures int + resetTimeout time.Duration + + // mu guards access to status, lastFailure and failures. + mu sync.Mutex + failures int + lastFailure time.Time + status Status +} + +// New creates a new CircuitBreaker instance with the specified maximum number +// of failures and a reset timeout duration. +// +// See CircuitBreaker.Run. +func New(maxFailures int, resetTimeout time.Duration) *CircuitBreaker { + return &CircuitBreaker{ + maxFailures: maxFailures, + resetTimeout: resetTimeout, + } +} + +// Run attempts to execute the provided function within the context of the +// circuit breaker. It handles state transitions, Closed, Open, or HalfOpen, +// based on the outcome of the attempt. +// +// If the circuit is in the Open state, and not enough time has passed since the +// last failure, the circuit remains open, and the function returns +// ErrOpen without attempting the provided function. If enough time +// has passed, the circuit transitions to HalfOpen, and one attempt is allowed. +// +// In HalfOpen state if the function is executed and returns an error, the +// circuit breaker will transition back to Open status. Otherwise, if the +// function executes successfully, the circuit resets to the Closed state, and +// the failure count is reset to zero. +// +// Example: +// +// cb := NewCircuitBreaker(3, time.Second) +// switch err := cb.Run(func() error { +// // Your attempt logic here +// return nil +// }); { +// case errors.Is(err, ErrCircuitBreakerOpen): +// // No execution attempt was made since the circuit is open. +// case err != nil: +// // Execution attempt failed. +// default: +// // Execution attempt succeeded. +// } +func (cb *CircuitBreaker) Run(attempt func() error) error { + cb.mu.Lock() + defer cb.mu.Unlock() + switch cb.status { + case Open: + if time.Since(cb.lastFailure) < cb.resetTimeout { + // Not enough time has passed since the circuit opened. Do not make any further + // attempts. + return ErrOpen + } + // Enough time has passed since last failure. Proceed to allow one attempt by + // half-opening the circuit. + cb.status = HalfOpen + fallthrough + case HalfOpen, Closed: + if err := attempt(); err != nil { + cb.failures++ + if cb.failures >= cb.maxFailures { + // Trip the circuit as we are at or above the max failure threshold. + cb.status = Open + cb.lastFailure = time.Now() + } + return err + } + // Reset the circuit since the attempt succeeded. + cb.status = Closed + cb.failures = 0 + return nil + default: + return fmt.Errorf("unknown status: %d", cb.status) + } +} + +// GetStatus returns the current status of the CircuitBreaker. +func (cb *CircuitBreaker) GetStatus() Status { + cb.mu.Lock() + defer cb.mu.Unlock() + return cb.status +} diff --git a/internal/circuitbreaker/circuitbreaker_test.go b/internal/circuitbreaker/circuitbreaker_test.go new file mode 100644 index 00000000..b785bc5a --- /dev/null +++ b/internal/circuitbreaker/circuitbreaker_test.go @@ -0,0 +1,102 @@ +package circuitbreaker_test + +import ( + "errors" + "sync" + "testing" + "time" + + "github.com/filecoin-project/go-f3/internal/circuitbreaker" + "github.com/stretchr/testify/require" +) + +func TestCircuitBreaker(t *testing.T) { + t.Parallel() + + const ( + maxFailures = 3 + restTimeout = 10 * time.Millisecond + + eventualTimeout = restTimeout * 2 + eventualTick = restTimeout / 5 + ) + + var ( + failure = errors.New("fish out of water") + + succeed = func() error { return nil } + fail = func() error { return failure } + trip = func(t *testing.T, subject *circuitbreaker.CircuitBreaker) { + for range maxFailures { + require.ErrorContains(t, subject.Run(fail), "fish") + } + require.Equal(t, circuitbreaker.Open, subject.GetStatus()) + } + ) + + t.Run("closed on no error", func(t *testing.T) { + t.Parallel() + subject := circuitbreaker.New(maxFailures, restTimeout) + require.NoError(t, subject.Run(succeed)) + require.Equal(t, circuitbreaker.Closed, subject.GetStatus()) + }) + + t.Run("opens after max failures and stays open", func(t *testing.T) { + subject := circuitbreaker.New(maxFailures, restTimeout) + trip(t, subject) + + // Assert that immediate runs fail, without being attempted, even if they would + // be successful until restTimeout has elapsed. + err := subject.Run(succeed) + require.ErrorIs(t, err, circuitbreaker.ErrOpen) + require.Equal(t, circuitbreaker.Open, subject.GetStatus()) + }) + + t.Run("half-opens eventually", func(t *testing.T) { + subject := circuitbreaker.New(maxFailures, restTimeout) + trip(t, subject) + require.ErrorIs(t, subject.Run(fail), circuitbreaker.ErrOpen) + // Assert that given function is eventually run after circuit is tripped at + // half-open status by checking error type. + require.Eventually(t, func() bool { return errors.Is(subject.Run(fail), failure) }, eventualTimeout, eventualTick) + }) + + t.Run("closes after rest timeout and success", func(t *testing.T) { + subject := circuitbreaker.New(maxFailures, restTimeout) + trip(t, subject) + + require.Eventually(t, func() bool { return subject.Run(succeed) == nil }, eventualTimeout, eventualTick) + require.Equal(t, circuitbreaker.Closed, subject.GetStatus()) + }) + + t.Run("usable concurrently", func(t *testing.T) { + subject := circuitbreaker.New(maxFailures, restTimeout) + const ( + wantSuccesses = 7 + totalAttempts = 1_000 + ) + var ( + successes, failures int + wg sync.WaitGroup + ) + for range totalAttempts { + wg.Add(1) + go func() { + defer wg.Done() + _ = subject.Run(func() error { + // Unsafely increment/decrement counters so that if Run is not synchronised + // properly the test creates a race condition. + if successes < wantSuccesses { + successes++ + return nil + } + failures++ + return errors.New("error") + }) + }() + } + wg.Wait() + require.Equal(t, wantSuccesses, successes) + require.Equal(t, maxFailures, failures) + }) +}