Wait for autopilot CRDs before starting Autopilot worker component
Instead of having multiple brute-force retry loops in a row, use a single
loop that actually watches the CRDs and waits until all of the Autopilot
CRDs have been observed to be established. This covers API server
reachability and CRD availability in one go.

It also ensures that controller-runtime registration is done only once.
Since controller-runtime v0.19.0, registering the same thing twice will
result in an error.
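For illustration only, here is a minimal sketch of the underlying idea — waiting for a CRD to report the Established condition as True — using the plain apiextensions clientset and a hypothetical waitForCRDEstablished helper. The actual change below uses k0s's own watch helpers and tracks the whole set of Autopilot kinds rather than polling a single name.

// Hypothetical illustration, not part of this commit: poll one CRD by name
// until its Established condition is True.
package example

import (
	"context"
	"time"

	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
	apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/rest"
)

func waitForCRDEstablished(ctx context.Context, restConfig *rest.Config, crdName string) error {
	client, err := apiextensionsclient.NewForConfig(restConfig)
	if err != nil {
		return err
	}
	return wait.PollUntilContextTimeout(ctx, time.Second, 2*time.Minute, true,
		func(ctx context.Context) (bool, error) {
			crd, err := client.ApiextensionsV1().CustomResourceDefinitions().Get(ctx, crdName, metav1.GetOptions{})
			if err != nil {
				return false, nil // API server or CRD not reachable yet; keep retrying.
			}
			for _, cond := range crd.Status.Conditions {
				if cond.Type == apiextensionsv1.Established {
					return cond.Status == apiextensionsv1.ConditionTrue, nil
				}
			}
			return false, nil
		})
}

Once that condition has been observed for every Autopilot CRD, the worker root controller can register its indexers and controllers exactly once, as the diff below does.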

Signed-off-by: Tom Wieczorek <[email protected]>
twz123 committed Sep 24, 2024
1 parent cc1d8d4 commit df9d2f6
Showing 2 changed files with 143 additions and 75 deletions.
68 changes: 23 additions & 45 deletions pkg/autopilot/controller/root_worker.go
@@ -17,22 +17,19 @@ package controller
import (
"context"
"fmt"
"time"

apcli "github.com/k0sproject/k0s/pkg/autopilot/client"
apdel "github.com/k0sproject/k0s/pkg/autopilot/controller/delegate"
aproot "github.com/k0sproject/k0s/pkg/autopilot/controller/root"
apsig "github.com/k0sproject/k0s/pkg/autopilot/controller/signal"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
k8sretry "k8s.io/client-go/util/retry"

cr "sigs.k8s.io/controller-runtime"
crman "sigs.k8s.io/controller-runtime/pkg/manager"
crmetricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
crwebhook "sigs.k8s.io/controller-runtime/pkg/webhook"

"github.com/avast/retry-go"
"github.com/sirupsen/logrus"
)

@@ -69,49 +66,30 @@ func (w *rootWorker) Run(ctx context.Context) error {
HealthProbeBindAddress: w.cfg.HealthProbeBindAddr,
}

var mgr crman.Manager
if err := retry.Do(
func() (err error) {
mgr, err = cr.NewManager(w.clientFactory.RESTConfig(), managerOpts)
return err
},
retry.Context(ctx),
retry.LastErrorOnly(true),
retry.Delay(1*time.Second),
retry.OnRetry(func(attempt uint, err error) {
logger.WithError(err).Debugf("Failed to start controller manager in attempt #%d, retrying after backoff", attempt+1)
}),
); err != nil {
logger.WithError(err).Fatal("unable to start controller manager")
clusterID, err := w.getClusterID(ctx)
if err != nil {
return err
}

mgr, err := cr.NewManager(w.clientFactory.RESTConfig(), managerOpts)
if err != nil {
return fmt.Errorf("unable to start controller manager: %w", err)
}

if err := RegisterIndexers(ctx, mgr, "worker"); err != nil {
return fmt.Errorf("unable to register indexers: %w", err)
}

if err := apsig.RegisterControllers(ctx, logger, mgr, apdel.NodeControllerDelegate(), w.cfg.K0sDataDir, clusterID); err != nil {
return fmt.Errorf("unable to register 'controlnodes' controllers: %w", err)
}

// The controller-runtime start blocks until the context is cancelled.
if err := mgr.Start(ctx); err != nil {
return fmt.Errorf("unable to run controller-runtime manager for workers: %w", err)
}

// In some cases, we need to wait on the worker side until controller deploys all autopilot CRDs
return k8sretry.OnError(wait.Backoff{
Steps: 120,
Duration: 1 * time.Second,
Factor: 1.0,
Jitter: 0.1,
}, func(err error) bool {
return true
}, func() error {
clusterID, err := w.getClusterID(ctx)
if err != nil {
return err
}

if err := RegisterIndexers(ctx, mgr, "worker"); err != nil {
return fmt.Errorf("unable to register indexers: %w", err)
}

if err := apsig.RegisterControllers(ctx, logger, mgr, apdel.NodeControllerDelegate(), w.cfg.K0sDataDir, clusterID); err != nil {
return fmt.Errorf("unable to register 'controlnodes' controllers: %w", err)
}
// The controller-runtime start blocks until the context is cancelled.
if err := mgr.Start(ctx); err != nil {
return fmt.Errorf("unable to run controller-runtime manager for workers: %w", err)
}
return nil
})
return nil
}

func (w *rootWorker) getClusterID(ctx context.Context) (string, error) {
150 changes: 120 additions & 30 deletions pkg/component/worker/autopilot.go
@@ -18,19 +18,24 @@ package worker

import (
"context"
"errors"
"fmt"
"slices"
"time"

autopilotv1beta2 "github.com/k0sproject/k0s/pkg/apis/autopilot/v1beta2"
apcli "github.com/k0sproject/k0s/pkg/autopilot/client"
apcont "github.com/k0sproject/k0s/pkg/autopilot/controller"
aproot "github.com/k0sproject/k0s/pkg/autopilot/controller/root"
k0sscheme "github.com/k0sproject/k0s/pkg/client/clientset/scheme"
"github.com/k0sproject/k0s/pkg/component/manager"
"github.com/k0sproject/k0s/pkg/config"
"github.com/sirupsen/logrus"
"github.com/k0sproject/k0s/pkg/kubernetes/watch"

apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/rest"

"github.com/sirupsen/logrus"
)

const (
@@ -52,35 +57,24 @@ func (a *Autopilot) Init(ctx context.Context) error {
func (a *Autopilot) Start(ctx context.Context) error {
log := logrus.WithFields(logrus.Fields{"component": "autopilot"})

// Wait 5 mins till we see kubelet auth config in place
timeout, cancel := context.WithTimeout(ctx, defaultPollTimeout)
defer cancel()

var restConfig *rest.Config
// wait.PollUntilWithContext passes its own ctx argument as the ctx to the given function
// Poll until the kubelet config can be loaded successfully, as this is the access to the kube api
// needed by autopilot.
if err := wait.PollUntilWithContext(timeout, defaultPollDuration, func(ctx context.Context) (done bool, err error) {
log.Debugf("Attempting to load autopilot client config")
if restConfig, err = a.CertManager.GetRestConfig(ctx); err != nil {
log.WithError(err).Warnf("Failed to load autopilot client config, retrying in %v", defaultPollDuration)
return false, nil
var (
clientFactory apcli.FactoryInterface
waitErr error
)
if pollErr := wait.PollUntilContextTimeout(ctx, defaultPollDuration, defaultPollTimeout, true, func(ctx context.Context) (bool, error) {
clientFactory, waitErr = a.newClientFactory(ctx)
if waitErr == nil {
return true, nil
}

return true, nil
}); err != nil {
return fmt.Errorf("unable to create autopilot client: %w", err)
}

// Without the config, there is nothing that we can do.

if restConfig == nil {
return errors.New("unable to create an autopilot client -- timed out")
}
log.WithError(waitErr).Debug("Failed to create autopilot client factory, retrying in ", defaultPollDuration)
return false, nil
}); pollErr != nil {
if waitErr == nil {
return pollErr
}

autopilotClientFactory, err := apcli.NewClientFactory(restConfig)
if err != nil {
return fmt.Errorf("creating autopilot client factory error: %w", err)
return fmt.Errorf("%w: %w", pollErr, waitErr)
}

log.Info("Autopilot client factory created, booting up worker root controller")
@@ -91,7 +85,7 @@ func (a *Autopilot) Start(ctx context.Context) error {
ManagerPort: 8899,
MetricsBindAddr: "0",
HealthProbeBindAddr: "0",
}, log, autopilotClientFactory)
}, log, clientFactory)
if err != nil {
return fmt.Errorf("failed to create autopilot worker: %w", err)
}
@@ -111,3 +105,99 @@ func (a *Autopilot) Stop() error {
func (a *Autopilot) Stop() error {
return nil
}

func (a *Autopilot) newClientFactory(ctx context.Context) (apcli.FactoryInterface, error) {
restConfig, err := a.CertManager.GetRestConfig(ctx)
if err != nil {
return nil, err
}

clientFactory, err := apcli.NewClientFactory(restConfig)
if err != nil {
return nil, err
}

// We need to wait until all autopilot CRDs are established.
// Gather all kinds in the autopilot API group.
var kinds []string
gv := autopilotv1beta2.SchemeGroupVersion
for kind := range k0sscheme.Scheme.KnownTypes(gv) {
// For some reason, the scheme also returns types from core/v1. Filter
// those out by only adding kinds which are _only_ in the autopilot
// group, and not in some other group as well. The only way to get all
// the GVKs for a certain type is by creating a new instance of that
// type and then asking the scheme about it.
obj, err := k0sscheme.Scheme.New(gv.WithKind(kind))
if err != nil {
return nil, err
}
gvks, _, err := k0sscheme.Scheme.ObjectKinds(obj)
if err != nil {
return nil, err
}

// Skip the kind if there's at least one GVK which is not in the
// autopilot group
if !slices.ContainsFunc(gvks, func(gvk schema.GroupVersionKind) bool {
return gvk.Group != autopilotv1beta2.GroupName
}) {
kinds = append(kinds, kind)
}
}

client, err := clientFactory.GetExtensionClient()
if err != nil {
return nil, err
}

// Watch all the CRDs until all the required ones are established.
log := logrus.WithField("component", "autopilot")
slices.Sort(kinds) // for cosmetic purposes
if err = watch.CRDs(client.CustomResourceDefinitions()).
WithErrorCallback(func(err error) (time.Duration, error) {
if retryAfter, e := watch.IsRetryable(err); e == nil {
log.WithError(err).Info(
"Transient error while watching for CRDs",
", starting over after ", retryAfter, " ...",
)
return retryAfter, nil
}

retryAfter := 10 * time.Second
log.WithError(err).Error(
"Error while watching CRDs",
", starting over after ", retryAfter, " ...",
)
return retryAfter, nil
}).
Until(ctx, func(item *apiextensionsv1.CustomResourceDefinition) (bool, error) {
if item.Spec.Group != autopilotv1beta2.GroupName {
return false, nil // Not an autopilot CRD.
}

// Find the established status for the CRD.
var established apiextensionsv1.ConditionStatus
for _, cond := range item.Status.Conditions {
if cond.Type == apiextensionsv1.Established {
established = cond.Status
break
}
}

if established != apiextensionsv1.ConditionTrue {
return false, nil // CRD not yet established.
}

// Remove the CRD's (list) kind from the list.
kinds = slices.DeleteFunc(kinds, func(kind string) bool {
return kind == item.Spec.Names.Kind || kind == item.Spec.Names.ListKind
})

// If the list is empty, all required CRDs are established.
return len(kinds) < 1, nil
}); err != nil {
return nil, fmt.Errorf("while waiting for Autopilot CRDs %v to become established: %w", kinds, err)
}

return clientFactory, nil
}
