diff --git a/cmd/controller/controller.go b/cmd/controller/controller.go index df63d6f2c198..516f1f6f37c2 100644 --- a/cmd/controller/controller.go +++ b/cmd/controller/controller.go @@ -216,10 +216,32 @@ func (c *command) start(ctx context.Context) error { logrus.Infof("using storage backend %s", nodeConfig.Spec.Storage.Type) nodeComponents.Add(ctx, storageBackend) + controllerLeaseCounter := &controller.K0sControllersLeaseCounter{ + ClusterConfig: nodeConfig, + KubeClientFactory: adminClientFactory, + } + + if !c.SingleNode { + nodeComponents.Add(ctx, controllerLeaseCounter) + } + enableKonnectivity := !c.SingleNode && !slices.Contains(c.DisableComponents, constant.KonnectivityServerComponentName) disableEndpointReconciler := !slices.Contains(c.DisableComponents, constant.APIEndpointReconcilerComponentName) && nodeConfig.Spec.API.ExternalAddress != "" + if enableKonnectivity { + nodeComponents.Add(ctx, &controller.Konnectivity{ + SingleNode: c.SingleNode, + LogLevel: c.Logging[constant.KonnectivityServerComponentName], + K0sVars: c.K0sVars, + KubeClientFactory: adminClientFactory, + NodeConfig: nodeConfig, + EventEmitter: prober.NewEventEmitter(), + K0sControllersLeaseCounter: controllerLeaseCounter, + }) + + } + nodeComponents.Add(ctx, &controller.APIServer{ ClusterConfig: nodeConfig, K0sVars: c.K0sVars, @@ -229,13 +251,6 @@ func (c *command) start(ctx context.Context) error { DisableEndpointReconciler: disableEndpointReconciler, }) - if !c.SingleNode { - nodeComponents.Add(ctx, &controller.K0sControllersLeaseCounter{ - ClusterConfig: nodeConfig, - KubeClientFactory: adminClientFactory, - }) - } - var leaderElector interface { leaderelector.Interface manager.Component @@ -467,13 +482,14 @@ func (c *command) start(ctx context.Context) error { } if enableKonnectivity { - clusterComponents.Add(ctx, &controller.Konnectivity{ - SingleNode: c.SingleNode, - LogLevel: c.Logging[constant.KonnectivityServerComponentName], - K0sVars: c.K0sVars, - KubeClientFactory: adminClientFactory, - NodeConfig: nodeConfig, - EventEmitter: prober.NewEventEmitter(), + clusterComponents.Add(ctx, &controller.KonnectivityAgent{ + SingleNode: c.SingleNode, + LogLevel: c.Logging[constant.KonnectivityServerComponentName], + K0sVars: c.K0sVars, + KubeClientFactory: adminClientFactory, + NodeConfig: nodeConfig, + EventEmitter: prober.NewEventEmitter(), + K0sControllersLeaseCounter: controllerLeaseCounter, }) } diff --git a/inttest/upgrade/upgrade_test.go b/inttest/upgrade/upgrade_test.go index f4c8732f6087..1476cab9f786 100644 --- a/inttest/upgrade/upgrade_test.go +++ b/inttest/upgrade/upgrade_test.go @@ -166,6 +166,9 @@ func (s *UpgradeSuite) TestK0sGetsUp() { err = s.WaitForNodeReady(s.WorkerNode(1), kc) s.NoError(err) + + s.Require().NoError(common.WaitForPodLogs(s.Context(), kc, "kube-system")) + } func TestUpgradeSuite(t *testing.T) { diff --git a/pkg/component/controller/controllersleasecounter.go b/pkg/component/controller/controllersleasecounter.go index 98448a671b51..b0718b825572 100644 --- a/pkg/component/controller/controllersleasecounter.go +++ b/pkg/component/controller/controllersleasecounter.go @@ -19,6 +19,7 @@ package controller import ( "context" "fmt" + "time" "github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1" "github.com/k0sproject/k0s/pkg/component/manager" @@ -37,12 +38,16 @@ type K0sControllersLeaseCounter struct { cancelFunc context.CancelFunc leaseCancel context.CancelFunc + + subscribers []chan int } var _ manager.Component = (*K0sControllersLeaseCounter)(nil) // Init initializes the component 
func (l *K0sControllersLeaseCounter) Init(_ context.Context) error {
+	l.subscribers = make([]chan int, 0)
+
 	return nil
 }

@@ -88,6 +93,9 @@ func (l *K0sControllersLeaseCounter) Start(ctx context.Context) error {
 			}
 		}
 	}()
+
+	go l.runLeaseCounter(ctx)
+
 	return nil
 }
 
@@ -102,3 +110,55 @@ func (l *K0sControllersLeaseCounter) Stop() error {
 	}
 	return nil
 }
+
+// Counts the controllers every 10 seconds and notifies the subscribers
+func (l *K0sControllersLeaseCounter) runLeaseCounter(ctx context.Context) {
+	log := logrus.WithFields(logrus.Fields{"component": "controllerlease"})
+	log.Debug("starting controller lease counter every 10 seconds")
+	ticker := time.NewTicker(10 * time.Second)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			log.Info("stopping controller lease counter")
+			return
+		case <-ticker.C:
+			log.Debug("counting controller lease holders")
+			count, err := l.countLeaseHolders(ctx)
+			if err != nil {
+				log.Errorf("failed to count controller leases: %s", err)
+				continue // don't notify subscribers with a bogus count
+			}
+			l.notifySubscribers(count)
+		}
+	}
+}
+
+func (l *K0sControllersLeaseCounter) countLeaseHolders(ctx context.Context) (int, error) {
+	client, err := l.KubeClientFactory.GetClient()
+	if err != nil {
+		return 0, err
+	}
+
+	return kubeutil.GetControlPlaneNodeCount(ctx, client)
+}
+
+// Notify the subscribers about the current controller count
+func (l *K0sControllersLeaseCounter) notifySubscribers(count int) {
+	log := logrus.WithFields(logrus.Fields{"component": "controllerlease"})
+	log.Debugf("notifying subscribers (%d) about controller count: %d", len(l.subscribers), count)
+	for _, ch := range l.subscribers {
+		// Send with a timeout so a stalled subscriber cannot block the loop
+		select {
+		case ch <- count:
+		case <-time.After(5 * time.Second):
+			log.Warn("timeout when sending count to subscriber")
+		}
+	}
+}
+
+func (l *K0sControllersLeaseCounter) Subscribe() <-chan int {
+	ch := make(chan int, 1)
+	l.subscribers = append(l.subscribers, ch)
+	return ch
+}
diff --git a/pkg/component/controller/konnectivity.go b/pkg/component/controller/konnectivity.go
index 734d9ae24e86..052c26038f84 100644
--- a/pkg/component/controller/konnectivity.go
+++ b/pkg/component/controller/konnectivity.go
@@ -22,15 +22,12 @@ import (
 	"os"
 	"path/filepath"
 	"strconv"
-	"sync"
-	"time"
 
 	"github.com/sirupsen/logrus"
 
 	"github.com/k0sproject/k0s/internal/pkg/dir"
 	"github.com/k0sproject/k0s/internal/pkg/stringmap"
 	"github.com/k0sproject/k0s/internal/pkg/sysinfo/machineid"
-	"github.com/k0sproject/k0s/internal/pkg/templatewriter"
 	"github.com/k0sproject/k0s/internal/pkg/users"
 	"github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1"
 	"github.com/k0sproject/k0s/pkg/assets"
@@ -38,37 +35,36 @@ import (
 	"github.com/k0sproject/k0s/pkg/component/prober"
 	"github.com/k0sproject/k0s/pkg/config"
 	"github.com/k0sproject/k0s/pkg/constant"
+	"github.com/k0sproject/k0s/pkg/k0scontext"
 	kubeutil "github.com/k0sproject/k0s/pkg/kubernetes"
 	"github.com/k0sproject/k0s/pkg/supervisor"
 )
 
-// Konnectivity implements the component interface of konnectivity server
+// Konnectivity implements the component interface for the konnectivity server
 type Konnectivity struct {
 	K0sVars    *config.CfgVars
 	LogLevel   string
 	SingleNode bool
 	// used for lease lock
-	KubeClientFactory kubeutil.ClientFactoryInterface
-	NodeConfig        *v1beta1.ClusterConfig
+	KubeClientFactory          kubeutil.ClientFactoryInterface
+	NodeConfig                 *v1beta1.ClusterConfig
+	K0sControllersLeaseCounter *K0sControllersLeaseCounter
+
+	supervisor      *supervisor.Supervisor
+	uid             int
+	serverCount     int
+	serverCountChan <-chan int
+	stopFunc        context.CancelFunc
+	clusterConfig   *v1beta1.ClusterConfig
+	log             *logrus.Entry
 
-	supervisor          *supervisor.Supervisor
-	uid                 int
-	serverCount         int
-	serverCountChan     chan int
-	stopFunc            context.CancelFunc
-	clusterConfig       *v1beta1.ClusterConfig
-	log                 *logrus.Entry
-	leaseCounterRunning bool
-	previousConfig      konnectivityAgentConfig
-	agentManifestLock   sync.Mutex
 	*prober.EventEmitter
 }
 
 var _ manager.Component = (*Konnectivity)(nil)
-var _ manager.Reconciler = (*Konnectivity)(nil)
 
 // Init ...
-func (k *Konnectivity) Init(_ context.Context) error {
+func (k *Konnectivity) Init(ctx context.Context) error {
 	var err error
 	k.uid, err = users.GetUID(constant.KonnectivityServerUser)
 	if err != nil {
@@ -94,33 +90,28 @@ func (k *Konnectivity) Init(_ context.Context) error {
 	}
 
 	defer k.Emit("succesfully initialized konnectivity component")
+
+	k.clusterConfig = k0scontext.GetNodeConfig(ctx)
+
 	return nil
 }
 
-// Run ..
+// Start ..
 func (k *Konnectivity) Start(ctx context.Context) error {
 	// Buffered chan to send updates for the count of servers
-	k.serverCountChan = make(chan int, 1)
+	k.serverCountChan = k.K0sControllersLeaseCounter.Subscribe()
 
-	ctx, k.stopFunc = context.WithCancel(ctx)
+	// Start the server with a count of 0; the watcher below restarts it once the real count is known
+	if err := k.runServer(ctx, 0); err != nil {
+		k.EmitWithPayload("failed to run konnectivity server", err)
+		return fmt.Errorf("failed to run konnectivity server: %w", err)
+	}
 
-	go k.runServer(ctx)
+	go k.watchControllerCountChanges(ctx)
 
 	return nil
 }
 
-// Reconcile detects changes in configuration and applies them to the component
-func (k *Konnectivity) Reconcile(ctx context.Context, clusterCfg *v1beta1.ClusterConfig) error {
-	k.clusterConfig = clusterCfg
-	if !k.SingleNode {
-		go k.runLeaseCounter(ctx)
-	} else {
-		// It's a buffered channel so once we start the runServer routine it'll pick this up and just sees it never changing
-		k.serverCountChan <- 1
-	}
-	return k.writeKonnectivityAgent()
-}
-
 func (k *Konnectivity) defaultArgs() stringmap.StringMap {
 	machineID, err := machineid.Generate()
 	if err != nil {
@@ -150,9 +141,10 @@ func (k *Konnectivity) defaultArgs() stringmap.StringMap {
 }
 
-// runs the supervisor and restarts if the calculated server count changes
-func (k *Konnectivity) runServer(ctx context.Context) {
-	previousArgs := stringmap.StringMap{}
+// watches the controller count and restarts the server whenever it changes
+func (k *Konnectivity) watchControllerCountChanges(ctx context.Context) {
+	// server args are rebuilt from scratch by runServer on every restart
 	for {
+		k.log.Debug("waiting for server count change")
 		select {
 		case <-ctx.Done():
 			k.Emit("stopped konnectivity server")
@@ -164,47 +156,50 @@
 				continue
 			}
-			// restart only if the count actually changes and we've got the global config
-			if count != k.serverCount && k.clusterConfig != nil {
-				args := k.defaultArgs()
-				args["--server-count"] = strconv.Itoa(count)
-				if args.Equals(previousArgs) {
-					logrus.Info("no changes detected for konnectivity-server")
-				}
-				// Stop supervisor
-				if k.supervisor != nil {
-					k.EmitWithPayload("restarting konnectivity server due to server count change",
-						map[string]interface{}{"serverCount": count})
-					if err := k.supervisor.Stop(); err != nil {
-						logrus.Errorf("failed to stop supervisor: %s", err)
-						// TODO Should we just return? That means other part will continue to run but the server is never properly restarted
-					}
-				}
-
-				k.supervisor = &supervisor.Supervisor{
-					Name:    "konnectivity",
-					BinPath: assets.BinPath("konnectivity-server", k.K0sVars.BinDir),
-					DataDir: k.K0sVars.DataDir,
-					RunDir:  k.K0sVars.RunDir,
-					Args:    args.ToArgs(),
-					UID:     k.uid,
-				}
-				err := k.supervisor.Supervise()
-				if err != nil {
+			// restart only if the count actually changed
+			if count != k.serverCount {
+				if err := k.runServer(ctx, count); err != nil {
 					k.EmitWithPayload("failed to run konnectivity server", err)
-					logrus.Errorf("failed to start konnectivity supervisor: %s", err)
-					k.supervisor = nil // not to make the next loop to try to stop it first
+					logrus.Errorf("failed to run konnectivity server: %s", err)
 					continue
 				}
-				k.serverCount = count
-
-				if err := k.writeKonnectivityAgent(); err != nil {
-					k.EmitWithPayload("failed to write konnectivity agent config", err)
-					logrus.Errorf("failed to update konnectivity-agent template: %s", err)
-				}
-				k.EmitWithPayload("started konnectivity server", map[string]interface{}{"serverCount": count})
 			}
+			k.serverCount = count
+		}
+	}
+}
+
+func (k *Konnectivity) runServer(ctx context.Context, count int) error {
+	args := k.defaultArgs()
+	args["--server-count"] = strconv.Itoa(count)
+
+	// Stop the currently running supervisor, if any
+	if k.supervisor != nil {
+		k.EmitWithPayload("restarting konnectivity server due to server count change",
+			map[string]interface{}{"serverCount": count})
+		if err := k.supervisor.Stop(); err != nil {
+			k.log.Errorf("failed to stop supervisor: %s", err)
 		}
 	}
+
+	k.supervisor = &supervisor.Supervisor{
+		Name:    "konnectivity",
+		BinPath: assets.BinPath("konnectivity-server", k.K0sVars.BinDir),
+		DataDir: k.K0sVars.DataDir,
+		RunDir:  k.K0sVars.RunDir,
+		Args:    args.ToArgs(),
+		UID:     k.uid,
+	}
+	err := k.supervisor.Supervise()
+	if err != nil {
+		k.EmitWithPayload("failed to run konnectivity server", err)
+		k.log.Errorf("failed to start konnectivity supervisor: %s", err)
+		k.supervisor = nil // so the next restart doesn't try to stop it again
+		return err
+	}
+	k.serverCount = count
+	k.EmitWithPayload("started konnectivity server", map[string]interface{}{"serverCount": count})
+
+	return nil
 }
 
 // Stop stops
@@ -227,232 +222,3 @@ func (k *Konnectivity) Healthy() error {
 	return nil
 }
-
-type konnectivityAgentConfig struct {
-	ProxyServerHost      string
-	ProxyServerPort      uint16
-	AgentPort            uint16
-	Image                string
-	ServerCount          int
-	PullPolicy           string
-	HostNetwork          bool
-	BindToNodeIP         bool
-	APIServerPortMapping string
-	FeatureGates         string
-}
-
-func (k *Konnectivity) writeKonnectivityAgent() error {
-	k.agentManifestLock.Lock()
-	defer k.agentManifestLock.Unlock()
-	konnectivityDir := filepath.Join(k.K0sVars.ManifestsDir, "konnectivity")
-	err := dir.Init(konnectivityDir, constant.ManifestsDirMode)
-	if err != nil {
-		return err
-	}
-	cfg := konnectivityAgentConfig{
-		// Since the konnectivity server runs with hostNetwork=true this is the
-		// IP address of the master machine
-		ProxyServerHost: k.NodeConfig.Spec.API.APIAddress(), // TODO: should it be an APIAddress?
- ProxyServerPort: uint16(k.clusterConfig.Spec.Konnectivity.AgentPort), - Image: k.clusterConfig.Spec.Images.Konnectivity.URI(), - ServerCount: k.serverCount, - PullPolicy: k.clusterConfig.Spec.Images.DefaultPullPolicy, - } - - if k.clusterConfig.Spec.Network != nil { - nllb := k.clusterConfig.Spec.Network.NodeLocalLoadBalancing - if nllb.IsEnabled() { - switch nllb.Type { - case v1beta1.NllbTypeEnvoyProxy: - k.log.Debugf("Enabling node-local load balancing via %s", nllb.Type) - - // FIXME: Transitions from non-node-local load balanced to - // node-local load balanced setups will be problematic: The - // controller will update the DaemonSet with localhost, but the - // worker nodes won't reconcile their state (yet) and need to be - // restarted manually in order to start their load balancer. - // Transitions in the other direction suffer from the same - // limitation, but that will be less grave, as the node-local - // load balancers will remain operational until the next node - // restart and the agents will stay connected. - - // The node-local load balancer will run in the host network, so - // the agent needs to do the same in order to use it. - cfg.HostNetwork = true - - // FIXME: This is not exactly on par with the way it's - // implemented on the worker side, i.e. there's no fallback if - // localhost doesn't resolve to a loopback address. But this - // would require some shenanigans to pull in node-specific - // values here. A possible solution would be to convert the - // konnectivity agent to a static Pod as well. - cfg.ProxyServerHost = "localhost" - - if nllb.EnvoyProxy.KonnectivityServerBindPort != nil { - cfg.ProxyServerPort = uint16(*nllb.EnvoyProxy.KonnectivityServerBindPort) - } else { - cfg.ProxyServerPort = uint16(*v1beta1.DefaultEnvoyProxy().KonnectivityServerBindPort) - } - default: - return fmt.Errorf("unsupported node-local load balancer type: %q", k.clusterConfig.Spec.Network.NodeLocalLoadBalancing.Type) - } - } - } - - if cfg == k.previousConfig { - k.log.Debug("agent configs match, no need to reconcile") - return nil - } - - tw := templatewriter.TemplateWriter{ - Name: "konnectivity-agent", - Template: konnectivityAgentTemplate, - Data: cfg, - Path: filepath.Join(konnectivityDir, "konnectivity-agent.yaml"), - } - err = tw.Write() - if err != nil { - k.EmitWithPayload("failed to write konnectivity agent manifest", err) - return fmt.Errorf("failed to write konnectivity agent manifest: %v", err) - } - k.previousConfig = cfg - k.Emit("wrote konnectivity agent new manifest") - return nil -} - -func (k *Konnectivity) runLeaseCounter(ctx context.Context) { - if k.leaseCounterRunning { - return - } - k.leaseCounterRunning = true - logrus.Infof("starting to count controller lease holders every 10 secs") - ticker := time.NewTicker(10 * time.Second) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - logrus.Info("stopping konnectivity lease counter") - return - case <-ticker.C: - count, err := k.countLeaseHolders(ctx) - if err != nil { - logrus.Errorf("failed to count controller leases: %s", err) - continue - } - k.serverCountChan <- count - } - } -} - -func (k *Konnectivity) countLeaseHolders(ctx context.Context) (int, error) { - client, err := k.KubeClientFactory.GetClient() - if err != nil { - return 0, err - } - - return kubeutil.GetControlPlaneNodeCount(ctx, client) -} - -const konnectivityAgentTemplate = ` -apiVersion: rbac.authorization.k8s.io/v1 -kind: ClusterRoleBinding -metadata: - name: system:konnectivity-server - labels: - 
kubernetes.io/cluster-service: "true" -roleRef: - apiGroup: rbac.authorization.k8s.io - kind: ClusterRole - name: system:auth-delegator -subjects: - - apiGroup: rbac.authorization.k8s.io - kind: User - name: system:konnectivity-server ---- -apiVersion: v1 -kind: ServiceAccount -metadata: - name: konnectivity-agent - namespace: kube-system - labels: - kubernetes.io/cluster-service: "true" ---- -apiVersion: apps/v1 -# Alternatively, you can deploy the agents as Deployments. It is not necessary -# to have an agent on each node. -kind: DaemonSet -metadata: - labels: - k8s-app: konnectivity-agent - namespace: kube-system - name: konnectivity-agent -spec: - selector: - matchLabels: - k8s-app: konnectivity-agent - template: - metadata: - labels: - k8s-app: konnectivity-agent - annotations: - prometheus.io/scrape: 'true' - prometheus.io/port: '8093' - spec: - nodeSelector: - kubernetes.io/os: linux - priorityClassName: system-cluster-critical - tolerations: - - operator: Exists - {{- if .HostNetwork }} - hostNetwork: true - {{- end }} - containers: - - image: {{ .Image }} - imagePullPolicy: {{ .PullPolicy }} - name: konnectivity-agent - command: ["/proxy-agent"] - env: - # the variable is not in a use - # we need it to have agent restarted on server count change - - name: K0S_CONTROLLER_COUNT - value: "{{ .ServerCount }}" - - - name: NODE_IP - valueFrom: - fieldRef: - fieldPath: status.hostIP - args: - - --logtostderr=true - - --ca-cert=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt - - --proxy-server-host={{ .ProxyServerHost }} - - --proxy-server-port={{ .ProxyServerPort }} - - --service-account-token-path=/var/run/secrets/tokens/konnectivity-agent-token - - --agent-identifiers=host=$(NODE_IP) - - --agent-id=$(NODE_IP) - {{- if .BindToNodeIP }} - - --bind-address=$(NODE_IP) - {{- end }} - {{- if .APIServerPortMapping }} - - --apiserver-port-mapping={{ .APIServerPortMapping }} - {{- end }} - {{- if .FeatureGates }} - - "--feature-gates={{ .FeatureGates }}" - {{- end }} - volumeMounts: - - mountPath: /var/run/secrets/tokens - name: konnectivity-agent-token - livenessProbe: - httpGet: - port: 8093 - path: /healthz - initialDelaySeconds: 15 - timeoutSeconds: 15 - serviceAccountName: konnectivity-agent - volumes: - - name: konnectivity-agent-token - projected: - sources: - - serviceAccountToken: - path: konnectivity-agent-token - audience: system:konnectivity-server -` diff --git a/pkg/component/controller/konnectivityagent.go b/pkg/component/controller/konnectivityagent.go new file mode 100644 index 000000000000..640f6fc58135 --- /dev/null +++ b/pkg/component/controller/konnectivityagent.go @@ -0,0 +1,301 @@ +/* +Copyright 2023 k0s authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package controller
+
+import (
+	"context"
+	"fmt"
+	"path/filepath"
+	"sync"
+
+	"github.com/k0sproject/k0s/internal/pkg/dir"
+	"github.com/k0sproject/k0s/internal/pkg/templatewriter"
+	"github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1"
+	"github.com/k0sproject/k0s/pkg/component/manager"
+	"github.com/k0sproject/k0s/pkg/component/prober"
+	"github.com/k0sproject/k0s/pkg/config"
+	"github.com/k0sproject/k0s/pkg/constant"
+	kubeutil "github.com/k0sproject/k0s/pkg/kubernetes"
+	"github.com/sirupsen/logrus"
+)
+
+type KonnectivityAgent struct {
+	K0sVars    *config.CfgVars
+	LogLevel   string
+	SingleNode bool
+	// used for lease lock
+	KubeClientFactory          kubeutil.ClientFactoryInterface
+	NodeConfig                 *v1beta1.ClusterConfig
+	K0sControllersLeaseCounter *K0sControllersLeaseCounter
+
+	serverCount       int
+	serverCountChan   <-chan int
+	configChangeChan  chan *v1beta1.ClusterConfig
+	clusterConfig     *v1beta1.ClusterConfig
+	log               *logrus.Entry
+	previousConfig    konnectivityAgentConfig
+	agentManifestLock sync.Mutex
+	*prober.EventEmitter
+}
+
+var _ manager.Component = (*KonnectivityAgent)(nil)
+var _ manager.Reconciler = (*KonnectivityAgent)(nil)
+
+func (k *KonnectivityAgent) Init(_ context.Context) error {
+	k.log = logrus.WithFields(logrus.Fields{"component": "konnectivity-agent"})
+
+	k.configChangeChan = make(chan *v1beta1.ClusterConfig, 1)
+
+	return nil
+}
+
+func (k *KonnectivityAgent) Start(ctx context.Context) error {
+	// Subscribe to controller count changes
+	k.serverCountChan = k.K0sControllersLeaseCounter.Subscribe()
+
+	go func() {
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case count := <-k.serverCountChan:
+				k.serverCount = count
+				if err := k.writeKonnectivityAgent(); err != nil {
+					k.log.Errorf("failed to write konnectivity agent manifest: %v", err)
+				}
+			case clusterConfig := <-k.configChangeChan:
+				k.clusterConfig = clusterConfig
+				if err := k.writeKonnectivityAgent(); err != nil {
+					k.log.Errorf("failed to write konnectivity agent manifest: %v", err)
+				}
+			}
+		}
+	}()
+	return nil
+}
+
+// Reconcile detects changes in configuration and applies them to the component
+func (k *KonnectivityAgent) Reconcile(ctx context.Context, clusterCfg *v1beta1.ClusterConfig) error {
+	k.configChangeChan <- clusterCfg
+
+	return nil
+}
+
+func (k *KonnectivityAgent) Stop() error {
+	return nil
+}
+
+func (k *KonnectivityAgent) writeKonnectivityAgent() error {
+	k.agentManifestLock.Lock()
+	defer k.agentManifestLock.Unlock()
+
+	if k.clusterConfig == nil {
+		return fmt.Errorf("cluster config is not reconciled yet")
+	}
+
+	konnectivityDir := filepath.Join(k.K0sVars.ManifestsDir, "konnectivity")
+	err := dir.Init(konnectivityDir, constant.ManifestsDirMode)
+	if err != nil {
+		return err
+	}
+	cfg := konnectivityAgentConfig{
+		// Since the konnectivity server runs with hostNetwork=true this is the
+		// IP address of the master machine
+		ProxyServerHost: k.NodeConfig.Spec.API.APIAddress(), // TODO: should it be an APIAddress?
+		ProxyServerPort: uint16(k.clusterConfig.Spec.Konnectivity.AgentPort),
+		Image:           k.clusterConfig.Spec.Images.Konnectivity.URI(),
+		ServerCount:     k.serverCount,
+		PullPolicy:      k.clusterConfig.Spec.Images.DefaultPullPolicy,
+	}
+
+	if k.clusterConfig.Spec.Network != nil {
+		nllb := k.clusterConfig.Spec.Network.NodeLocalLoadBalancing
+		if nllb.IsEnabled() {
+			switch nllb.Type {
+			case v1beta1.NllbTypeEnvoyProxy:
+				k.log.Debugf("Enabling node-local load balancing via %s", nllb.Type)
+
+				// FIXME: Transitions from non-node-local load balanced to
+				// node-local load balanced setups will be problematic: The
+				// controller will update the DaemonSet with localhost, but the
+				// worker nodes won't reconcile their state (yet) and need to be
+				// restarted manually in order to start their load balancer.
+				// Transitions in the other direction suffer from the same
+				// limitation, but that will be less grave, as the node-local
+				// load balancers will remain operational until the next node
+				// restart and the agents will stay connected.
+
+				// The node-local load balancer will run in the host network, so
+				// the agent needs to do the same in order to use it.
+				cfg.HostNetwork = true
+
+				// FIXME: This is not exactly on par with the way it's
+				// implemented on the worker side, i.e. there's no fallback if
+				// localhost doesn't resolve to a loopback address. But this
+				// would require some shenanigans to pull in node-specific
+				// values here. A possible solution would be to convert the
+				// konnectivity agent to a static Pod as well.
+				cfg.ProxyServerHost = "localhost"
+
+				if nllb.EnvoyProxy.KonnectivityServerBindPort != nil {
+					cfg.ProxyServerPort = uint16(*nllb.EnvoyProxy.KonnectivityServerBindPort)
+				} else {
+					cfg.ProxyServerPort = uint16(*v1beta1.DefaultEnvoyProxy().KonnectivityServerBindPort)
+				}
+			default:
+				return fmt.Errorf("unsupported node-local load balancer type: %q", k.clusterConfig.Spec.Network.NodeLocalLoadBalancing.Type)
+			}
+		}
+	}
+
+	if cfg == k.previousConfig {
+		k.log.Debug("agent configs match, no need to reconcile")
+		return nil
+	}
+
+	tw := templatewriter.TemplateWriter{
+		Name:     "konnectivity-agent",
+		Template: konnectivityAgentTemplate,
+		Data:     cfg,
+		Path:     filepath.Join(konnectivityDir, "konnectivity-agent.yaml"),
+	}
+	err = tw.Write()
+	if err != nil {
+		k.EmitWithPayload("failed to write konnectivity agent manifest", err)
+		return fmt.Errorf("failed to write konnectivity agent manifest: %v", err)
+	}
+	k.previousConfig = cfg
+	k.Emit("wrote new konnectivity agent manifest")
+	return nil
+}
+
+type konnectivityAgentConfig struct {
+	ProxyServerHost      string
+	ProxyServerPort      uint16
+	AgentPort            uint16
+	Image                string
+	ServerCount          int
+	PullPolicy           string
+	HostNetwork          bool
+	BindToNodeIP         bool
+	APIServerPortMapping string
+	FeatureGates         string
+}
+
+const konnectivityAgentTemplate = `
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRoleBinding
+metadata:
+  name: system:konnectivity-server
+  labels:
+    kubernetes.io/cluster-service: "true"
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: ClusterRole
+  name: system:auth-delegator
+subjects:
+  - apiGroup: rbac.authorization.k8s.io
+    kind: User
+    name: system:konnectivity-server
+---
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  name: konnectivity-agent
+  namespace: kube-system
+  labels:
+    kubernetes.io/cluster-service: "true"
+---
+apiVersion: apps/v1
+# Alternatively, you can deploy the agents as Deployments. It is not necessary
+# to have an agent on each node.
+kind: DaemonSet
+metadata:
+  labels:
+    k8s-app: konnectivity-agent
+  namespace: kube-system
+  name: konnectivity-agent
+spec:
+  selector:
+    matchLabels:
+      k8s-app: konnectivity-agent
+  template:
+    metadata:
+      labels:
+        k8s-app: konnectivity-agent
+      annotations:
+        prometheus.io/scrape: 'true'
+        prometheus.io/port: '8093'
+    spec:
+      nodeSelector:
+        kubernetes.io/os: linux
+      priorityClassName: system-cluster-critical
+      tolerations:
+        - operator: Exists
+      {{- if .HostNetwork }}
+      hostNetwork: true
+      {{- end }}
+      containers:
+        - image: {{ .Image }}
+          imagePullPolicy: {{ .PullPolicy }}
+          name: konnectivity-agent
+          command: ["/proxy-agent"]
+          env:
+            # the variable itself is not used; it exists only so that the
+            # agent is restarted whenever the server count changes
+            - name: K0S_CONTROLLER_COUNT
+              value: "{{ .ServerCount }}"
+
+            - name: NODE_IP
+              valueFrom:
+                fieldRef:
+                  fieldPath: status.hostIP
+          args:
+            - --logtostderr=true
+            - --ca-cert=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt
+            - --proxy-server-host={{ .ProxyServerHost }}
+            - --proxy-server-port={{ .ProxyServerPort }}
+            - --service-account-token-path=/var/run/secrets/tokens/konnectivity-agent-token
+            - --agent-identifiers=host=$(NODE_IP)
+            - --agent-id=$(NODE_IP)
+            {{- if .BindToNodeIP }}
+            - --bind-address=$(NODE_IP)
+            {{- end }}
+            {{- if .APIServerPortMapping }}
+            - --apiserver-port-mapping={{ .APIServerPortMapping }}
+            {{- end }}
+            {{- if .FeatureGates }}
+            - "--feature-gates={{ .FeatureGates }}"
+            {{- end }}
+          volumeMounts:
+            - mountPath: /var/run/secrets/tokens
+              name: konnectivity-agent-token
+          livenessProbe:
+            httpGet:
+              port: 8093
+              path: /healthz
+            initialDelaySeconds: 15
+            timeoutSeconds: 15
+      serviceAccountName: konnectivity-agent
+      volumes:
+        - name: konnectivity-agent-token
+          projected:
+            sources:
+              - serviceAccountToken:
+                  path: konnectivity-agent-token
+                  audience: system:konnectivity-server
+`
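
The following is not part of the patch: a minimal, runnable Go sketch of the subscription pattern the diff introduces, where K0sControllersLeaseCounter.Subscribe() hands each consumer (the Konnectivity server component and the new KonnectivityAgent) a buffered channel, and a single counter loop fans the latest controller count out with a bounded send so one stalled consumer cannot wedge the loop. The Counter type, the count callback, and the shortened intervals are illustrative stand-ins, not k0s code.

package main

import (
	"context"
	"fmt"
	"time"
)

// Counter periodically computes a count and fans it out to subscribers.
type Counter struct {
	subscribers []chan int
}

// Subscribe returns a buffered channel that receives count updates. The
// buffer of one lets the producer publish even when no reader is waiting.
// As in the patch, subscribers must be registered before Run starts,
// since the slice is not guarded by a lock.
func (c *Counter) Subscribe() <-chan int {
	ch := make(chan int, 1)
	c.subscribers = append(c.subscribers, ch)
	return ch
}

// publish sends the count to every subscriber, giving up after a timeout
// so a stalled consumer cannot block the loop (the patch uses 5 seconds).
func (c *Counter) publish(count int) {
	for _, ch := range c.subscribers {
		select {
		case ch <- count:
		case <-time.After(100 * time.Millisecond):
			// Drop this update; the next tick delivers a fresh one.
		}
	}
}

// Run ticks until the context is cancelled, mirroring runLeaseCounter.
func (c *Counter) Run(ctx context.Context, count func() int) {
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			c.publish(count())
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	c := &Counter{}
	updates := c.Subscribe() // register before Run, as the components do in Start

	n := 0
	go c.Run(ctx, func() int { n++; return n })

	// React to count changes, as watchControllerCountChanges does.
	for i := 0; i < 3; i++ {
		select {
		case count := <-updates:
			fmt.Println("controller count:", count)
		case <-ctx.Done():
			return
		}
	}
}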