Add controller lease counter component after API server
The controller lease counter requires the API server to be up and
running in order to work properly. It cannot acquire or count leases
before the API server is up, nor can it release the lease after the API
server is shut down.

Move the counter component down in the controller start method, so
that it's started after the API server. Decouple the actual controller
count value from the counter component by putting it directly into the
start method. Let the counter component update that value, and let the
konnectivity components consume those updates.

Introduce internal/sync/value.Latest as a small abstraction for values that
may change over time and will be consumed by multiple observers.

Signed-off-by: Tom Wieczorek <[email protected]>
twz123 committed Jul 9, 2024
1 parent 9e99b08 commit f3a2b73
Showing 9 changed files with 287 additions and 149 deletions.
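
For orientation, here is a minimal, self-contained sketch (written for this summary, not taken from the commit) of the pattern the commit message describes: the start method owns a shared value.Latest[uint], the lease counter publishes into it via Set, and the konnectivity components read it via Peek.

package main

import (
    "fmt"

    "github.com/k0sproject/k0s/internal/sync/value"
)

func main() {
    // The controller start method owns the shared count; assume a single
    // active controller until the lease counter reports otherwise.
    numActiveControllers := value.NewLatest[uint](1)

    // Producer side (the lease counter component): publish a new count
    // whenever the active controller leases are re-counted.
    numActiveControllers.Set(3)

    // Consumer side (the konnectivity components): read the latest count and
    // an expiration channel that is closed on the next update.
    count, expired := numActiveControllers.Peek()
    fmt.Println("active controllers:", count) // active controllers: 3
    _ = expired                               // wait on this to react to the next change
}

This mirrors the wiring in cmd/controller/controller.go below, where UpdateControllerCount is set to numActiveControllers.Set and ServerCount to numActiveControllers.Peek.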
37 changes: 20 additions & 17 deletions cmd/controller/controller.go
@@ -35,6 +35,7 @@ import (
"github.com/k0sproject/k0s/internal/pkg/file"
k0slog "github.com/k0sproject/k0s/internal/pkg/log"
"github.com/k0sproject/k0s/internal/pkg/sysinfo"
"github.com/k0sproject/k0s/internal/sync/value"
"github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1"
"github.com/k0sproject/k0s/pkg/applier"
apclient "github.com/k0sproject/k0s/pkg/autopilot/client"
@@ -215,15 +216,8 @@ func (c *command) start(ctx context.Context) error {
logrus.Infof("using storage backend %s", nodeConfig.Spec.Storage.Type)
nodeComponents.Add(ctx, storageBackend)

controllerLeaseCounter := &controller.K0sControllersLeaseCounter{
InvocationID: c.K0sVars.InvocationID,
ClusterConfig: nodeConfig,
KubeClientFactory: adminClientFactory,
}

if !c.SingleNode {
nodeComponents.Add(ctx, controllerLeaseCounter)
}
// Assume a single active controller during startup
numActiveControllers := value.NewLatest[uint](1)

if cplb := nodeConfig.Spec.Network.ControlPlaneLoadBalancing; cplb != nil && cplb.Enabled {
if c.SingleNode {
@@ -246,10 +240,10 @@ func (c *command) start(ctx context.Context) error {

if enableKonnectivity {
nodeComponents.Add(ctx, &controller.Konnectivity{
K0sVars: c.K0sVars,
LogLevel: c.LogLevels.Konnectivity,
EventEmitter: prober.NewEventEmitter(),
K0sControllersLeaseCounter: controllerLeaseCounter,
K0sVars: c.K0sVars,
LogLevel: c.LogLevels.Konnectivity,
EventEmitter: prober.NewEventEmitter(),
ServerCount: numActiveControllers.Peek,
})
}

@@ -262,6 +256,15 @@ func (c *command) start(ctx context.Context) error {
DisableEndpointReconciler: disableEndpointReconciler,
})

if !c.SingleNode {
nodeComponents.Add(ctx, &controller.K0sControllersLeaseCounter{
InvocationID: c.K0sVars.InvocationID,
ClusterConfig: nodeConfig,
KubeClientFactory: adminClientFactory,
UpdateControllerCount: numActiveControllers.Set,
})
}

var leaderElector interface {
leaderelector.Interface
manager.Component
@@ -514,10 +517,10 @@ func (c *command) start(ctx context.Context) error {

if enableKonnectivity {
clusterComponents.Add(ctx, &controller.KonnectivityAgent{
K0sVars: c.K0sVars,
APIServerHost: nodeConfig.Spec.API.APIAddress(),
EventEmitter: prober.NewEventEmitter(),
K0sControllersLeaseCounter: controllerLeaseCounter,
K0sVars: c.K0sVars,
APIServerHost: nodeConfig.Spec.API.APIAddress(),
EventEmitter: prober.NewEventEmitter(),
ServerCount: numActiveControllers.Peek,
})
}

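
As a hedged illustration (not part of this diff), a component that receives the ServerCount callback wired above, i.e. numActiveControllers.Peek with signature func() (uint, <-chan struct{}), might react to controller count changes along these lines:

package sketch

import "context"

// follow is a hypothetical helper: it re-applies some configuration whenever
// the published controller count changes, using the expiration channel
// returned by the ServerCount callback to learn about updates.
func follow(ctx context.Context, serverCount func() (uint, <-chan struct{}), apply func(uint)) {
    for {
        count, expired := serverCount()
        apply(count) // e.g. adjust the konnectivity server count
        select {
        case <-expired: // the count changed; loop and re-read it
        case <-ctx.Done():
            return
        }
    }
}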
109 changes: 109 additions & 0 deletions internal/sync/value/latest.go
@@ -0,0 +1,109 @@
/*
Copyright 2024 k0s authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package value

import "sync/atomic"

// A value that can be atomically updated, where each update invalidates the
// previous value. Whenever the value changes, an associated expiration channel
// is closed. Callers can use this to be notified of updates. The zero value of
// Latest holds a zero value of T.
//
// Latest is useful when some shared state is updated frequently, and readers
// don't need to keep track of every value, just the latest one. Latest makes
// this easy, as there's no need to maintain a separate channel for each reader.
//
// Example Usage:
//
// package main
//
// import (
// "fmt"
// "sync"
// "time"
//
// "github.com/k0sproject/k0s/internal/sync/value"
// )
//
// func main() {
// // Declare a zero latest value
// var l value.Latest[int]
//
// fmt.Println("Zero value:", l.Get()) // Output: Zero value: 0
//
// // Set the value
// l.Set(42)
// fmt.Println("Value set to 42")
//
// // Peek at the current value and get the expiration channel
// value, expired := l.Peek()
// fmt.Println("Peeked value:", value) // Output: Peeked value: 42
//
// // Use a goroutine to watch for expiration
// var wg sync.WaitGroup
// wg.Add(1)
// go func() {
// defer wg.Done()
// <-expired
// fmt.Println("Value expired, new value:", l.Get()) // Output: Value expired, new value: 84
// }()
//
// // Set a new value, which will expire the previous value
// time.Sleep(1 * time.Second) // Simulate some delay
// l.Set(84)
// fmt.Println("New value set to 84")
//
// wg.Wait() // Wait for the watcher goroutine to finish
// }
type Latest[T any] struct {
p atomic.Pointer[val[T]]
}

func NewLatest[T any](value T) *Latest[T] {
latest := new(Latest[T])
latest.Set(value)
return latest
}

// Retrieves the latest value and its associated expiration channel. If no value
// was previously set, it returns the zero value of T and an expiration channel
// that is closed as soon as a value is set.
func (l *Latest[T]) Peek() (T, <-chan struct{}) {
if loaded := l.p.Load(); loaded != nil {
return loaded.inner, loaded.ch
}

value := val[T]{ch: make(chan struct{})}
if !l.p.CompareAndSwap(nil, &value) {
loaded := l.p.Load()
return loaded.inner, loaded.ch
}

return value.inner, value.ch
}

// Sets a new value and closes the expiration channel of the previous value.
func (l *Latest[T]) Set(value T) {
if old := l.p.Swap(&val[T]{value, make(chan struct{})}); old != nil {
close(old.ch)
}
}

type val[T any] struct {
inner T
ch chan struct{}
}
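
As a hypothetical companion to the doc comment's example (not part of this file), a single reader can follow updates by alternating Peek with a wait on the expiration channel:

package sketch

import (
    "context"

    "github.com/k0sproject/k0s/internal/sync/value"
)

// watch forwards the current value and every subsequently set value of latest
// to out, until ctx is cancelled. Each iteration peeks the value, delivers it,
// and then blocks on the expiration channel to learn when a newer value exists.
func watch[T any](ctx context.Context, latest *value.Latest[T], out chan<- T) {
    for {
        current, expired := latest.Peek()
        select {
        case out <- current:
        case <-ctx.Done():
            return
        }
        select {
        case <-expired: // a newer value has been set; re-read it next iteration
        case <-ctx.Done():
            return
        }
    }
}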
53 changes: 53 additions & 0 deletions internal/sync/value/latest_test.go
@@ -0,0 +1,53 @@
/*
Copyright 2024 k0s authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package value_test

import (
"sync"
"testing"
"time"

"github.com/k0sproject/k0s/internal/sync/value"

"github.com/stretchr/testify/assert"
)

func TestLatest(t *testing.T) {
var underTest value.Latest[int]
value, expired := underTest.Peek()
assert.Zero(t, value, "Zero latest should return zero value")
assert.NotNil(t, expired)

var got int
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
<-expired
got, _ = underTest.Peek()
}()

time.Sleep(10 * time.Millisecond) // Simulate some delay
underTest.Set(42)
wg.Wait()

assert.Equal(t, 42, got)

newValue, newExpired := underTest.Peek()
assert.Equal(t, 42, newValue)
assert.NotEqual(t, expired, newExpired)
}
4 changes: 2 additions & 2 deletions pkg/autopilot/controller/updates/clusterinfo.go
@@ -36,7 +36,7 @@ type ClusterInfo struct {
K0sVersion string
StorageType string
ClusterID string
ControlPlaneNodesCount int
ControlPlaneNodesCount uint
WorkerData WorkerData
CNIProvider string
Arch string
@@ -121,7 +121,7 @@ func CollectData(ctx context.Context, kc kubernetes.Interface) (*ClusterInfo, error) {
}

// Collect control plane node count
ci.ControlPlaneNodesCount, err = kubeutil.GetControlPlaneNodeCount(ctx, kc)
ci.ControlPlaneNodesCount, err = kubeutil.CountActiveControllerLeases(ctx, kc)
if err != nil {
return ci, fmt.Errorf("can't collect control plane nodes count: %w", err)
}
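
The renamed kubeutil helper itself is not part of this commit page. Purely as a hedged sketch, and assuming that k0s controller leases live in the kube-node-lease namespace with a k0s-ctrl- name prefix (the real CountActiveControllerLeases may differ in naming and validity rules), counting them could look roughly like this:

package sketch

import (
    "context"
    "strings"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

// countActiveControllerLeases is an illustrative stand-in for the helper this
// commit calls; it lists leases and counts those that look like recently
// renewed k0s controller leases.
func countActiveControllerLeases(ctx context.Context, kc kubernetes.Interface) (uint, error) {
    leases, err := kc.CoordinationV1().Leases("kube-node-lease").List(ctx, metav1.ListOptions{})
    if err != nil {
        return 0, err
    }
    var count uint
    for _, lease := range leases.Items {
        if !strings.HasPrefix(lease.Name, "k0s-ctrl-") { // assumed naming convention
            continue
        }
        // Treat the lease as active if it was renewed within the last minute;
        // the real helper may derive this from the lease duration instead.
        if lease.Spec.RenewTime != nil && time.Since(lease.Spec.RenewTime.Time) < time.Minute {
            count++
        }
    }
    return count, nil
}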
74 changes: 21 additions & 53 deletions pkg/component/controller/controllersleasecounter.go
@@ -27,27 +27,27 @@ import (
"github.com/k0sproject/k0s/pkg/leaderelection"
"github.com/k0sproject/k0s/pkg/node"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"

"github.com/sirupsen/logrus"
)

// K0sControllersLeaseCounter implements a component that manages a lease per controller.
// The per-controller leases are used to determine the amount of currently running controllers
type K0sControllersLeaseCounter struct {
InvocationID string
ClusterConfig *v1beta1.ClusterConfig
KubeClientFactory kubeutil.ClientFactoryInterface
InvocationID string
ClusterConfig *v1beta1.ClusterConfig
KubeClientFactory kubeutil.ClientFactoryInterface
UpdateControllerCount func(uint)

cancelFunc context.CancelFunc

subscribers []chan int
}

var _ manager.Component = (*K0sControllersLeaseCounter)(nil)

// Init initializes the component needs
func (l *K0sControllersLeaseCounter) Init(_ context.Context) error {
l.subscribers = make([]chan int, 0)

return nil
}

@@ -94,7 +94,7 @@ func (l *K0sControllersLeaseCounter) Start(context.Context) error {
}
}()

go l.runLeaseCounter(ctx)
go l.runLeaseCounter(ctx, client)

return nil
}
@@ -107,53 +107,21 @@ func (l *K0sControllersLeaseCounter) Stop() error {
return nil
}

// Check the numbers of controller every 10 secs and notify the subscribers
func (l *K0sControllersLeaseCounter) runLeaseCounter(ctx context.Context) {
// Updates the number of active controller leases every 10 secs.
func (l *K0sControllersLeaseCounter) runLeaseCounter(ctx context.Context, clients kubernetes.Interface) {
log := logrus.WithFields(logrus.Fields{"component": "controllerlease"})
log.Debug("starting controller lease counter every 10 secs")
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
log.Info("stopping controller lease counter")
log.Debug("Starting controller lease counter every 10 secs")

wait.UntilWithContext(ctx, func(ctx context.Context) {
log.Debug("Counting active controller leases")
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
count, err := kubeutil.CountActiveControllerLeases(ctx, clients)
if err != nil {
log.WithError(err).Error("Failed to count controller lease holders")
return
case <-ticker.C:
log.Debug("counting controller lease holders")
count, err := l.countLeaseHolders(ctx)
if err != nil {
log.Errorf("failed to count controller leases: %s", err)
}
l.notifySubscribers(count)
}
}
}

func (l *K0sControllersLeaseCounter) countLeaseHolders(ctx context.Context) (int, error) {
client, err := l.KubeClientFactory.GetClient()
if err != nil {
return 0, err
}

return kubeutil.GetControlPlaneNodeCount(ctx, client)
}

// Notify the subscribers about the current controller count
func (l *K0sControllersLeaseCounter) notifySubscribers(count int) {
log := logrus.WithFields(logrus.Fields{"component": "controllerlease"})
log.Debugf("notifying subscribers (%d) about controller count: %d", len(l.subscribers), count)
for _, ch := range l.subscribers {
// Use non-blocking send to avoid blocking the loop
select {
case ch <- count:
case <-time.After(5 * time.Second):
log.Warn("timeout when sending count to subsrciber")
}
}
}

func (l *K0sControllersLeaseCounter) Subscribe() <-chan int {
ch := make(chan int, 1)
l.subscribers = append(l.subscribers, ch)
return ch
l.UpdateControllerCount(count)
}, 10*time.Second)
}
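
For readers unfamiliar with it, wait.UntilWithContext from k8s.io/apimachinery/pkg/util/wait, used in the new runLeaseCounter above, behaves roughly like this simplified paraphrase; the real implementation also supports jitter and restarts the period timer only after f returns:

package sketch

import (
    "context"
    "time"
)

// untilWithContext runs f immediately and then about once per period until
// ctx is cancelled. This is a rough approximation of wait.UntilWithContext,
// not the upstream implementation.
func untilWithContext(ctx context.Context, f func(context.Context), period time.Duration) {
    ticker := time.NewTicker(period)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        default:
        }
        f(ctx)
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
        }
    }
}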