From df9d2f6f2b1b16a6305dcbb003ccd87d433582f5 Mon Sep 17 00:00:00 2001 From: Tom Wieczorek Date: Tue, 24 Sep 2024 17:52:05 +0200 Subject: [PATCH] Wait for autopilot CRDs before starting Autopilot worker component Instead of having multiple brute-force retry loops in a row, just have one that actually watches the CRDs and waits until it has observed all the Autopilot CRDs to be established. This covers API server reachability and CRD availability in one go. It also ensures that controller-runtime registration is done only once. Since controller-runtime v0.19.0, registering the same thing twice will result in an error. Signed-off-by: Tom Wieczorek --- pkg/autopilot/controller/root_worker.go | 68 ++++------- pkg/component/worker/autopilot.go | 150 +++++++++++++++++++----- 2 files changed, 143 insertions(+), 75 deletions(-) diff --git a/pkg/autopilot/controller/root_worker.go b/pkg/autopilot/controller/root_worker.go index 88417920c7cb..8fb9fe250d83 100644 --- a/pkg/autopilot/controller/root_worker.go +++ b/pkg/autopilot/controller/root_worker.go @@ -17,7 +17,6 @@ package controller import ( "context" "fmt" - "time" apcli "github.com/k0sproject/k0s/pkg/autopilot/client" apdel "github.com/k0sproject/k0s/pkg/autopilot/controller/delegate" @@ -25,14 +24,12 @@ import ( apsig "github.com/k0sproject/k0s/pkg/autopilot/controller/signal" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" - k8sretry "k8s.io/client-go/util/retry" + cr "sigs.k8s.io/controller-runtime" crman "sigs.k8s.io/controller-runtime/pkg/manager" crmetricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" crwebhook "sigs.k8s.io/controller-runtime/pkg/webhook" - "github.com/avast/retry-go" "github.com/sirupsen/logrus" ) @@ -69,49 +66,30 @@ func (w *rootWorker) Run(ctx context.Context) error { HealthProbeBindAddress: w.cfg.HealthProbeBindAddr, } - var mgr crman.Manager - if err := retry.Do( - func() (err error) { - mgr, err = cr.NewManager(w.clientFactory.RESTConfig(), 
managerOpts) - return err - }, - retry.Context(ctx), - retry.LastErrorOnly(true), - retry.Delay(1*time.Second), - retry.OnRetry(func(attempt uint, err error) { - logger.WithError(err).Debugf("Failed to start controller manager in attempt #%d, retrying after backoff", attempt+1) - }), - ); err != nil { - logger.WithError(err).Fatal("unable to start controller manager") + clusterID, err := w.getClusterID(ctx) + if err != nil { + return err + } + + mgr, err := cr.NewManager(w.clientFactory.RESTConfig(), managerOpts) + if err != nil { + return fmt.Errorf("unable to start controller manager: %w", err) + } + + if err := RegisterIndexers(ctx, mgr, "worker"); err != nil { + return fmt.Errorf("unable to register indexers: %w", err) + } + + if err := apsig.RegisterControllers(ctx, logger, mgr, apdel.NodeControllerDelegate(), w.cfg.K0sDataDir, clusterID); err != nil { + return fmt.Errorf("unable to register 'controlnodes' controllers: %w", err) + } + + // The controller-runtime start blocks until the context is cancelled. + if err := mgr.Start(ctx); err != nil { + return fmt.Errorf("unable to run controller-runtime manager for workers: %w", err) } - // In some cases, we need to wait on the worker side until controller deploys all autopilot CRDs - return k8sretry.OnError(wait.Backoff{ - Steps: 120, - Duration: 1 * time.Second, - Factor: 1.0, - Jitter: 0.1, - }, func(err error) bool { - return true - }, func() error { - clusterID, err := w.getClusterID(ctx) - if err != nil { - return err - } - - if err := RegisterIndexers(ctx, mgr, "worker"); err != nil { - return fmt.Errorf("unable to register indexers: %w", err) - } - - if err := apsig.RegisterControllers(ctx, logger, mgr, apdel.NodeControllerDelegate(), w.cfg.K0sDataDir, clusterID); err != nil { - return fmt.Errorf("unable to register 'controlnodes' controllers: %w", err) - } - // The controller-runtime start blocks until the context is cancelled. 
- if err := mgr.Start(ctx); err != nil { - return fmt.Errorf("unable to run controller-runtime manager for workers: %w", err) - } - return nil - }) + return nil } func (w *rootWorker) getClusterID(ctx context.Context) (string, error) { diff --git a/pkg/component/worker/autopilot.go b/pkg/component/worker/autopilot.go index d6bcc772c86e..b2c61809b336 100644 --- a/pkg/component/worker/autopilot.go +++ b/pkg/component/worker/autopilot.go @@ -18,19 +18,24 @@ package worker import ( "context" - "errors" "fmt" + "slices" "time" + autopilotv1beta2 "github.com/k0sproject/k0s/pkg/apis/autopilot/v1beta2" apcli "github.com/k0sproject/k0s/pkg/autopilot/client" apcont "github.com/k0sproject/k0s/pkg/autopilot/controller" aproot "github.com/k0sproject/k0s/pkg/autopilot/controller/root" + k0sscheme "github.com/k0sproject/k0s/pkg/client/clientset/scheme" "github.com/k0sproject/k0s/pkg/component/manager" "github.com/k0sproject/k0s/pkg/config" - "github.com/sirupsen/logrus" + "github.com/k0sproject/k0s/pkg/kubernetes/watch" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/rest" + + "github.com/sirupsen/logrus" ) const ( @@ -52,35 +57,24 @@ func (a *Autopilot) Init(ctx context.Context) error { func (a *Autopilot) Start(ctx context.Context) error { log := logrus.WithFields(logrus.Fields{"component": "autopilot"}) - // Wait 5 mins till we see kubelet auth config in place - timeout, cancel := context.WithTimeout(ctx, defaultPollTimeout) - defer cancel() - - var restConfig *rest.Config - // wait.PollUntilWithContext passes it is own ctx argument as a ctx to the given function - // Poll until the kubelet config can be loaded successfully, as this is the access to the kube api - // needed by autopilot. 
- if err := wait.PollUntilWithContext(timeout, defaultPollDuration, func(ctx context.Context) (done bool, err error) { - log.Debugf("Attempting to load autopilot client config") - if restConfig, err = a.CertManager.GetRestConfig(ctx); err != nil { - log.WithError(err).Warnf("Failed to load autopilot client config, retrying in %v", defaultPollDuration) - return false, nil + var ( + clientFactory apcli.FactoryInterface + waitErr error + ) + if pollErr := wait.PollUntilContextTimeout(ctx, defaultPollDuration, defaultPollTimeout, true, func(ctx context.Context) (bool, error) { + clientFactory, waitErr = a.newClientFactory(ctx) + if waitErr == nil { + return true, nil } - return true, nil - }); err != nil { - return fmt.Errorf("unable to create autopilot client: %w", err) - } - - // Without the config, there is nothing that we can do. - - if restConfig == nil { - return errors.New("unable to create an autopilot client -- timed out") - } + log.WithError(waitErr).Debug("Failed to create autopilot client factory, retrying in ", defaultPollDuration) + return false, nil + }); pollErr != nil { + if waitErr == nil { + return pollErr + } - autopilotClientFactory, err := apcli.NewClientFactory(restConfig) - if err != nil { - return fmt.Errorf("creating autopilot client factory error: %w", err) + return fmt.Errorf("%w: %w", pollErr, waitErr) } log.Info("Autopilot client factory created, booting up worker root controller") @@ -91,7 +85,7 @@ func (a *Autopilot) Start(ctx context.Context) error { ManagerPort: 8899, MetricsBindAddr: "0", HealthProbeBindAddr: "0", - }, log, autopilotClientFactory) + }, log, clientFactory) if err != nil { return fmt.Errorf("failed to create autopilot worker: %w", err) } @@ -111,3 +105,99 @@ func (a *Autopilot) Start(ctx context.Context) error { func (a *Autopilot) Stop() error { return nil } + +func (a *Autopilot) newClientFactory(ctx context.Context) (apcli.FactoryInterface, error) { + restConfig, err := a.CertManager.GetRestConfig(ctx) + if err != 
nil { + return nil, err + } + + clientFactory, err := apcli.NewClientFactory(restConfig) + if err != nil { + return nil, err + } + + // We need to wait until all autopilot CRDs are established. + // Gather all kinds in the autopilot API group. + var kinds []string + gv := autopilotv1beta2.SchemeGroupVersion + for kind := range k0sscheme.Scheme.KnownTypes(gv) { + // For some reason, the scheme also returns types from core/v1. Filter + // those out by only adding kinds which are _only_ in the autopilot + // group, and not in some other group as well. The only way to get all + // the GVKs for a certain type is by creating a new instance of that + // type and then asking the scheme about it. + obj, err := k0sscheme.Scheme.New(gv.WithKind(kind)) + if err != nil { + return nil, err + } + gvks, _, err := k0sscheme.Scheme.ObjectKinds(obj) + if err != nil { + return nil, err + } + + // Skip the kind if there's at least one GVK which is not in the + // autopilot group + if !slices.ContainsFunc(gvks, func(gvk schema.GroupVersionKind) bool { + return gvk.Group != autopilotv1beta2.GroupName + }) { + kinds = append(kinds, kind) + } + } + + client, err := clientFactory.GetExtensionClient() + if err != nil { + return nil, err + } + + // Watch all the CRDs until all the required ones are established. + log := logrus.WithField("component", "autopilot") + slices.Sort(kinds) // for cosmetic purposes + if err = watch.CRDs(client.CustomResourceDefinitions()). + WithErrorCallback(func(err error) (time.Duration, error) { + if retryAfter, e := watch.IsRetryable(err); e == nil { + log.WithError(err).Info( + "Transient error while watching for CRDs", + ", starting over after ", retryAfter, " ...", + ) + return retryAfter, nil + } + + retryAfter := 10 * time.Second + log.WithError(err).Error( + "Error while watching CRDs", + ", starting over after ", retryAfter, " ...", + ) + return retryAfter, nil + }). 
+ Until(ctx, func(item *apiextensionsv1.CustomResourceDefinition) (bool, error) { + if item.Spec.Group != autopilotv1beta2.GroupName { + return false, nil // Not an autopilot CRD. + } + + // Find the established status for the CRD. + var established apiextensionsv1.ConditionStatus + for _, cond := range item.Status.Conditions { + if cond.Type == apiextensionsv1.Established { + established = cond.Status + break + } + } + + if established != apiextensionsv1.ConditionTrue { + return false, nil // CRD not yet established. + } + + // Remove the CRD's (list) kind from the list. + kinds = slices.DeleteFunc(kinds, func(kind string) bool { + return kind == item.Spec.Names.Kind || kind == item.Spec.Names.ListKind + }) + + // If the list is empty, all required CRDs are established. + return len(kinds) < 1, nil + }); err != nil { + return nil, fmt.Errorf("while waiting for Autopilot CRDs %v to become established: %w", kinds, err) + } + + return clientFactory, nil +}