From f3a2b73201a9bb992a110372c309e55cbed09041 Mon Sep 17 00:00:00 2001 From: Tom Wieczorek Date: Wed, 10 Jul 2024 00:19:35 +0200 Subject: [PATCH] Add controller lease counter component after API server The controller lease counter requires the API server to be up and running in order to work properly. It cannot acquire or count leases before the API server is up, nor can it release the lease after the API server is shut down. Move the counter component down in the controller start method, so that it's started after the API server. Decouple the actual controller count value from the counter component by putting it directly into the start method. Let the counter component update that value, and let the konnectivity components consume those updates. Introduce internal/sync.Latest as a small abstraction for values that may change over time and will be consumed by multiple observers. Signed-off-by: Tom Wieczorek --- cmd/controller/controller.go | 37 +++--- internal/sync/value/latest.go | 109 ++++++++++++++++++ internal/sync/value/latest_test.go | 53 +++++++++ .../controller/updates/clusterinfo.go | 4 +- .../controller/controllersleasecounter.go | 74 ++++-------- pkg/component/controller/konnectivity.go | 102 ++++++++-------- pkg/component/controller/konnectivityagent.go | 37 +++--- pkg/kubernetes/lease.go | 14 +-- pkg/telemetry/telemetry.go | 6 +- 9 files changed, 287 insertions(+), 149 deletions(-) create mode 100644 internal/sync/value/latest.go create mode 100644 internal/sync/value/latest_test.go diff --git a/cmd/controller/controller.go b/cmd/controller/controller.go index e6c44164d494..68b4255f3d46 100644 --- a/cmd/controller/controller.go +++ b/cmd/controller/controller.go @@ -35,6 +35,7 @@ import ( "github.com/k0sproject/k0s/internal/pkg/file" k0slog "github.com/k0sproject/k0s/internal/pkg/log" "github.com/k0sproject/k0s/internal/pkg/sysinfo" + "github.com/k0sproject/k0s/internal/sync/value" "github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1" "github.com/k0sproject/k0s/pkg/applier" apclient "github.com/k0sproject/k0s/pkg/autopilot/client" @@ -215,15 +216,8 @@ func (c *command) start(ctx context.Context) error { logrus.Infof("using storage backend %s", nodeConfig.Spec.Storage.Type) nodeComponents.Add(ctx, storageBackend) - controllerLeaseCounter := &controller.K0sControllersLeaseCounter{ - InvocationID: c.K0sVars.InvocationID, - ClusterConfig: nodeConfig, - KubeClientFactory: adminClientFactory, - } - - if !c.SingleNode { - nodeComponents.Add(ctx, controllerLeaseCounter) - } + // Assume a single active controller during startup + numActiveControllers := value.NewLatest[uint](1) if cplb := nodeConfig.Spec.Network.ControlPlaneLoadBalancing; cplb != nil && cplb.Enabled { if c.SingleNode { @@ -246,10 +240,10 @@ func (c *command) start(ctx context.Context) error { if enableKonnectivity { nodeComponents.Add(ctx, &controller.Konnectivity{ - K0sVars: c.K0sVars, - LogLevel: c.LogLevels.Konnectivity, - EventEmitter: prober.NewEventEmitter(), - K0sControllersLeaseCounter: controllerLeaseCounter, + K0sVars: c.K0sVars, + LogLevel: c.LogLevels.Konnectivity, + EventEmitter: prober.NewEventEmitter(), + ServerCount: numActiveControllers.Peek, }) } @@ -262,6 +256,15 @@ func (c *command) start(ctx context.Context) error { DisableEndpointReconciler: disableEndpointReconciler, }) + if !c.SingleNode { + nodeComponents.Add(ctx, &controller.K0sControllersLeaseCounter{ + InvocationID: c.K0sVars.InvocationID, + ClusterConfig: nodeConfig, + KubeClientFactory: adminClientFactory, + UpdateControllerCount: 
numActiveControllers.Set, + }) + } + var leaderElector interface { leaderelector.Interface manager.Component @@ -514,10 +517,10 @@ func (c *command) start(ctx context.Context) error { if enableKonnectivity { clusterComponents.Add(ctx, &controller.KonnectivityAgent{ - K0sVars: c.K0sVars, - APIServerHost: nodeConfig.Spec.API.APIAddress(), - EventEmitter: prober.NewEventEmitter(), - K0sControllersLeaseCounter: controllerLeaseCounter, + K0sVars: c.K0sVars, + APIServerHost: nodeConfig.Spec.API.APIAddress(), + EventEmitter: prober.NewEventEmitter(), + ServerCount: numActiveControllers.Peek, }) } diff --git a/internal/sync/value/latest.go b/internal/sync/value/latest.go new file mode 100644 index 000000000000..406f90769dc1 --- /dev/null +++ b/internal/sync/value/latest.go @@ -0,0 +1,109 @@ +/* +Copyright 2024 k0s authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package value + +import "sync/atomic" + +// A value that can be atomically updated, where each update invalidates the +// previous value. Whenever the value changes, an associated expiration channel +// is closed. Callers can use this to be notified of updates. The zero value of +// Latest holds a zero value of T. +// +// Latest is useful when some shared state is updated frequently, and readers +// don't need to keep track of every value, just the latest one. Latest makes +// this easy, as there's no need to maintain a separate channel for each reader. +// +// Example Usage: +// +// package main +// +// import ( +// "fmt" +// "sync" +// "time" +// +// "github.com/k0sproject/k0s/internal/sync/value" +// ) +// +// func main() { +// // Declare a zero latest value +// var l value.Latest[int] +// +// fmt.Println("Zero value:", l.Get()) // Output: Zero value: 0 +// +// // Set the value +// l.Set(42) +// fmt.Println("Value set to 42") +// +// // Peek at the current value and get the expiration channel +// value, expired := l.Peek() +// fmt.Println("Peeked value:", value) // Output: Peeked value: 42 +// +// // Use a goroutine to watch for expiration +// var wg sync.WaitGroup +// wg.Add(1) +// go func() { +// defer wg.Done() +// <-expired +// fmt.Println("Value expired, new value:", l.Get()) // Output: Value expired, new value: 84 +// }() +// +// // Set a new value, which will expire the previous value +// time.Sleep(1 * time.Second) // Simulate some delay +// l.Set(84) +// fmt.Println("New value set to 84") +// +// wg.Wait() // Wait for the watcher goroutine to finish +// } +type Latest[T any] struct { + p atomic.Pointer[val[T]] +} + +func NewLatest[T any](value T) *Latest[T] { + latest := new(Latest[T]) + latest.Set(value) + return latest +} + +// Retrieves the latest value and its associated expiration channel. If no value +// was previously set, it returns the zero value of T and an expiration channel +// that is closed as soon as a value is set. 
+func (l *Latest[T]) Peek() (T, <-chan struct{}) { + if loaded := l.p.Load(); loaded != nil { + return loaded.inner, loaded.ch + } + + value := val[T]{ch: make(chan struct{})} + if !l.p.CompareAndSwap(nil, &value) { + loaded := l.p.Load() + return loaded.inner, loaded.ch + } + + return value.inner, value.ch +} + +// Sets a new value and closes the expiration channel of the previous value. +func (l *Latest[T]) Set(value T) { + if old := l.p.Swap(&val[T]{value, make(chan struct{})}); old != nil { + close(old.ch) + } +} + +type val[T any] struct { + inner T + ch chan struct{} +} diff --git a/internal/sync/value/latest_test.go b/internal/sync/value/latest_test.go new file mode 100644 index 000000000000..5f98d3517a27 --- /dev/null +++ b/internal/sync/value/latest_test.go @@ -0,0 +1,53 @@ +/* +Copyright 2024 k0s authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package value_test + +import ( + "sync" + "testing" + "time" + + "github.com/k0sproject/k0s/internal/sync/value" + + "github.com/stretchr/testify/assert" +) + +func TestLatest(t *testing.T) { + var underTest value.Latest[int] + value, expired := underTest.Peek() + assert.Zero(t, value, "Zero latest should return zero value") + assert.NotNil(t, expired) + + var got int + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + <-expired + got, _ = underTest.Peek() + }() + + time.Sleep(10 * time.Millisecond) // Simulate some delay + underTest.Set(42) + wg.Wait() + + assert.Equal(t, 42, got) + + newValue, newExpired := underTest.Peek() + assert.Equal(t, 42, newValue) + assert.NotEqual(t, expired, newExpired) +} diff --git a/pkg/autopilot/controller/updates/clusterinfo.go b/pkg/autopilot/controller/updates/clusterinfo.go index 968a65cb2ba2..590f7957a731 100644 --- a/pkg/autopilot/controller/updates/clusterinfo.go +++ b/pkg/autopilot/controller/updates/clusterinfo.go @@ -36,7 +36,7 @@ type ClusterInfo struct { K0sVersion string StorageType string ClusterID string - ControlPlaneNodesCount int + ControlPlaneNodesCount uint WorkerData WorkerData CNIProvider string Arch string @@ -121,7 +121,7 @@ func CollectData(ctx context.Context, kc kubernetes.Interface) (*ClusterInfo, er } // Collect control plane node count - ci.ControlPlaneNodesCount, err = kubeutil.GetControlPlaneNodeCount(ctx, kc) + ci.ControlPlaneNodesCount, err = kubeutil.CountActiveControllerLeases(ctx, kc) if err != nil { return ci, fmt.Errorf("can't collect control plane nodes count: %w", err) } diff --git a/pkg/component/controller/controllersleasecounter.go b/pkg/component/controller/controllersleasecounter.go index 4bcbed2ce6f4..41f24c6365ad 100644 --- a/pkg/component/controller/controllersleasecounter.go +++ b/pkg/component/controller/controllersleasecounter.go @@ -27,27 +27,27 @@ import ( "github.com/k0sproject/k0s/pkg/leaderelection" "github.com/k0sproject/k0s/pkg/node" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "github.com/sirupsen/logrus" ) // K0sControllersLeaseCounter implements a component that manages a lease per controller. 
// The per-controller leases are used to determine the amount of currently running controllers type K0sControllersLeaseCounter struct { - InvocationID string - ClusterConfig *v1beta1.ClusterConfig - KubeClientFactory kubeutil.ClientFactoryInterface + InvocationID string + ClusterConfig *v1beta1.ClusterConfig + KubeClientFactory kubeutil.ClientFactoryInterface + UpdateControllerCount func(uint) cancelFunc context.CancelFunc - - subscribers []chan int } var _ manager.Component = (*K0sControllersLeaseCounter)(nil) // Init initializes the component needs func (l *K0sControllersLeaseCounter) Init(_ context.Context) error { - l.subscribers = make([]chan int, 0) - return nil } @@ -94,7 +94,7 @@ func (l *K0sControllersLeaseCounter) Start(context.Context) error { } }() - go l.runLeaseCounter(ctx) + go l.runLeaseCounter(ctx, client) return nil } @@ -107,53 +107,21 @@ func (l *K0sControllersLeaseCounter) Stop() error { return nil } -// Check the numbers of controller every 10 secs and notify the subscribers -func (l *K0sControllersLeaseCounter) runLeaseCounter(ctx context.Context) { +// Updates the number of active controller leases every 10 secs. +func (l *K0sControllersLeaseCounter) runLeaseCounter(ctx context.Context, clients kubernetes.Interface) { log := logrus.WithFields(logrus.Fields{"component": "controllerlease"}) - log.Debug("starting controller lease counter every 10 secs") - ticker := time.NewTicker(10 * time.Second) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - log.Info("stopping controller lease counter") + log.Debug("Starting controller lease counter every 10 secs") + + wait.UntilWithContext(ctx, func(ctx context.Context) { + log.Debug("Counting active controller leases") + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + count, err := kubeutil.CountActiveControllerLeases(ctx, clients) + if err != nil { + log.WithError(err).Error("Failed to count controller lease holders") return - case <-ticker.C: - log.Debug("counting controller lease holders") - count, err := l.countLeaseHolders(ctx) - if err != nil { - log.Errorf("failed to count controller leases: %s", err) - } - l.notifySubscribers(count) } - } -} - -func (l *K0sControllersLeaseCounter) countLeaseHolders(ctx context.Context) (int, error) { - client, err := l.KubeClientFactory.GetClient() - if err != nil { - return 0, err - } - - return kubeutil.GetControlPlaneNodeCount(ctx, client) -} - -// Notify the subscribers about the current controller count -func (l *K0sControllersLeaseCounter) notifySubscribers(count int) { - log := logrus.WithFields(logrus.Fields{"component": "controllerlease"}) - log.Debugf("notifying subscribers (%d) about controller count: %d", len(l.subscribers), count) - for _, ch := range l.subscribers { - // Use non-blocking send to avoid blocking the loop - select { - case ch <- count: - case <-time.After(5 * time.Second): - log.Warn("timeout when sending count to subsrciber") - } - } -} -func (l *K0sControllersLeaseCounter) Subscribe() <-chan int { - ch := make(chan int, 1) - l.subscribers = append(l.subscribers, ch) - return ch + l.UpdateControllerCount(count) + }, 10*time.Second) } diff --git a/pkg/component/controller/konnectivity.go b/pkg/component/controller/konnectivity.go index eb5f98640952..5c498cfe85e6 100644 --- a/pkg/component/controller/konnectivity.go +++ b/pkg/component/controller/konnectivity.go @@ -22,6 +22,7 @@ import ( "os" "path/filepath" "strconv" + "time" "github.com/sirupsen/logrus" @@ -40,17 +41,16 @@ import ( // Konnectivity implements the 
component interface for konnectivity server type Konnectivity struct { - K0sVars *config.CfgVars - LogLevel string - K0sControllersLeaseCounter *K0sControllersLeaseCounter - - supervisor *supervisor.Supervisor - uid int - serverCount int - serverCountChan <-chan int - stopFunc context.CancelFunc - clusterConfig *v1beta1.ClusterConfig - log *logrus.Entry + K0sVars *config.CfgVars + LogLevel string + ServerCount func() (uint, <-chan struct{}) + + supervisor *supervisor.Supervisor + uid int + + stopFunc context.CancelFunc + clusterConfig *v1beta1.ClusterConfig + log *logrus.Entry *prober.EventEmitter } @@ -92,21 +92,50 @@ func (k *Konnectivity) Init(ctx context.Context) error { // Run .. func (k *Konnectivity) Start(ctx context.Context) error { - // Buffered chan to send updates for the count of servers - k.serverCountChan = k.K0sControllersLeaseCounter.Subscribe() + serverCount, serverCountChanged := k.ServerCount() - // To make the server start, add "dummy" 0 into the channel - if err := k.runServer(0); err != nil { - k.EmitWithPayload("failed to run konnectivity server", err) - return fmt.Errorf("failed to run konnectivity server: %w", err) + if err := k.runServer(serverCount); err != nil { + k.EmitWithPayload("failed to start konnectivity server", err) + return fmt.Errorf("failed to start konnectivity server: %w", err) } - go k.watchControllerCountChanges(ctx) + go func() { + var retry <-chan time.Time + for { + select { + case <-serverCountChanged: + prevServerCount := serverCount + serverCount, serverCountChanged = k.ServerCount() + // restart only if the server count actually changed + if serverCount == prevServerCount { + continue + } + + case <-retry: + k.Emit("retrying to start konnectivity server") + k.log.Info("Retrying to start konnectivity server") + + case <-ctx.Done(): + k.Emit("stopped konnectivity server") + k.log.Info("stopping konnectivity server reconfig loop") + return + } + + retry = nil + + if err := k.runServer(serverCount); err != nil { + k.EmitWithPayload("failed to start konnectivity server", err) + k.log.WithError(err).Errorf("Failed to start konnectivity server") + retry = time.After(10 * time.Second) + continue + } + } + }() return nil } -func (k *Konnectivity) serverArgs(count int) []string { +func (k *Konnectivity) serverArgs(count uint) []string { return stringmap.StringMap{ "--uds-name": filepath.Join(k.K0sVars.KonnectivitySocketDir, "konnectivity-server.sock"), "--cluster-cert": filepath.Join(k.K0sVars.CertRootDir, "server.crt"), @@ -124,42 +153,14 @@ func (k *Konnectivity) serverArgs(count int) []string { "--v": k.LogLevel, "--enable-profiling": "false", "--delete-existing-uds-file": "true", - "--server-count": strconv.Itoa(count), + "--server-count": strconv.FormatUint(uint64(count), 10), "--server-id": k.K0sVars.InvocationID, "--proxy-strategies": "destHost,default", "--cipher-suites": constant.AllowedTLS12CipherSuiteNames(), }.ToArgs() } -// runs the supervisor and restarts if the calculated server count changes -func (k *Konnectivity) watchControllerCountChanges(ctx context.Context) { - // previousArgs := stringmap.StringMap{} - for { - k.log.Debug("waiting for server count change") - select { - case <-ctx.Done(): - k.Emit("stopped konnectivity server") - logrus.Info("stopping konnectivity server reconfig loop") - return - case count := <-k.serverCountChan: - if k.clusterConfig == nil { - k.Emit("skipping konnectivity server start, cluster config not yet available") - continue - } - // restart only if the count actually changes and we've got the global 
config - if count != k.serverCount { - if err := k.runServer(count); err != nil { - k.EmitWithPayload("failed to run konnectivity server", err) - logrus.Errorf("failed to run konnectivity server: %s", err) - continue - } - } - k.serverCount = count - } - } -} - -func (k *Konnectivity) runServer(count int) error { +func (k *Konnectivity) runServer(count uint) error { // Stop supervisor if k.supervisor != nil { k.EmitWithPayload("restarting konnectivity server due to server count change", @@ -179,12 +180,9 @@ func (k *Konnectivity) runServer(count int) error { } err := k.supervisor.Supervise() if err != nil { - k.EmitWithPayload("failed to run konnectivity server", err) - k.log.Errorf("failed to start konnectivity supervisor: %s", err) k.supervisor = nil // not to make the next loop to try to stop it first return err } - k.serverCount = count k.EmitWithPayload("started konnectivity server", map[string]interface{}{"serverCount": count}) return nil diff --git a/pkg/component/controller/konnectivityagent.go b/pkg/component/controller/konnectivityagent.go index 86071d2bfef0..72540c70132e 100644 --- a/pkg/component/controller/konnectivityagent.go +++ b/pkg/component/controller/konnectivityagent.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "path/filepath" + "time" "github.com/k0sproject/k0s/internal/pkg/dir" "github.com/k0sproject/k0s/internal/pkg/templatewriter" @@ -32,11 +33,10 @@ import ( ) type KonnectivityAgent struct { - K0sVars *config.CfgVars - APIServerHost string - K0sControllersLeaseCounter *K0sControllersLeaseCounter + K0sVars *config.CfgVars + APIServerHost string + ServerCount func() (uint, <-chan struct{}) - serverCountChan <-chan int configChangeChan chan *v1beta1.ClusterConfig log *logrus.Entry previousConfig konnectivityAgentConfig @@ -55,27 +55,34 @@ func (k *KonnectivityAgent) Init(_ context.Context) error { } func (k *KonnectivityAgent) Start(ctx context.Context) error { - // Subscribe to ctrl count changes - k.serverCountChan = k.K0sControllersLeaseCounter.Subscribe() go func() { - var ( - clusterConfig *v1beta1.ClusterConfig - serverCount int - ) + serverCount, serverCountChanged := k.ServerCount() + var clusterConfig *v1beta1.ClusterConfig + var retry <-chan time.Time for { select { case config := <-k.configChangeChan: clusterConfig = config - case count := <-k.serverCountChan: - serverCount = count + case <-serverCountChanged: + prevServerCount := serverCount + serverCount, serverCountChanged = k.ServerCount() + // write only if the server count actually changed + if serverCount == prevServerCount { + continue + } + + case <-retry: + k.log.Info("Retrying to write konnectivity agent manifest") case <-ctx.Done(): return } + retry = nil + if clusterConfig == nil { k.log.Info("Cluster configuration has not yet been reconciled") continue @@ -83,6 +90,8 @@ func (k *KonnectivityAgent) Start(ctx context.Context) error { if err := k.writeKonnectivityAgent(clusterConfig, serverCount); err != nil { k.log.Errorf("failed to write konnectivity agent manifest: %v", err) + retry = time.After(10 * time.Second) + continue } } }() @@ -101,7 +110,7 @@ func (k *KonnectivityAgent) Stop() error { return nil } -func (k *KonnectivityAgent) writeKonnectivityAgent(clusterConfig *v1beta1.ClusterConfig, serverCount int) error { +func (k *KonnectivityAgent) writeKonnectivityAgent(clusterConfig *v1beta1.ClusterConfig, serverCount uint) error { konnectivityDir := filepath.Join(k.K0sVars.ManifestsDir, "konnectivity") err := dir.Init(konnectivityDir, constant.ManifestsDirMode) if err != nil { @@ -183,7 
+192,7 @@ type konnectivityAgentConfig struct { ProxyServerPort uint16 AgentPort uint16 Image string - ServerCount int + ServerCount uint PullPolicy string HostNetwork bool BindToNodeIP bool diff --git a/pkg/kubernetes/lease.go b/pkg/kubernetes/lease.go index 3b1e3dab00a9..07ac8e48f9e5 100644 --- a/pkg/kubernetes/lease.go +++ b/pkg/kubernetes/lease.go @@ -35,19 +35,17 @@ func IsValidLease(lease coordinationv1.Lease) bool { return leaseExpiry.After(time.Now()) } -func GetControlPlaneNodeCount(ctx context.Context, kubeClient kubernetes.Interface) (int, error) { - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - count := 0 +func CountActiveControllerLeases(ctx context.Context, kubeClient kubernetes.Interface) (count uint, _ error) { leases, err := kubeClient.CoordinationV1().Leases("kube-node-lease").List(ctx, v1.ListOptions{}) if err != nil { return 0, err } for _, l := range leases.Items { - if strings.HasPrefix(l.ObjectMeta.Name, "k0s-ctrl") { - if IsValidLease(l) { - count++ - } + switch { + case !strings.HasPrefix(l.ObjectMeta.Name, "k0s-ctrl-"): + case l.Spec.HolderIdentity == nil || *l.Spec.HolderIdentity == "": + case IsValidLease(l): + count++ } } diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go index 93ecc3980a58..bcbb8eab4916 100644 --- a/pkg/telemetry/telemetry.go +++ b/pkg/telemetry/telemetry.go @@ -33,7 +33,7 @@ type telemetryData struct { StorageType string ClusterID string WorkerNodesCount int - ControlPlaneNodesCount int + ControlPlaneNodesCount uint WorkerData []workerData CPUTotal int64 MEMTotal int64 @@ -52,7 +52,7 @@ func (td telemetryData) asProperties() analytics.Properties { "storageType": td.StorageType, "clusterID": td.ClusterID, "workerNodesCount": td.WorkerNodesCount, - "controlPlaneNodesCount": td.ControlPlaneNodesCount, + "controlPlaneNodesCount": int(td.ControlPlaneNodesCount), "workerData": td.WorkerData, "memTotal": td.MEMTotal, "cpuTotal": td.CPUTotal, @@ -78,7 +78,7 @@ func (c Component) collectTelemetry(ctx context.Context) (telemetryData, error) data.WorkerData = wds data.MEMTotal = sums.memTotal data.CPUTotal = sums.cpuTotal - data.ControlPlaneNodesCount, err = kubeutil.GetControlPlaneNodeCount(ctx, c.kubernetesClient) + data.ControlPlaneNodesCount, err = kubeutil.CountActiveControllerLeases(ctx, c.kubernetesClient) if err != nil { return data, fmt.Errorf("can't collect control plane nodes count: %w", err) }
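
For illustration, a minimal standalone sketch (not part of this patch) of the wiring that cmd/controller/controller.go sets up: a producer standing in for K0sControllersLeaseCounter publishes counts through Latest.Set, while a consumer standing in for the konnectivity components reads them through Latest.Peek and reacts only when the count actually changes. The simulate* helpers and the literal counts are hypothetical; only NewLatest, Set and Peek come from internal/sync/value.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/k0sproject/k0s/internal/sync/value"
)

// Stands in for K0sControllersLeaseCounter: periodically recounts the
// controller leases and publishes the result through the update callback.
func simulateLeaseCounter(ctx context.Context, update func(uint)) {
	counts := []uint{1, 3, 3, 2} // hypothetical lease counts over time
	for _, c := range counts {
		select {
		case <-ctx.Done():
			return
		case <-time.After(50 * time.Millisecond):
			update(c)
		}
	}
}

// Stands in for Konnectivity/KonnectivityAgent: reads the current count via
// the serverCount callback and acts only when the value actually changes.
func simulateKonnectivity(ctx context.Context, serverCount func() (uint, <-chan struct{})) {
	count, changed := serverCount()
	fmt.Println("starting with --server-count", count)
	for {
		select {
		case <-ctx.Done():
			return
		case <-changed:
			prev := count
			count, changed = serverCount()
			if count != prev {
				fmt.Println("restarting with --server-count", count)
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	// Assume a single active controller during startup, as the patch does.
	numActiveControllers := value.NewLatest[uint](1)

	go simulateLeaseCounter(ctx, numActiveControllers.Set)
	simulateKonnectivity(ctx, numActiveControllers.Peek)
}

The first simulated count (1) matches the startup assumption, so the consumer wakes up on the expiration channel but skips the restart; only the changes to 3 and 2 trigger one.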