diff --git a/backend/controller/leader/leader.go b/backend/controller/leader/leader.go index 315cc0ac2..a12b6f018 100644 --- a/backend/controller/leader/leader.go +++ b/backend/controller/leader/leader.go @@ -91,13 +91,31 @@ func NewCoordinator[P any](ctx context.Context, leaderFactory: leaderFactory, followerFactory: followerFactory, } - // Attempt to coordinate proactively without blocking - go func() { - _, _ = coordinator.Get() //nolint:errcheck - }() + go coordinator.sync(ctx) return coordinator } +// sync proactively tries to coordinate between leader and followers +// +// This allows the coordinator to maintain a leader or follower even when Get() is not called. +// Otherwise we can have stale followers attempting to communicate with a leader that no longer exists, until a call to Get() comes in +func (c *Coordinator[P]) sync(ctx context.Context) { + logger := log.FromContext(ctx) + next := time.Now() + for { + select { + case <-time.After(time.Until(next)): + _, err := c.Get() + if err != nil { + logger.Errorf(err, "could not proactively coordinate leader for %s", c.key) + } + case <-ctx.Done(): + return + } + next = time.Now().Add(max(time.Second*5, c.leaseTTL/2)) + } +} + // Get returns either a leader or follower func (c *Coordinator[P]) Get() (leaderOrFollower P, err error) { // Can not have multiple Get() calls in parallel as they may conflict with each other. @@ -133,7 +151,7 @@ func (c *Coordinator[P]) Get() (leaderOrFollower P, err error) { go func() { c.watchForLeaderExpiration(leaderCtx) }() - logger.Tracef("new leader for %s: %s", c.key, c.advertise) + logger.Debugf("new leader for %s: %s", c.key, c.advertise) return l, nil } if !errors.Is(leaseErr, leases.ErrConflict) {