Skip to content

Commit

Permalink
Use new leader election mechanism in applier manager
Browse files Browse the repository at this point in the history
Signed-off-by: Tom Wieczorek <[email protected]>
  • Loading branch information
twz123 committed Sep 4, 2024
1 parent 173ed0a commit d86e318
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 21 deletions.
35 changes: 17 additions & 18 deletions pkg/applier/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,27 +65,26 @@ func (m *Manager) Init(ctx context.Context) error {
m.log = logrus.WithField("component", constant.ApplierManagerComponentName)
m.bundleDir = m.K0sVars.ManifestsDir

m.LeaderElector.AddAcquiredLeaseCallback(func() {
ctx, cancel := context.WithCancelCause(ctx)
stopped := make(chan struct{})

m.stop = func(reason string) { cancel(errors.New(reason)); <-stopped }
go func() {
defer close(stopped)
wait.UntilWithContext(ctx, m.runWatchers, 1*time.Minute)
}()
})
m.LeaderElector.AddLostLeaseCallback(func() {
if m.stop != nil {
m.stop("lost leadership")
}
})

return err
return nil
}

// Run runs the Manager
func (m *Manager) Start(_ context.Context) error {
func (m *Manager) Start(context.Context) error {
ctx, cancel := context.WithCancelCause(context.Background())
stopped := make(chan struct{})

m.stop = func(reason string) {
cancel(errors.New(reason))
<-stopped
}

go func() {
defer close(stopped)
leaderelector.RunLeaderTasks(ctx, m.LeaderElector.GetLeaderStatus, func(ctx context.Context) {
wait.UntilWithContext(ctx, m.runWatchers, time.Minute)
})
}()

return nil
}

Expand Down
12 changes: 12 additions & 0 deletions pkg/component/controller/leaderelector/dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"

"github.com/k0sproject/k0s/pkg/component/manager"
"github.com/k0sproject/k0s/pkg/leaderelection"
)

type Dummy struct {
Expand All @@ -38,6 +39,17 @@ func (l *Dummy) AddAcquiredLeaseCallback(fn func()) {
l.callbacks = append(l.callbacks, fn)
}

var never = make(<-chan struct{})

func (l *Dummy) GetLeaderStatus() (leaderelection.Status, <-chan struct{}) {
var status leaderelection.Status
if l.Leader {
status = leaderelection.StatusLeading
}

return status, never
}

func (l *Dummy) AddLostLeaseCallback(func()) {}

func (l *Dummy) Start(_ context.Context) error {
Expand Down
10 changes: 7 additions & 3 deletions pkg/component/controller/leaderelector/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@ limitations under the License.

package leaderelector

import "github.com/k0sproject/k0s/pkg/leaderelection"

// Interface is the common leader elector component to manage each controller leader status.
type Interface interface {
IsLeader() bool
AddAcquiredLeaseCallback(fn func())
AddLostLeaseCallback(fn func())
IsLeader() bool // Deprecated: Use [Interface.GetLeaderStatus] instead.
AddAcquiredLeaseCallback(fn func()) // Deprecated: Use [Interface.GetLeaderStatus] instead.
AddLostLeaseCallback(fn func()) // Deprecated: Use [Interface.GetLeaderStatus] instead.

GetLeaderStatus() (leaderelection.Status, <-chan struct{})
}
5 changes: 5 additions & 0 deletions pkg/component/controller/workerconfig/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/k0sproject/k0s/pkg/config"
"github.com/k0sproject/k0s/pkg/constant"
kube "github.com/k0sproject/k0s/pkg/kubernetes"
"github.com/k0sproject/k0s/pkg/leaderelection"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -913,3 +914,7 @@ func (e *mockLeaderElector) AddAcquiredLeaseCallback(fn func()) {
func (e *mockLeaderElector) AddLostLeaseCallback(func()) {
panic("not expected to be called in tests")
}

func (e *mockLeaderElector) GetLeaderStatus() (leaderelection.Status, <-chan struct{}) {
panic("not expected to be called in tests")
}

0 comments on commit d86e318

Please sign in to comment.