Wait for autopilot CRDs before starting Autopilot worker component
Instead of having two brute-force retry loops in a row, have a single loop
that actually watches the CRDs and waits until it has observed that all the
Autopilot CRDs are established. This covers API server reachability
and CRD availability in one go.

It also ensures that controller-runtime registration is done only once.
Since controller-runtime v0.19.0, registering the same thing twice will
result in an error.

Signed-off-by: Tom Wieczorek <[email protected]>
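
The key test the new code performs is whether each autopilot CRD has reached the Established condition. Restated as a small standalone helper (the name crdEstablished is only illustrative, not part of the change), the check done in the watch callback looks roughly like this:

import apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"

// crdEstablished reports whether the API server has marked the CRD as
// established, i.e. the resource it defines can actually be served.
func crdEstablished(crd *apiextensionsv1.CustomResourceDefinition) bool {
	for _, cond := range crd.Status.Conditions {
		if cond.Type == apiextensionsv1.Established {
			return cond.Status == apiextensionsv1.ConditionTrue
		}
	}
	return false
}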
twz123 committed Sep 24, 2024
1 parent 3663982 commit 02b8488
Showing 1 changed file with 115 additions and 35 deletions.
150 changes: 115 additions & 35 deletions pkg/autopilot/controller/root_worker.go
@@ -16,22 +16,28 @@ package controller

import (
"context"
"errors"
"fmt"
"slices"
"time"

autopilotv1beta2 "github.com/k0sproject/k0s/pkg/apis/autopilot/v1beta2"
apcli "github.com/k0sproject/k0s/pkg/autopilot/client"
apdel "github.com/k0sproject/k0s/pkg/autopilot/controller/delegate"
aproot "github.com/k0sproject/k0s/pkg/autopilot/controller/root"
apsig "github.com/k0sproject/k0s/pkg/autopilot/controller/signal"
k0sscheme "github.com/k0sproject/k0s/pkg/client/clientset/scheme"
"github.com/k0sproject/k0s/pkg/kubernetes/watch"

apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
k8sretry "k8s.io/client-go/util/retry"

cr "sigs.k8s.io/controller-runtime"
crman "sigs.k8s.io/controller-runtime/pkg/manager"
crmetricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
crwebhook "sigs.k8s.io/controller-runtime/pkg/webhook"

"github.com/avast/retry-go"
"github.com/sirupsen/logrus"
)

@@ -68,42 +74,116 @@ func (w *rootWorker) Run(ctx context.Context) error {
HealthProbeBindAddress: w.cfg.HealthProbeBindAddr,
}

var mgr crman.Manager
if err := retry.Do(
func() (err error) {
mgr, err = cr.NewManager(w.clientFactory.RESTConfig(), managerOpts)
return err
},
retry.Context(ctx),
retry.LastErrorOnly(true),
retry.Delay(1*time.Second),
retry.OnRetry(func(attempt uint, err error) {
logger.WithError(err).Debugf("Failed to start controller manager in attempt #%d, retrying after backoff", attempt+1)
}),
); err != nil {
logger.WithError(err).Fatal("unable to start controller manager")
// We need to wait until all autopilot CRDs are established.
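// Capture the error from waitForCRDs outside the poll closure so that, on
// timeout, it can be joined with the poll error returned below.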
var waitErr error
if pollErr := wait.PollUntilContextTimeout(ctx, 1*time.Second, 2*time.Minute, true, func(ctx context.Context) (bool, error) {
waitErr = w.waitForCRDs(ctx)
return waitErr == nil, nil
}); pollErr != nil {
return errors.Join(waitErr, pollErr)
}

// In some cases, we need to wait on the worker side until controller deploys all autopilot CRDs
return k8sretry.OnError(wait.Backoff{
Steps: 120,
Duration: 1 * time.Second,
Factor: 1.0,
Jitter: 0.1,
}, func(err error) bool {
return true
}, func() error {
if err := RegisterIndexers(ctx, mgr, "worker"); err != nil {
return fmt.Errorf("unable to register indexers: %w", err)
}
mgr, err := cr.NewManager(w.clientFactory.RESTConfig(), managerOpts)
if err != nil {
fmt.Errorf("unable to start controller manager: %w", err)
}

if err := RegisterIndexers(ctx, mgr, "worker"); err != nil {
return fmt.Errorf("unable to register indexers: %w", err)
}

if err := apsig.RegisterControllers(ctx, logger, mgr, apdel.NodeControllerDelegate(), w.cfg.K0sDataDir, clusterID); err != nil {
return fmt.Errorf("unable to register 'controlnodes' controllers: %w", err)
}

Check failure on line 95 in pkg/autopilot/controller/root_worker.go (GitHub Actions / Unit tests :: windows-amd64): undefined: clusterID

// The controller-runtime start blocks until the context is cancelled.
if err := mgr.Start(ctx); err != nil {
return fmt.Errorf("unable to run controller-runtime manager for workers: %w", err)
}

return nil
}

func (w *rootWorker) waitForCRDs(ctx context.Context) error {
// Gather all kinds in the autopilot API group
var kinds []string
gv := autopilotv1beta2.SchemeGroupVersion
for kind := range k0sscheme.Scheme.KnownTypes(gv) {
// For some reason, the scheme also returns types from core/v1. Filter
// those out by only adding kinds which are _only_ in the autopilot
// group, and not in some other group as well. The only way to get all
// the GVKs for a certain type is by creating a new instance of that
// type and then asking the scheme about it.
obj, err := k0sscheme.Scheme.New(gv.WithKind(kind))
if err != nil {
return err
}
gvks, _, err := k0sscheme.Scheme.ObjectKinds(obj)
if err != nil {
return err
}

// Skip the kind if there's at least one GVK which is not in the
// autopilot group
if !slices.ContainsFunc(gvks, func(gvk schema.GroupVersionKind) bool {
return gvk.Group != autopilotv1beta2.GroupName
}) {
kinds = append(kinds, kind)
}
}

client, err := w.clientFactory.GetExtensionClient()
if err != nil {
return err
}

// Watch all the CRDs until all the required ones are established.
slices.Sort(kinds) // for cosmetic purposes
if err = watch.CRDs(client.CustomResourceDefinitions()).
WithErrorCallback(func(err error) (time.Duration, error) {
if retryAfter, e := watch.IsRetryable(err); e == nil {
w.log.WithError(err).Info(
"Transient error while watching for CRDs",
", starting over after ", retryAfter, " ...",
)
return retryAfter, nil
}
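// Even non-retryable watch errors are not fatal here: the watch is simply
// restarted after a fixed delay, since the worker cannot proceed without the CRDs.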

retryAfter := 10 * time.Second
w.log.WithError(err).Error(
"Error while watching CRDs",
", starting over after ", retryAfter, " ...",
)
return retryAfter, nil
}).
Until(ctx, func(item *apiextensionsv1.CustomResourceDefinition) (bool, error) {
if item.Spec.Group != autopilotv1beta2.GroupName {
return false, nil // Not an autopilot CRD.
}

// Find the established status for the CRD.
var established apiextensionsv1.ConditionStatus
for _, cond := range item.Status.Conditions {
if cond.Type == apiextensionsv1.Established {
established = cond.Status
break
}
}

if established != apiextensionsv1.ConditionTrue {
return false, nil // CRD not yet established.
}

// Remove the CRD's (list) kind from the list.
kinds = slices.DeleteFunc(kinds, func(kind string) bool {
return kind == item.Spec.Names.Kind || kind == item.Spec.Names.ListKind
})

// If the list is empty, all required CRDs are established.
return len(kinds) < 1, nil
}); err != nil {
return fmt.Errorf("while waiting for Autopilot CRDs %v to become established: %w", kinds, err)
}

return nil
}
