Skip to content

Commit

Permalink
fix: proactively coordinate leader/followers (#2028)
Browse files Browse the repository at this point in the history
fixes #2011

Cause: 
- Coordinator only does its coordination logic when something calls
`Get()`
- If leader/follower coordinator's `Get()` function was not called then
an expired leader/follower would not change
- ASM follower syncs with what it expects to be the leader even if that
leader is not alive. Even if we get a new leader, the follower would not
be swapped out for a valid one until `Get()` was called. So it would
churn away failing to sync.

Fix:
- Proactively call `Get()` periodically so that coordination occurs even
if no calls to Get() would otherwise happen.
  • Loading branch information
matt2e authored Jul 10, 2024
1 parent 2e07ee4 commit 84165e7
Showing 1 changed file with 23 additions and 5 deletions.
28 changes: 23 additions & 5 deletions backend/controller/leader/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,31 @@ func NewCoordinator[P any](ctx context.Context,
leaderFactory: leaderFactory,
followerFactory: followerFactory,
}
// Attempt to coordinate proactively without blocking
go func() {
_, _ = coordinator.Get() //nolint:errcheck
}()
go coordinator.sync(ctx)
return coordinator
}

// sync proactively coordinates between leader and followers until ctx is done.
//
// It calls Get() once immediately and then again after an interval of
// max(5s, leaseTTL/2), measured from the completion of the previous call.
// This allows the coordinator to maintain a leader or follower even when
// nothing else calls Get(). Otherwise we can have stale followers attempting
// to communicate with a leader that no longer exists, until a call to Get()
// comes in.
//
// Errors returned by Get() are logged and do not stop the loop.
func (c *Coordinator[P]) sync(ctx context.Context) {
	logger := log.FromContext(ctx)
	// A single reusable timer avoids allocating a new one per iteration (as
	// time.After does) and lets us release it promptly when ctx is cancelled.
	// A zero duration makes the first coordination attempt happen right away.
	timer := time.NewTimer(0)
	defer timer.Stop()
	for {
		select {
		case <-timer.C:
			if _, err := c.Get(); err != nil {
				logger.Errorf(err, "could not proactively coordinate leader for %s", c.key)
			}
		case <-ctx.Done():
			return
		}
		// timer.C was drained by the receive above, so Reset is safe here.
		timer.Reset(max(time.Second*5, c.leaseTTL/2))
	}
}

// Get returns either a leader or follower
func (c *Coordinator[P]) Get() (leaderOrFollower P, err error) {
// Can not have multiple Get() calls in parallel as they may conflict with each other.
Expand Down Expand Up @@ -133,7 +151,7 @@ func (c *Coordinator[P]) Get() (leaderOrFollower P, err error) {
go func() {
c.watchForLeaderExpiration(leaderCtx)
}()
logger.Tracef("new leader for %s: %s", c.key, c.advertise)
logger.Debugf("new leader for %s: %s", c.key, c.advertise)
return l, nil
}
if !errors.Is(leaseErr, leases.ErrConflict) {
Expand Down

0 comments on commit 84165e7

Please sign in to comment.