diff --git a/pkg/autopilot/controller/root_worker.go b/pkg/autopilot/controller/root_worker.go
index 88417920c7cb..8fb9fe250d83 100644
--- a/pkg/autopilot/controller/root_worker.go
+++ b/pkg/autopilot/controller/root_worker.go
@@ -17,7 +17,6 @@ package controller
 import (
 	"context"
 	"fmt"
-	"time"
 
 	apcli "github.com/k0sproject/k0s/pkg/autopilot/client"
 	apdel "github.com/k0sproject/k0s/pkg/autopilot/controller/delegate"
@@ -25,14 +24,12 @@ import (
 	apsig "github.com/k0sproject/k0s/pkg/autopilot/controller/signal"
 
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/util/wait"
-	k8sretry "k8s.io/client-go/util/retry"
+
 	cr "sigs.k8s.io/controller-runtime"
 	crman "sigs.k8s.io/controller-runtime/pkg/manager"
 	crmetricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
 	crwebhook "sigs.k8s.io/controller-runtime/pkg/webhook"
 
-	"github.com/avast/retry-go"
 	"github.com/sirupsen/logrus"
 )
 
@@ -69,49 +66,30 @@ func (w *rootWorker) Run(ctx context.Context) error {
 		HealthProbeBindAddress: w.cfg.HealthProbeBindAddr,
 	}
 
-	var mgr crman.Manager
-	if err := retry.Do(
-		func() (err error) {
-			mgr, err = cr.NewManager(w.clientFactory.RESTConfig(), managerOpts)
-			return err
-		},
-		retry.Context(ctx),
-		retry.LastErrorOnly(true),
-		retry.Delay(1*time.Second),
-		retry.OnRetry(func(attempt uint, err error) {
-			logger.WithError(err).Debugf("Failed to start controller manager in attempt #%d, retrying after backoff", attempt+1)
-		}),
-	); err != nil {
-		logger.WithError(err).Fatal("unable to start controller manager")
+	clusterID, err := w.getClusterID(ctx)
+	if err != nil {
+		return err
+	}
+
+	mgr, err := cr.NewManager(w.clientFactory.RESTConfig(), managerOpts)
+	if err != nil {
+		return fmt.Errorf("unable to start controller manager: %w", err)
+	}
+
+	if err := RegisterIndexers(ctx, mgr, "worker"); err != nil {
+		return fmt.Errorf("unable to register indexers: %w", err)
+	}
+
+	if err := apsig.RegisterControllers(ctx, logger, mgr, apdel.NodeControllerDelegate(), w.cfg.K0sDataDir, clusterID); err != nil {
+		return fmt.Errorf("unable to register 'controlnodes' controllers: %w", err)
+	}
+
+	// The controller-runtime start blocks until the context is cancelled.
+	if err := mgr.Start(ctx); err != nil {
+		return fmt.Errorf("unable to run controller-runtime manager for workers: %w", err)
 	}
 
-	// In some cases, we need to wait on the worker side until controller deploys all autopilot CRDs
-	return k8sretry.OnError(wait.Backoff{
-		Steps:    120,
-		Duration: 1 * time.Second,
-		Factor:   1.0,
-		Jitter:   0.1,
-	}, func(err error) bool {
-		return true
-	}, func() error {
-		clusterID, err := w.getClusterID(ctx)
-		if err != nil {
-			return err
-		}
-
-		if err := RegisterIndexers(ctx, mgr, "worker"); err != nil {
-			return fmt.Errorf("unable to register indexers: %w", err)
-		}
-
-		if err := apsig.RegisterControllers(ctx, logger, mgr, apdel.NodeControllerDelegate(), w.cfg.K0sDataDir, clusterID); err != nil {
-			return fmt.Errorf("unable to register 'controlnodes' controllers: %w", err)
-		}
-		// The controller-runtime start blocks until the context is cancelled.
-		if err := mgr.Start(ctx); err != nil {
-			return fmt.Errorf("unable to run controller-runtime manager for workers: %w", err)
-		}
-		return nil
-	})
+	return nil
 }
 
 func (w *rootWorker) getClusterID(ctx context.Context) (string, error) {
diff --git a/pkg/component/worker/autopilot.go b/pkg/component/worker/autopilot.go
index d6bcc772c86e..b2c61809b336 100644
--- a/pkg/component/worker/autopilot.go
+++ b/pkg/component/worker/autopilot.go
@@ -18,19 +18,24 @@ package worker
 
 import (
 	"context"
-	"errors"
 	"fmt"
+	"slices"
 	"time"
 
+	autopilotv1beta2 "github.com/k0sproject/k0s/pkg/apis/autopilot/v1beta2"
 	apcli "github.com/k0sproject/k0s/pkg/autopilot/client"
 	apcont "github.com/k0sproject/k0s/pkg/autopilot/controller"
 	aproot "github.com/k0sproject/k0s/pkg/autopilot/controller/root"
+	k0sscheme "github.com/k0sproject/k0s/pkg/client/clientset/scheme"
 	"github.com/k0sproject/k0s/pkg/component/manager"
 	"github.com/k0sproject/k0s/pkg/config"
-	"github.com/sirupsen/logrus"
+	"github.com/k0sproject/k0s/pkg/kubernetes/watch"
 
+	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
+	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/util/wait"
-	"k8s.io/client-go/rest"
+
+	"github.com/sirupsen/logrus"
 )
 
 const (
@@ -52,35 +57,24 @@ func (a *Autopilot) Init(ctx context.Context) error {
 func (a *Autopilot) Start(ctx context.Context) error {
 	log := logrus.WithFields(logrus.Fields{"component": "autopilot"})
 
-	// Wait 5 mins till we see kubelet auth config in place
-	timeout, cancel := context.WithTimeout(ctx, defaultPollTimeout)
-	defer cancel()
-
-	var restConfig *rest.Config
-	// wait.PollUntilWithContext passes it is own ctx argument as a ctx to the given function
-	// Poll until the kubelet config can be loaded successfully, as this is the access to the kube api
-	// needed by autopilot.
-	if err := wait.PollUntilWithContext(timeout, defaultPollDuration, func(ctx context.Context) (done bool, err error) {
-		log.Debugf("Attempting to load autopilot client config")
-		if restConfig, err = a.CertManager.GetRestConfig(ctx); err != nil {
-			log.WithError(err).Warnf("Failed to load autopilot client config, retrying in %v", defaultPollDuration)
-			return false, nil
+	var (
+		clientFactory apcli.FactoryInterface
+		waitErr       error
+	)
+	if pollErr := wait.PollUntilContextTimeout(ctx, defaultPollDuration, defaultPollTimeout, true, func(ctx context.Context) (bool, error) {
+		clientFactory, waitErr = a.newClientFactory(ctx)
+		if waitErr == nil {
+			return true, nil
 		}
-		return true, nil
-	}); err != nil {
-		return fmt.Errorf("unable to create autopilot client: %w", err)
-	}
-
-	// Without the config, there is nothing that we can do.
-
-	if restConfig == nil {
-		return errors.New("unable to create an autopilot client -- timed out")
-	}
+		log.WithError(waitErr).Debug("Failed to create autopilot client factory, retrying in ", defaultPollDuration)
+		return false, nil
+	}); pollErr != nil {
+		if waitErr == nil {
+			return pollErr
+		}
 
-	autopilotClientFactory, err := apcli.NewClientFactory(restConfig)
-	if err != nil {
-		return fmt.Errorf("creating autopilot client factory error: %w", err)
+		return fmt.Errorf("%w: %w", pollErr, waitErr)
 	}
 
 	log.Info("Autopilot client factory created, booting up worker root controller")
 
@@ -91,7 +85,7 @@ func (a *Autopilot) Start(ctx context.Context) error {
 		ManagerPort:         8899,
 		MetricsBindAddr:     "0",
 		HealthProbeBindAddr: "0",
-	}, log, autopilotClientFactory)
+	}, log, clientFactory)
 	if err != nil {
 		return fmt.Errorf("failed to create autopilot worker: %w", err)
 	}
@@ -111,3 +105,99 @@ func (a *Autopilot) Start(ctx context.Context) error {
 func (a *Autopilot) Stop() error {
 	return nil
 }
+
+func (a *Autopilot) newClientFactory(ctx context.Context) (apcli.FactoryInterface, error) {
+	restConfig, err := a.CertManager.GetRestConfig(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	clientFactory, err := apcli.NewClientFactory(restConfig)
+	if err != nil {
+		return nil, err
+	}
+
+	// We need to wait until all autopilot CRDs are established.
+	// Gather all kinds in the autopilot API group.
+	var kinds []string
+	gv := autopilotv1beta2.SchemeGroupVersion
+	for kind := range k0sscheme.Scheme.KnownTypes(gv) {
+		// For some reason, the scheme also returns types from core/v1. Filter
+		// those out by only adding kinds which are _only_ in the autopilot
+		// group, and not in some other group as well. The only way to get all
+		// the GVKs for a certain type is by creating a new instance of that
+		// type and then asking the scheme about it.
+		obj, err := k0sscheme.Scheme.New(gv.WithKind(kind))
+		if err != nil {
+			return nil, err
+		}
+		gvks, _, err := k0sscheme.Scheme.ObjectKinds(obj)
+		if err != nil {
+			return nil, err
+		}
+
+		// Skip the kind if there's at least one GVK which is not in the
+		// autopilot group
+		if !slices.ContainsFunc(gvks, func(gvk schema.GroupVersionKind) bool {
+			return gvk.Group != autopilotv1beta2.GroupName
+		}) {
+			kinds = append(kinds, kind)
+		}
+	}
+
+	client, err := clientFactory.GetExtensionClient()
+	if err != nil {
+		return nil, err
+	}
+
+	// Watch all the CRDs until all the required ones are established.
+	log := logrus.WithField("component", "autopilot")
+	slices.Sort(kinds) // for cosmetic purposes
+	if err = watch.CRDs(client.CustomResourceDefinitions()).
+		WithErrorCallback(func(err error) (time.Duration, error) {
+			if retryAfter, e := watch.IsRetryable(err); e == nil {
+				log.WithError(err).Info(
+					"Transient error while watching for CRDs",
+					", starting over after ", retryAfter, " ...",
+				)
+				return retryAfter, nil
+			}
+
+			retryAfter := 10 * time.Second
+			log.WithError(err).Error(
+				"Error while watching CRDs",
+				", starting over after ", retryAfter, " ...",
+			)
+			return retryAfter, nil
+		}).
+		Until(ctx, func(item *apiextensionsv1.CustomResourceDefinition) (bool, error) {
+			if item.Spec.Group != autopilotv1beta2.GroupName {
+				return false, nil // Not an autopilot CRD.
+			}
+
+			// Find the established status for the CRD.
+			var established apiextensionsv1.ConditionStatus
+			for _, cond := range item.Status.Conditions {
+				if cond.Type == apiextensionsv1.Established {
+					established = cond.Status
+					break
+				}
+			}
+
+			if established != apiextensionsv1.ConditionTrue {
+				return false, nil // CRD not yet established.
+			}
+
+			// Remove the CRD's (list) kind from the list.
+			kinds = slices.DeleteFunc(kinds, func(kind string) bool {
+				return kind == item.Spec.Names.Kind || kind == item.Spec.Names.ListKind
+			})
+
+			// If the list is empty, all required CRDs are established.
+			return len(kinds) < 1, nil
+		}); err != nil {
+		return nil, fmt.Errorf("while waiting for Autopilot CRDs %v to become established: %w", kinds, err)
+	}
+
+	return clientFactory, nil
+}
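
Note on the "wait until established" pattern used above: the patch watches all CRDs in the autopilot group and only returns the client factory once each one carries an Established=True condition, which is the apiserver's signal that it is ready to serve the resource. For readers who want the same check outside k0s, here is a minimal stand-alone sketch using only client-go's apiextensions clientset and polling instead of the k0s-internal watch helper. The file name, function names, durations, and the example CRD name are illustrative assumptions, not part of this change:

// crd_wait_sketch.go — illustrative only; not part of the patch above.
package crdwait

import (
	"context"
	"fmt"
	"time"

	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
	apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
)

// crdEstablished reports whether the named CRD currently has its
// Established condition set to true, i.e. the apiserver is ready to
// serve the custom resource.
func crdEstablished(ctx context.Context, cs apiextensionsclient.Interface, name string) bool {
	crd, err := cs.ApiextensionsV1().CustomResourceDefinitions().Get(ctx, name, metav1.GetOptions{})
	if err != nil {
		return false // not found yet (or transient error): keep waiting
	}
	for _, cond := range crd.Status.Conditions {
		if cond.Type == apiextensionsv1.Established {
			return cond.Status == apiextensionsv1.ConditionTrue
		}
	}
	return false
}

// waitForCRD polls once per second until the CRD is established or the
// timeout expires; both durations are arbitrary example values.
// Example use: waitForCRD(ctx, cs, "plans.autopilot.k0sproject.io")
func waitForCRD(ctx context.Context, cs apiextensionsclient.Interface, name string) error {
	if err := wait.PollUntilContextTimeout(ctx, time.Second, 2*time.Minute, true,
		func(ctx context.Context) (bool, error) {
			return crdEstablished(ctx, cs, name), nil
		}); err != nil {
		return fmt.Errorf("CRD %s did not become established: %w", name, err)
	}
	return nil
}

The patch itself prefers a watch over this kind of polling: a single watch stream covers every CRD in the autopilot group and reacts as soon as the apiserver flips the Established condition, at the cost of the retry/error-callback plumbing seen in newClientFactory.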