diff --git a/cmd/controller/controller.go b/cmd/controller/controller.go index e6c44164d494..68b4255f3d46 100644 --- a/cmd/controller/controller.go +++ b/cmd/controller/controller.go @@ -35,6 +35,7 @@ import ( "github.com/k0sproject/k0s/internal/pkg/file" k0slog "github.com/k0sproject/k0s/internal/pkg/log" "github.com/k0sproject/k0s/internal/pkg/sysinfo" + "github.com/k0sproject/k0s/internal/sync/value" "github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1" "github.com/k0sproject/k0s/pkg/applier" apclient "github.com/k0sproject/k0s/pkg/autopilot/client" @@ -215,15 +216,8 @@ func (c *command) start(ctx context.Context) error { logrus.Infof("using storage backend %s", nodeConfig.Spec.Storage.Type) nodeComponents.Add(ctx, storageBackend) - controllerLeaseCounter := &controller.K0sControllersLeaseCounter{ - InvocationID: c.K0sVars.InvocationID, - ClusterConfig: nodeConfig, - KubeClientFactory: adminClientFactory, - } - - if !c.SingleNode { - nodeComponents.Add(ctx, controllerLeaseCounter) - } + // Assume a single active controller during startup + numActiveControllers := value.NewLatest[uint](1) if cplb := nodeConfig.Spec.Network.ControlPlaneLoadBalancing; cplb != nil && cplb.Enabled { if c.SingleNode { @@ -246,10 +240,10 @@ func (c *command) start(ctx context.Context) error { if enableKonnectivity { nodeComponents.Add(ctx, &controller.Konnectivity{ - K0sVars: c.K0sVars, - LogLevel: c.LogLevels.Konnectivity, - EventEmitter: prober.NewEventEmitter(), - K0sControllersLeaseCounter: controllerLeaseCounter, + K0sVars: c.K0sVars, + LogLevel: c.LogLevels.Konnectivity, + EventEmitter: prober.NewEventEmitter(), + ServerCount: numActiveControllers.Peek, }) } @@ -262,6 +256,15 @@ func (c *command) start(ctx context.Context) error { DisableEndpointReconciler: disableEndpointReconciler, }) + if !c.SingleNode { + nodeComponents.Add(ctx, &controller.K0sControllersLeaseCounter{ + InvocationID: c.K0sVars.InvocationID, + ClusterConfig: nodeConfig, + KubeClientFactory: adminClientFactory, + UpdateControllerCount: numActiveControllers.Set, + }) + } + var leaderElector interface { leaderelector.Interface manager.Component @@ -514,10 +517,10 @@ func (c *command) start(ctx context.Context) error { if enableKonnectivity { clusterComponents.Add(ctx, &controller.KonnectivityAgent{ - K0sVars: c.K0sVars, - APIServerHost: nodeConfig.Spec.API.APIAddress(), - EventEmitter: prober.NewEventEmitter(), - K0sControllersLeaseCounter: controllerLeaseCounter, + K0sVars: c.K0sVars, + APIServerHost: nodeConfig.Spec.API.APIAddress(), + EventEmitter: prober.NewEventEmitter(), + ServerCount: numActiveControllers.Peek, }) } diff --git a/internal/sync/value/latest.go b/internal/sync/value/latest.go new file mode 100644 index 000000000000..406f90769dc1 --- /dev/null +++ b/internal/sync/value/latest.go @@ -0,0 +1,109 @@ +/* +Copyright 2024 k0s authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package value + +import "sync/atomic" + +// A value that can be atomically updated, where each update invalidates the +// previous value. Whenever the value changes, an associated expiration channel +// is closed. Callers can use this to be notified of updates. The zero value of +// Latest holds a zero value of T. +// +// Latest is useful when some shared state is updated frequently, and readers +// don't need to keep track of every value, just the latest one. Latest makes +// this easy, as there's no need to maintain a separate channel for each reader. +// +// Example Usage: +// +// package main +// +// import ( +// "fmt" +// "sync" +// "time" +// +// "github.com/k0sproject/k0s/internal/sync/value" +// ) +// +// func main() { +// // Declare a zero latest value +// var l value.Latest[int] +// +// fmt.Println("Zero value:", l.Get()) // Output: Zero value: 0 +// +// // Set the value +// l.Set(42) +// fmt.Println("Value set to 42") +// +// // Peek at the current value and get the expiration channel +// value, expired := l.Peek() +// fmt.Println("Peeked value:", value) // Output: Peeked value: 42 +// +// // Use a goroutine to watch for expiration +// var wg sync.WaitGroup +// wg.Add(1) +// go func() { +// defer wg.Done() +// <-expired +// fmt.Println("Value expired, new value:", l.Get()) // Output: Value expired, new value: 84 +// }() +// +// // Set a new value, which will expire the previous value +// time.Sleep(1 * time.Second) // Simulate some delay +// l.Set(84) +// fmt.Println("New value set to 84") +// +// wg.Wait() // Wait for the watcher goroutine to finish +// } +type Latest[T any] struct { + p atomic.Pointer[val[T]] +} + +func NewLatest[T any](value T) *Latest[T] { + latest := new(Latest[T]) + latest.Set(value) + return latest +} + +// Retrieves the latest value and its associated expiration channel. If no value +// was previously set, it returns the zero value of T and an expiration channel +// that is closed as soon as a value is set. +func (l *Latest[T]) Peek() (T, <-chan struct{}) { + if loaded := l.p.Load(); loaded != nil { + return loaded.inner, loaded.ch + } + + value := val[T]{ch: make(chan struct{})} + if !l.p.CompareAndSwap(nil, &value) { + loaded := l.p.Load() + return loaded.inner, loaded.ch + } + + return value.inner, value.ch +} + +// Sets a new value and closes the expiration channel of the previous value. +func (l *Latest[T]) Set(value T) { + if old := l.p.Swap(&val[T]{value, make(chan struct{})}); old != nil { + close(old.ch) + } +} + +type val[T any] struct { + inner T + ch chan struct{} +} diff --git a/internal/sync/value/latest_test.go b/internal/sync/value/latest_test.go new file mode 100644 index 000000000000..5f98d3517a27 --- /dev/null +++ b/internal/sync/value/latest_test.go @@ -0,0 +1,53 @@ +/* +Copyright 2024 k0s authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package value_test + +import ( + "sync" + "testing" + "time" + + "github.com/k0sproject/k0s/internal/sync/value" + + "github.com/stretchr/testify/assert" +) + +func TestLatest(t *testing.T) { + var underTest value.Latest[int] + value, expired := underTest.Peek() + assert.Zero(t, value, "Zero latest should return zero value") + assert.NotNil(t, expired) + + var got int + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + <-expired + got, _ = underTest.Peek() + }() + + time.Sleep(10 * time.Millisecond) // Simulate some delay + underTest.Set(42) + wg.Wait() + + assert.Equal(t, 42, got) + + newValue, newExpired := underTest.Peek() + assert.Equal(t, 42, newValue) + assert.NotEqual(t, expired, newExpired) +} diff --git a/pkg/autopilot/controller/updates/clusterinfo.go b/pkg/autopilot/controller/updates/clusterinfo.go index 968a65cb2ba2..590f7957a731 100644 --- a/pkg/autopilot/controller/updates/clusterinfo.go +++ b/pkg/autopilot/controller/updates/clusterinfo.go @@ -36,7 +36,7 @@ type ClusterInfo struct { K0sVersion string StorageType string ClusterID string - ControlPlaneNodesCount int + ControlPlaneNodesCount uint WorkerData WorkerData CNIProvider string Arch string @@ -121,7 +121,7 @@ func CollectData(ctx context.Context, kc kubernetes.Interface) (*ClusterInfo, er } // Collect control plane node count - ci.ControlPlaneNodesCount, err = kubeutil.GetControlPlaneNodeCount(ctx, kc) + ci.ControlPlaneNodesCount, err = kubeutil.CountActiveControllerLeases(ctx, kc) if err != nil { return ci, fmt.Errorf("can't collect control plane nodes count: %w", err) } diff --git a/pkg/component/controller/controllersleasecounter.go b/pkg/component/controller/controllersleasecounter.go index 4bcbed2ce6f4..41f24c6365ad 100644 --- a/pkg/component/controller/controllersleasecounter.go +++ b/pkg/component/controller/controllersleasecounter.go @@ -27,27 +27,27 @@ import ( "github.com/k0sproject/k0s/pkg/leaderelection" "github.com/k0sproject/k0s/pkg/node" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "github.com/sirupsen/logrus" ) // K0sControllersLeaseCounter implements a component that manages a lease per controller. // The per-controller leases are used to determine the amount of currently running controllers type K0sControllersLeaseCounter struct { - InvocationID string - ClusterConfig *v1beta1.ClusterConfig - KubeClientFactory kubeutil.ClientFactoryInterface + InvocationID string + ClusterConfig *v1beta1.ClusterConfig + KubeClientFactory kubeutil.ClientFactoryInterface + UpdateControllerCount func(uint) cancelFunc context.CancelFunc - - subscribers []chan int } var _ manager.Component = (*K0sControllersLeaseCounter)(nil) // Init initializes the component needs func (l *K0sControllersLeaseCounter) Init(_ context.Context) error { - l.subscribers = make([]chan int, 0) - return nil } @@ -94,7 +94,7 @@ func (l *K0sControllersLeaseCounter) Start(context.Context) error { } }() - go l.runLeaseCounter(ctx) + go l.runLeaseCounter(ctx, client) return nil } @@ -107,53 +107,21 @@ func (l *K0sControllersLeaseCounter) Stop() error { return nil } -// Check the numbers of controller every 10 secs and notify the subscribers -func (l *K0sControllersLeaseCounter) runLeaseCounter(ctx context.Context) { +// Updates the number of active controller leases every 10 secs. +func (l *K0sControllersLeaseCounter) runLeaseCounter(ctx context.Context, clients kubernetes.Interface) { log := logrus.WithFields(logrus.Fields{"component": "controllerlease"}) - log.Debug("starting controller lease counter every 10 secs") - ticker := time.NewTicker(10 * time.Second) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - log.Info("stopping controller lease counter") + log.Debug("Starting controller lease counter every 10 secs") + + wait.UntilWithContext(ctx, func(ctx context.Context) { + log.Debug("Counting active controller leases") + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + count, err := kubeutil.CountActiveControllerLeases(ctx, clients) + if err != nil { + log.WithError(err).Error("Failed to count controller lease holders") return - case <-ticker.C: - log.Debug("counting controller lease holders") - count, err := l.countLeaseHolders(ctx) - if err != nil { - log.Errorf("failed to count controller leases: %s", err) - } - l.notifySubscribers(count) } - } -} - -func (l *K0sControllersLeaseCounter) countLeaseHolders(ctx context.Context) (int, error) { - client, err := l.KubeClientFactory.GetClient() - if err != nil { - return 0, err - } - - return kubeutil.GetControlPlaneNodeCount(ctx, client) -} - -// Notify the subscribers about the current controller count -func (l *K0sControllersLeaseCounter) notifySubscribers(count int) { - log := logrus.WithFields(logrus.Fields{"component": "controllerlease"}) - log.Debugf("notifying subscribers (%d) about controller count: %d", len(l.subscribers), count) - for _, ch := range l.subscribers { - // Use non-blocking send to avoid blocking the loop - select { - case ch <- count: - case <-time.After(5 * time.Second): - log.Warn("timeout when sending count to subsrciber") - } - } -} -func (l *K0sControllersLeaseCounter) Subscribe() <-chan int { - ch := make(chan int, 1) - l.subscribers = append(l.subscribers, ch) - return ch + l.UpdateControllerCount(count) + }, 10*time.Second) } diff --git a/pkg/component/controller/konnectivity.go b/pkg/component/controller/konnectivity.go index eb5f98640952..5c498cfe85e6 100644 --- a/pkg/component/controller/konnectivity.go +++ b/pkg/component/controller/konnectivity.go @@ -22,6 +22,7 @@ import ( "os" "path/filepath" "strconv" + "time" "github.com/sirupsen/logrus" @@ -40,17 +41,16 @@ import ( // Konnectivity implements the component interface for konnectivity server type Konnectivity struct { - K0sVars *config.CfgVars - LogLevel string - K0sControllersLeaseCounter *K0sControllersLeaseCounter - - supervisor *supervisor.Supervisor - uid int - serverCount int - serverCountChan <-chan int - stopFunc context.CancelFunc - clusterConfig *v1beta1.ClusterConfig - log *logrus.Entry + K0sVars *config.CfgVars + LogLevel string + ServerCount func() (uint, <-chan struct{}) + + supervisor *supervisor.Supervisor + uid int + + stopFunc context.CancelFunc + clusterConfig *v1beta1.ClusterConfig + log *logrus.Entry *prober.EventEmitter } @@ -92,21 +92,50 @@ func (k *Konnectivity) Init(ctx context.Context) error { // Run .. func (k *Konnectivity) Start(ctx context.Context) error { - // Buffered chan to send updates for the count of servers - k.serverCountChan = k.K0sControllersLeaseCounter.Subscribe() + serverCount, serverCountChanged := k.ServerCount() - // To make the server start, add "dummy" 0 into the channel - if err := k.runServer(0); err != nil { - k.EmitWithPayload("failed to run konnectivity server", err) - return fmt.Errorf("failed to run konnectivity server: %w", err) + if err := k.runServer(serverCount); err != nil { + k.EmitWithPayload("failed to start konnectivity server", err) + return fmt.Errorf("failed to start konnectivity server: %w", err) } - go k.watchControllerCountChanges(ctx) + go func() { + var retry <-chan time.Time + for { + select { + case <-serverCountChanged: + prevServerCount := serverCount + serverCount, serverCountChanged = k.ServerCount() + // restart only if the server count actually changed + if serverCount == prevServerCount { + continue + } + + case <-retry: + k.Emit("retrying to start konnectivity server") + k.log.Info("Retrying to start konnectivity server") + + case <-ctx.Done(): + k.Emit("stopped konnectivity server") + k.log.Info("stopping konnectivity server reconfig loop") + return + } + + retry = nil + + if err := k.runServer(serverCount); err != nil { + k.EmitWithPayload("failed to start konnectivity server", err) + k.log.WithError(err).Errorf("Failed to start konnectivity server") + retry = time.After(10 * time.Second) + continue + } + } + }() return nil } -func (k *Konnectivity) serverArgs(count int) []string { +func (k *Konnectivity) serverArgs(count uint) []string { return stringmap.StringMap{ "--uds-name": filepath.Join(k.K0sVars.KonnectivitySocketDir, "konnectivity-server.sock"), "--cluster-cert": filepath.Join(k.K0sVars.CertRootDir, "server.crt"), @@ -124,42 +153,14 @@ func (k *Konnectivity) serverArgs(count int) []string { "--v": k.LogLevel, "--enable-profiling": "false", "--delete-existing-uds-file": "true", - "--server-count": strconv.Itoa(count), + "--server-count": strconv.FormatUint(uint64(count), 10), "--server-id": k.K0sVars.InvocationID, "--proxy-strategies": "destHost,default", "--cipher-suites": constant.AllowedTLS12CipherSuiteNames(), }.ToArgs() } -// runs the supervisor and restarts if the calculated server count changes -func (k *Konnectivity) watchControllerCountChanges(ctx context.Context) { - // previousArgs := stringmap.StringMap{} - for { - k.log.Debug("waiting for server count change") - select { - case <-ctx.Done(): - k.Emit("stopped konnectivity server") - logrus.Info("stopping konnectivity server reconfig loop") - return - case count := <-k.serverCountChan: - if k.clusterConfig == nil { - k.Emit("skipping konnectivity server start, cluster config not yet available") - continue - } - // restart only if the count actually changes and we've got the global config - if count != k.serverCount { - if err := k.runServer(count); err != nil { - k.EmitWithPayload("failed to run konnectivity server", err) - logrus.Errorf("failed to run konnectivity server: %s", err) - continue - } - } - k.serverCount = count - } - } -} - -func (k *Konnectivity) runServer(count int) error { +func (k *Konnectivity) runServer(count uint) error { // Stop supervisor if k.supervisor != nil { k.EmitWithPayload("restarting konnectivity server due to server count change", @@ -179,12 +180,9 @@ func (k *Konnectivity) runServer(count int) error { } err := k.supervisor.Supervise() if err != nil { - k.EmitWithPayload("failed to run konnectivity server", err) - k.log.Errorf("failed to start konnectivity supervisor: %s", err) k.supervisor = nil // not to make the next loop to try to stop it first return err } - k.serverCount = count k.EmitWithPayload("started konnectivity server", map[string]interface{}{"serverCount": count}) return nil diff --git a/pkg/component/controller/konnectivityagent.go b/pkg/component/controller/konnectivityagent.go index 86071d2bfef0..72540c70132e 100644 --- a/pkg/component/controller/konnectivityagent.go +++ b/pkg/component/controller/konnectivityagent.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "path/filepath" + "time" "github.com/k0sproject/k0s/internal/pkg/dir" "github.com/k0sproject/k0s/internal/pkg/templatewriter" @@ -32,11 +33,10 @@ import ( ) type KonnectivityAgent struct { - K0sVars *config.CfgVars - APIServerHost string - K0sControllersLeaseCounter *K0sControllersLeaseCounter + K0sVars *config.CfgVars + APIServerHost string + ServerCount func() (uint, <-chan struct{}) - serverCountChan <-chan int configChangeChan chan *v1beta1.ClusterConfig log *logrus.Entry previousConfig konnectivityAgentConfig @@ -55,27 +55,34 @@ func (k *KonnectivityAgent) Init(_ context.Context) error { } func (k *KonnectivityAgent) Start(ctx context.Context) error { - // Subscribe to ctrl count changes - k.serverCountChan = k.K0sControllersLeaseCounter.Subscribe() go func() { - var ( - clusterConfig *v1beta1.ClusterConfig - serverCount int - ) + serverCount, serverCountChanged := k.ServerCount() + var clusterConfig *v1beta1.ClusterConfig + var retry <-chan time.Time for { select { case config := <-k.configChangeChan: clusterConfig = config - case count := <-k.serverCountChan: - serverCount = count + case <-serverCountChanged: + prevServerCount := serverCount + serverCount, serverCountChanged = k.ServerCount() + // write only if the server count actually changed + if serverCount == prevServerCount { + continue + } + + case <-retry: + k.log.Info("Retrying to write konnectivity agent manifest") case <-ctx.Done(): return } + retry = nil + if clusterConfig == nil { k.log.Info("Cluster configuration has not yet been reconciled") continue @@ -83,6 +90,8 @@ func (k *KonnectivityAgent) Start(ctx context.Context) error { if err := k.writeKonnectivityAgent(clusterConfig, serverCount); err != nil { k.log.Errorf("failed to write konnectivity agent manifest: %v", err) + retry = time.After(10 * time.Second) + continue } } }() @@ -101,7 +110,7 @@ func (k *KonnectivityAgent) Stop() error { return nil } -func (k *KonnectivityAgent) writeKonnectivityAgent(clusterConfig *v1beta1.ClusterConfig, serverCount int) error { +func (k *KonnectivityAgent) writeKonnectivityAgent(clusterConfig *v1beta1.ClusterConfig, serverCount uint) error { konnectivityDir := filepath.Join(k.K0sVars.ManifestsDir, "konnectivity") err := dir.Init(konnectivityDir, constant.ManifestsDirMode) if err != nil { @@ -183,7 +192,7 @@ type konnectivityAgentConfig struct { ProxyServerPort uint16 AgentPort uint16 Image string - ServerCount int + ServerCount uint PullPolicy string HostNetwork bool BindToNodeIP bool diff --git a/pkg/kubernetes/lease.go b/pkg/kubernetes/lease.go index 3b1e3dab00a9..07ac8e48f9e5 100644 --- a/pkg/kubernetes/lease.go +++ b/pkg/kubernetes/lease.go @@ -35,19 +35,17 @@ func IsValidLease(lease coordinationv1.Lease) bool { return leaseExpiry.After(time.Now()) } -func GetControlPlaneNodeCount(ctx context.Context, kubeClient kubernetes.Interface) (int, error) { - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - count := 0 +func CountActiveControllerLeases(ctx context.Context, kubeClient kubernetes.Interface) (count uint, _ error) { leases, err := kubeClient.CoordinationV1().Leases("kube-node-lease").List(ctx, v1.ListOptions{}) if err != nil { return 0, err } for _, l := range leases.Items { - if strings.HasPrefix(l.ObjectMeta.Name, "k0s-ctrl") { - if IsValidLease(l) { - count++ - } + switch { + case !strings.HasPrefix(l.ObjectMeta.Name, "k0s-ctrl-"): + case l.Spec.HolderIdentity == nil || *l.Spec.HolderIdentity == "": + case IsValidLease(l): + count++ } } diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go index 93ecc3980a58..bcbb8eab4916 100644 --- a/pkg/telemetry/telemetry.go +++ b/pkg/telemetry/telemetry.go @@ -33,7 +33,7 @@ type telemetryData struct { StorageType string ClusterID string WorkerNodesCount int - ControlPlaneNodesCount int + ControlPlaneNodesCount uint WorkerData []workerData CPUTotal int64 MEMTotal int64 @@ -52,7 +52,7 @@ func (td telemetryData) asProperties() analytics.Properties { "storageType": td.StorageType, "clusterID": td.ClusterID, "workerNodesCount": td.WorkerNodesCount, - "controlPlaneNodesCount": td.ControlPlaneNodesCount, + "controlPlaneNodesCount": int(td.ControlPlaneNodesCount), "workerData": td.WorkerData, "memTotal": td.MEMTotal, "cpuTotal": td.CPUTotal, @@ -78,7 +78,7 @@ func (c Component) collectTelemetry(ctx context.Context) (telemetryData, error) data.WorkerData = wds data.MEMTotal = sums.memTotal data.CPUTotal = sums.cpuTotal - data.ControlPlaneNodesCount, err = kubeutil.GetControlPlaneNodeCount(ctx, c.kubernetesClient) + data.ControlPlaneNodesCount, err = kubeutil.CountActiveControllerLeases(ctx, c.kubernetesClient) if err != nil { return data, fmt.Errorf("can't collect control plane nodes count: %w", err) }