From 5471d361f0f0861599c3d044b4ecda0fc353bb9f Mon Sep 17 00:00:00 2001 From: Ellis Tarn Date: Mon, 24 Oct 2022 16:02:17 -0700 Subject: [PATCH] feat: Combine webhooks and controllers into a single binary (#29) --- .golangci.yaml | 2 +- hack/boilerplate.go.txt | 1 + pkg/cloudprovider/types.go | 8 +- pkg/controllers/consolidation/suite_test.go | 6 +- pkg/controllers/controllers.go | 42 +++-- pkg/operator/context.go | 96 ----------- pkg/operator/controller/suite_test.go | 6 +- pkg/operator/injection/injection.go | 17 -- pkg/operator/logger.go | 23 ++- pkg/operator/manager.go | 119 ------------- pkg/operator/operator.go | 178 ++++++++++++++++++++ pkg/operator/options/options.go | 14 +- pkg/operator/profiling.go | 40 +++++ pkg/operator/settingsstore/settingsstore.go | 4 +- pkg/operator/settingsstore/suite_test.go | 16 +- pkg/webhooks/webhooks.go | 78 +-------- 16 files changed, 312 insertions(+), 338 deletions(-) delete mode 100644 pkg/operator/context.go delete mode 100644 pkg/operator/manager.go create mode 100644 pkg/operator/operator.go create mode 100644 pkg/operator/profiling.go diff --git a/.golangci.yaml b/.golangci.yaml index 4534c5f47e..a37cf2f4bf 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -41,7 +41,7 @@ linters-settings: locale: US ignore-words: [] goimports: - local-prefixes: github.com/aws/karpenter + local-prefixes: github.com/aws/karpenter-core goheader: template: |- Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/hack/boilerplate.go.txt b/hack/boilerplate.go.txt index b4ede5d4b6..2b7dbd2b46 100644 --- a/hack/boilerplate.go.txt +++ b/hack/boilerplate.go.txt @@ -11,3 +11,4 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. 
*/ + diff --git a/pkg/cloudprovider/types.go b/pkg/cloudprovider/types.go index 43f881f379..b0772d772c 100644 --- a/pkg/cloudprovider/types.go +++ b/pkg/cloudprovider/types.go @@ -32,10 +32,10 @@ import ( type Context struct { context.Context - ClientSet *kubernetes.Clientset - KubeClient client.Client - EventRecorder record.EventRecorder - Clock clock.Clock + KubernetesInterface kubernetes.Interface + KubeClient client.Client + EventRecorder record.EventRecorder + Clock clock.Clock // StartAsync is a channel that is closed when leader election has been won. This is a signal to start any async // processing that should only occur while the cloud provider is the leader. StartAsync <-chan struct{} diff --git a/pkg/controllers/consolidation/suite_test.go b/pkg/controllers/consolidation/suite_test.go index 5b4ee6b3c7..49aa0e8ba6 100644 --- a/pkg/controllers/consolidation/suite_test.go +++ b/pkg/controllers/consolidation/suite_test.go @@ -57,7 +57,7 @@ var controller *consolidation.Controller var provisioningController *provisioning.Controller var provisioner *provisioning.Provisioner var cloudProvider *fake.CloudProvider -var clientSet *kubernetes.Clientset +var kubernetesInterface kubernetes.Interface var recorder *test.EventRecorder var nodeStateController *state.NodeController var fakeClock *clock.FakeClock @@ -80,9 +80,9 @@ var _ = BeforeSuite(func() { fakeClock = clock.NewFakeClock(time.Now()) cluster = state.NewCluster(ctx, fakeClock, env.Client, cloudProvider) nodeStateController = state.NewNodeController(env.Client, cluster) - clientSet = kubernetes.NewForConfigOrDie(e.Config) + kubernetesInterface = kubernetes.NewForConfigOrDie(e.Config) recorder = test.NewEventRecorder() - provisioner = provisioning.NewProvisioner(ctx, env.Client, clientSet.CoreV1(), recorder, cloudProvider, cluster, test.SettingsStore{}) + provisioner = provisioning.NewProvisioner(ctx, env.Client, kubernetesInterface.CoreV1(), recorder, cloudProvider, cluster, test.SettingsStore{}) 
provisioningController = provisioning.NewController(env.Client, provisioner, recorder) }) Expect(env.Start()).To(Succeed(), "Failed to start environment") diff --git a/pkg/controllers/controllers.go b/pkg/controllers/controllers.go index a71cc4d8a0..7d71d6368a 100644 --- a/pkg/controllers/controllers.go +++ b/pkg/controllers/controllers.go @@ -15,6 +15,12 @@ limitations under the License. package controllers import ( + "context" + + "k8s.io/client-go/kubernetes" + "k8s.io/utils/clock" + "sigs.k8s.io/controller-runtime/pkg/client" + "github.com/aws/karpenter-core/pkg/cloudprovider" "github.com/aws/karpenter-core/pkg/controllers/consolidation" "github.com/aws/karpenter-core/pkg/controllers/counter" @@ -25,8 +31,8 @@ import ( "github.com/aws/karpenter-core/pkg/controllers/provisioning" "github.com/aws/karpenter-core/pkg/controllers/state" "github.com/aws/karpenter-core/pkg/controllers/termination" + "github.com/aws/karpenter-core/pkg/events" "github.com/aws/karpenter-core/pkg/metrics" - "github.com/aws/karpenter-core/pkg/operator" "github.com/aws/karpenter-core/pkg/operator/controller" "github.com/aws/karpenter-core/pkg/operator/settingsstore" ) @@ -35,20 +41,30 @@ func init() { metrics.MustRegister() // Registers cross-controller metrics } -func GetControllers(ctx operator.Context, cluster *state.Cluster, settingsStore settingsstore.Store, cloudProvider cloudprovider.CloudProvider) []controller.Controller { - provisioner := provisioning.NewProvisioner(ctx, ctx.KubeClient, ctx.Clientset.CoreV1(), ctx.EventRecorder, cloudProvider, cluster, settingsStore) +func NewControllers( + ctx context.Context, + clock clock.Clock, + kubeClient client.Client, + kubernetesInterface kubernetes.Interface, + cluster *state.Cluster, + eventRecorder events.Recorder, + settingsStore settingsstore.Store, + cloudProvider cloudprovider.CloudProvider, +) []controller.Controller { + provisioner := provisioning.NewProvisioner(ctx, kubeClient, kubernetesInterface.CoreV1(), eventRecorder, 
cloudProvider, cluster, settingsStore) metricsstate.StartMetricScraper(ctx, cluster) + return []controller.Controller{ - provisioning.NewController(ctx.KubeClient, provisioner, ctx.EventRecorder), - state.NewNodeController(ctx.KubeClient, cluster), - state.NewPodController(ctx.KubeClient, cluster), - state.NewProvisionerController(ctx.KubeClient, cluster), - node.NewController(ctx.Clock, ctx.KubeClient, cloudProvider, cluster), - termination.NewController(ctx, ctx.Clock, ctx.KubeClient, ctx.Clientset.CoreV1(), ctx.EventRecorder, cloudProvider), - metricspod.NewController(ctx.KubeClient), - metricsprovisioner.NewController(ctx.KubeClient), - counter.NewController(ctx.KubeClient, cluster), - consolidation.NewController(ctx.Clock, ctx.KubeClient, provisioner, cloudProvider, ctx.EventRecorder, cluster), + provisioning.NewController(kubeClient, provisioner, eventRecorder), + state.NewNodeController(kubeClient, cluster), + state.NewPodController(kubeClient, cluster), + state.NewProvisionerController(kubeClient, cluster), + node.NewController(clock, kubeClient, cloudProvider, cluster), + termination.NewController(ctx, clock, kubeClient, kubernetesInterface.CoreV1(), eventRecorder, cloudProvider), + metricspod.NewController(kubeClient), + metricsprovisioner.NewController(kubeClient), + counter.NewController(kubeClient, cluster), + consolidation.NewController(clock, kubeClient, provisioner, cloudProvider, eventRecorder, cluster), } } diff --git a/pkg/operator/context.go b/pkg/operator/context.go deleted file mode 100644 index e1d8cb1d74..0000000000 --- a/pkg/operator/context.go +++ /dev/null @@ -1,96 +0,0 @@ -/* -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package operator - -import ( - "context" - "runtime/debug" - - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/flowcontrol" - "k8s.io/utils/clock" - "knative.dev/pkg/configmap/informer" - "knative.dev/pkg/logging" - "knative.dev/pkg/system" - controllerruntime "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/manager" - - "github.com/aws/karpenter-core/pkg/events" - "github.com/aws/karpenter-core/pkg/operator/injection" - "github.com/aws/karpenter-core/pkg/operator/options" -) - -const ( - appName = "karpenter" - component = "controller" -) - -// Context exposes a global context of components that can be used across the binary -// for initialization. 
-type Context struct { - context.Context - - EventRecorder events.Recorder // Decorated recorder for Karpenter core events - BaseEventRecorder record.EventRecorder // Recorder from controller manager for use by other components - ConfigMapWatcher *informer.InformedWatcher - KubeClient client.Client - Clientset *kubernetes.Clientset - Clock clock.Clock - Options *options.Options - StartAsync <-chan struct{} -} - -func NewOrDie() (Context, manager.Manager) { - opts := options.New().MustParse() - - // Setup Client - controllerRuntimeConfig := controllerruntime.GetConfigOrDie() - controllerRuntimeConfig.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(float32(opts.KubeClientQPS), opts.KubeClientBurst) - controllerRuntimeConfig.UserAgent = appName - clientSet := kubernetes.NewForConfigOrDie(controllerRuntimeConfig) - - // Set up logger and watch for changes to log level - cmw := informer.NewInformedWatcher(clientSet, system.Namespace()) - ctx := injection.LoggingContextOrDie(component, controllerRuntimeConfig, cmw) - ctx = injection.WithConfig(ctx, controllerRuntimeConfig) - ctx = injection.WithOptions(ctx, *opts) - - if opts.MemoryLimit > 0 { - newLimit := int64(float64(opts.MemoryLimit) * 0.9) - logging.FromContext(ctx).Infof("Setting GC memory limit to %d, container limit = %d", newLimit, opts.MemoryLimit) - debug.SetMemoryLimit(newLimit) - } - - manager := NewManagerOrDie(ctx, controllerRuntimeConfig, opts) - - baseRecorder := manager.GetEventRecorderFor(appName) - recorder := events.NewRecorder(baseRecorder) - recorder = events.NewLoadSheddingRecorder(recorder) - recorder = events.NewDedupeRecorder(recorder) - - return Context{ - Context: ctx, - EventRecorder: recorder, - BaseEventRecorder: baseRecorder, - ConfigMapWatcher: cmw, - Clientset: clientSet, - KubeClient: manager.GetClient(), - Clock: clock.RealClock{}, - Options: opts, - StartAsync: manager.Elected(), - }, manager -} diff --git a/pkg/operator/controller/suite_test.go 
b/pkg/operator/controller/suite_test.go index f227950280..1e7b95c9b1 100644 --- a/pkg/operator/controller/suite_test.go +++ b/pkg/operator/controller/suite_test.go @@ -53,9 +53,9 @@ func TestAPIs(t *testing.T) { var _ = BeforeEach(func() { env = test.NewEnvironment(ctx, func(e *test.Environment) { - clientSet := kubernetes.NewForConfigOrDie(e.Config) - cmw = informer.NewInformedWatcher(clientSet, system.Namespace()) - ss = settingsstore.WatchSettingsOrDie(e.Ctx, clientSet, cmw, settings.Registration) + kubernetesInterface := kubernetes.NewForConfigOrDie(e.Config) + cmw = informer.NewInformedWatcher(kubernetesInterface, system.Namespace()) + ss = settingsstore.WatchSettingsOrDie(e.Ctx, kubernetesInterface, cmw, settings.Registration) defaultConfigMap = &v1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ diff --git a/pkg/operator/injection/injection.go b/pkg/operator/injection/injection.go index b3feffc5f9..f457f8eac6 100644 --- a/pkg/operator/injection/injection.go +++ b/pkg/operator/injection/injection.go @@ -19,29 +19,12 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/rest" - "knative.dev/pkg/configmap/informer" - knativeinjection "knative.dev/pkg/injection" - "knative.dev/pkg/injection/sharedmain" - "knative.dev/pkg/logging" - "knative.dev/pkg/signals" "github.com/aws/karpenter-core/pkg/operator/options" ) type resourceKey struct{} -// LoggingContextOrDie injects a logger into the returned context. The logger is -// configured by the ConfigMap `config-logging` and live updates the level. 
-func LoggingContextOrDie(componentName string, config *rest.Config, cmw *informer.InformedWatcher) context.Context { - ctx, startinformers := knativeinjection.EnableInjectionOrDie(signals.NewContext(), config) - logger, atomicLevel := sharedmain.SetupLoggerOrDie(ctx, componentName) - ctx = logging.WithLogger(ctx, logger) - rest.SetDefaultWarningHandler(&logging.WarningHandler{Logger: logger}) - sharedmain.WatchLoggingConfigOrDie(ctx, cmw, logger, atomicLevel, componentName) - startinformers() - return ctx -} - func WithNamespacedName(ctx context.Context, namespacedname types.NamespacedName) context.Context { return context.WithValue(ctx, resourceKey{}, namespacedname) } diff --git a/pkg/operator/logger.go b/pkg/operator/logger.go index 703655abd2..5415d9525a 100644 --- a/pkg/operator/logger.go +++ b/pkg/operator/logger.go @@ -14,7 +14,28 @@ limitations under the License. package operator -import "github.com/go-logr/logr" +import ( + "context" + + "github.com/go-logr/logr" + "go.uber.org/zap" + "k8s.io/client-go/rest" + "knative.dev/pkg/configmap/informer" + "knative.dev/pkg/injection" + "knative.dev/pkg/injection/sharedmain" + "knative.dev/pkg/logging" +) + +// NewLogger returns a logger for the given component. The logger is +// configured by the ConfigMap `config-logging` and live updates the level. 
+func NewLogger(ctx context.Context, componentName string, config *rest.Config, cmw *informer.InformedWatcher) *zap.SugaredLogger { + ctx, startinformers := injection.EnableInjectionOrDie(ctx, config) + logger, atomicLevel := sharedmain.SetupLoggerOrDie(ctx, componentName) + rest.SetDefaultWarningHandler(&logging.WarningHandler{Logger: logger}) + sharedmain.WatchLoggingConfigOrDie(ctx, cmw, logger, atomicLevel, componentName) + startinformers() + return logger +} type ignoreDebugEventsSink struct { name string diff --git a/pkg/operator/manager.go b/pkg/operator/manager.go deleted file mode 100644 index 82e2ce67e3..0000000000 --- a/pkg/operator/manager.go +++ /dev/null @@ -1,119 +0,0 @@ -/* -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package operator - -import ( - "context" - "fmt" - "net/http" - "net/http/pprof" - - "github.com/go-logr/zapr" - "go.uber.org/zap" - v1 "k8s.io/api/core/v1" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/leaderelection/resourcelock" - "knative.dev/pkg/logging" - controllerruntime "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/healthz" - "sigs.k8s.io/controller-runtime/pkg/manager" - - "github.com/aws/karpenter-core/pkg/operator/controller" - "github.com/aws/karpenter-core/pkg/operator/injection" - "github.com/aws/karpenter-core/pkg/operator/options" - "github.com/aws/karpenter-core/pkg/operator/scheme" - "github.com/aws/karpenter-core/pkg/operator/settingsstore" -) - -// NewManagerOrDie instantiates a controller manager or panics -func NewManagerOrDie(ctx context.Context, config *rest.Config, opts *options.Options) manager.Manager { - newManager, err := controllerruntime.NewManager(config, controllerruntime.Options{ - Logger: ignoreDebugEvents(zapr.NewLogger(logging.FromContext(ctx).Desugar())), - LeaderElection: opts.EnableLeaderElection, - LeaderElectionID: "karpenter-leader-election", - LeaderElectionResourceLock: resourcelock.LeasesResourceLock, - Scheme: scheme.Scheme, - MetricsBindAddress: fmt.Sprintf(":%d", opts.MetricsPort), - HealthProbeBindAddress: fmt.Sprintf(":%d", opts.HealthProbePort), - BaseContext: newRunnableContext(config, opts, logging.FromContext(ctx)), - }) - if err != nil { - panic(fmt.Sprintf("Failed to create controller newManager, %s", err)) - } - if err := newManager.GetFieldIndexer().IndexField(ctx, &v1.Pod{}, "spec.nodeName", func(o client.Object) []string { - return []string{o.(*v1.Pod).Spec.NodeName} - }); err != nil { - panic(fmt.Sprintf("Failed to setup pod indexer, %s", err)) - } - if opts.EnableProfiling { - utilruntime.Must(registerPprof(newManager)) - } - - return newManager -} - -// 
RegisterControllers registers a set of controllers to the controller manager -func RegisterControllers(ctx context.Context, settingsStore settingsstore.Store, m manager.Manager, controllers ...controller.Controller) manager.Manager { - for _, c := range controllers { - c = controller.InjectSettings(c, settingsStore) // inject settings in the controller reconcile loop - if err := c.Register(ctx, m); err != nil { - panic(err) - } - // if the controller implements a liveness check, connect it - if lp, ok := c.(controller.HealthCheck); ok { - utilruntime.Must(m.AddHealthzCheck(fmt.Sprintf("%T", c), lp.LivenessProbe)) - } - } - if err := m.AddHealthzCheck("healthz", healthz.Ping); err != nil { - panic(fmt.Sprintf("Failed to add health probe, %s", err)) - } - if err := m.AddReadyzCheck("readyz", healthz.Ping); err != nil { - panic(fmt.Sprintf("Failed to add ready probe, %s", err)) - } - return m -} - -func newRunnableContext(config *rest.Config, options *options.Options, logger *zap.SugaredLogger) func() context.Context { - return func() context.Context { - ctx := context.Background() - ctx = logging.WithLogger(ctx, logger) - ctx = injection.WithConfig(ctx, config) - ctx = injection.WithOptions(ctx, *options) - return ctx - } -} - -func registerPprof(manager manager.Manager) error { - for path, handler := range map[string]http.Handler{ - "/debug/pprof/": http.HandlerFunc(pprof.Index), - "/debug/pprof/cmdline": http.HandlerFunc(pprof.Cmdline), - "/debug/pprof/profile": http.HandlerFunc(pprof.Profile), - "/debug/pprof/symbol": http.HandlerFunc(pprof.Symbol), - "/debug/pprof/trace": http.HandlerFunc(pprof.Trace), - "/debug/pprof/allocs": pprof.Handler("allocs"), - "/debug/pprof/heap": pprof.Handler("heap"), - "/debug/pprof/block": pprof.Handler("block"), - "/debug/pprof/goroutine": pprof.Handler("goroutine"), - "/debug/pprof/threadcreate": pprof.Handler("threadcreate"), - } { - err := manager.AddMetricsExtraHandler(path, handler) - if err != nil { - return err - } - } - 
return nil -} diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go new file mode 100644 index 0000000000..10ae3d52f2 --- /dev/null +++ b/pkg/operator/operator.go @@ -0,0 +1,178 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package operator + +import ( + "context" + "fmt" + "sync" + + "github.com/go-logr/zapr" + "github.com/samber/lo" + v1 "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/leaderelection/resourcelock" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/flowcontrol" + "k8s.io/utils/clock" + "knative.dev/pkg/configmap/informer" + knativeinjection "knative.dev/pkg/injection" + "knative.dev/pkg/injection/sharedmain" + "knative.dev/pkg/logging" + "knative.dev/pkg/signals" + "knative.dev/pkg/system" + "knative.dev/pkg/webhook" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/healthz" + "sigs.k8s.io/controller-runtime/pkg/manager" + + "github.com/aws/karpenter-core/pkg/apis/config/settings" + "github.com/aws/karpenter-core/pkg/events" + operatorcontroller "github.com/aws/karpenter-core/pkg/operator/controller" + "github.com/aws/karpenter-core/pkg/operator/injection" + "github.com/aws/karpenter-core/pkg/operator/options" + "github.com/aws/karpenter-core/pkg/operator/scheme" + "github.com/aws/karpenter-core/pkg/operator/settingsstore" +) + +const ( + appName = "karpenter" + component = "controller" +) + +type Operator struct { + 
manager.Manager + + KubernetesInterface kubernetes.Interface + SettingsStore settingsstore.Store + EventRecorder record.EventRecorder + Recorder events.Recorder // Deprecate me in favor of EventRecorder + Clock clock.Clock + + webhooks []knativeinjection.ControllerConstructor +} + +// NewOperator instantiates the operator and its controller manager or panics +func NewOperator() (context.Context, *Operator) { + // Root Context + ctx := signals.NewContext() + ctx = knativeinjection.WithNamespaceScope(ctx, system.Namespace()) + // TODO: This can be removed if we eventually decide that we need leader election. Having leader election has resulted in the webhook + // having issues described in https://github.com/aws/karpenter/issues/2562 so these issues need to be resolved if this line is removed + ctx = sharedmain.WithHADisabled(ctx) // Disable leader election for webhook + + // Options + opts := options.New() + ctx = injection.WithOptions(ctx, *opts) + + // Webhook + ctx = webhook.WithOptions(ctx, webhook.Options{ + Port: opts.WebhookPort, + ServiceName: opts.ServiceName, + SecretName: fmt.Sprintf("%s-cert", opts.ServiceName), + }) + + // Client Config + config := controllerruntime.GetConfigOrDie() + config.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(float32(opts.KubeClientQPS), opts.KubeClientBurst) + config.UserAgent = appName + + // Client + kubernetesInterface := kubernetes.NewForConfigOrDie(config) + configMapWatcher := informer.NewInformedWatcher(kubernetesInterface, system.Namespace()) + + // Settings + settingsStore := settingsstore.WatchSettingsOrDie(ctx, kubernetesInterface, configMapWatcher, settings.Registration) + ctx = settingsStore.InjectSettings(ctx) + + // Logging + logger := NewLogger(ctx, component, config, configMapWatcher) + ctx = logging.WithLogger(ctx, logger) + + lo.Must0(configMapWatcher.Start(ctx.Done())) + + // Manager + manager, err := controllerruntime.NewManager(config, controllerruntime.Options{ + Logger: 
ignoreDebugEvents(zapr.NewLogger(logger.Desugar())), + LeaderElection: opts.EnableLeaderElection, + LeaderElectionID: "karpenter-leader-election", + LeaderElectionResourceLock: resourcelock.LeasesResourceLock, + Scheme: scheme.Scheme, + MetricsBindAddress: fmt.Sprintf(":%d", opts.MetricsPort), + HealthProbeBindAddress: fmt.Sprintf(":%d", opts.HealthProbePort), + BaseContext: func() context.Context { + ctx := context.Background() + ctx = logging.WithLogger(ctx, logger) + ctx = injection.WithConfig(ctx, config) + ctx = injection.WithOptions(ctx, *opts) + return ctx + }, + }) + manager = lo.Must(manager, err, "failed to setup manager") + if opts.EnableProfiling { + registerPprof(manager) + } + lo.Must0(manager.GetFieldIndexer().IndexField(ctx, &v1.Pod{}, "spec.nodeName", func(o client.Object) []string { + return []string{o.(*v1.Pod).Spec.NodeName} + }), "failed to setup pod indexer") + + // Event Recorder + eventRecorder := manager.GetEventRecorderFor(appName) + recorder := events.NewRecorder(eventRecorder) + recorder = events.NewLoadSheddingRecorder(recorder) + recorder = events.NewDedupeRecorder(recorder) + + return ctx, &Operator{ + Manager: manager, + KubernetesInterface: kubernetesInterface, + SettingsStore: settingsStore, + Recorder: recorder, + EventRecorder: eventRecorder, + Clock: clock.RealClock{}, + } +} + +func (o *Operator) WithControllers(ctx context.Context, controllers ...operatorcontroller.Controller) *Operator { + for _, c := range controllers { + lo.Must0(operatorcontroller.InjectSettings(c, o.SettingsStore).Register(ctx, o.Manager), "failed to register controller") + if lp, ok := c.(operatorcontroller.HealthCheck); ok { + lo.Must0(o.AddHealthzCheck(fmt.Sprintf("%T", c), lp.LivenessProbe), "failed to setup liveness probe") + } + } + lo.Must0(o.AddHealthzCheck("healthz", healthz.Ping), "failed to setup liveness probe") + lo.Must0(o.AddReadyzCheck("readyz", healthz.Ping), "failed to setup readiness probe") + return o +} + +func (o *Operator) 
WithWebhooks(webhooks ...knativeinjection.ControllerConstructor) *Operator { + o.webhooks = append(o.webhooks, webhooks...) + return o +} + +func (o *Operator) Start(ctx context.Context) { + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + lo.Must0(o.Manager.Start(ctx)) + }() + wg.Add(1) + go func() { + defer wg.Done() + sharedmain.MainWithConfig(ctx, "webhook", o.GetConfig(), o.webhooks...) + }() + wg.Wait() +} diff --git a/pkg/operator/options/options.go b/pkg/operator/options/options.go index 012421f8ff..5a8193ce59 100644 --- a/pkg/operator/options/options.go +++ b/pkg/operator/options/options.go @@ -20,6 +20,7 @@ import ( "fmt" "net/url" "os" + "runtime/debug" "go.uber.org/multierr" @@ -37,6 +38,8 @@ const ( type Options struct { *flag.FlagSet // Vendor Neutral + ServiceName string + WebhookPort int MetricsPort int HealthProbePort int KubeClientQPS int @@ -62,6 +65,8 @@ func New() *Options { opts.FlagSet = f // Vendor Neutral + f.StringVar(&opts.ServiceName, "karpenter-service", env.WithDefaultString("KARPENTER_SERVICE", ""), "The Karpenter Service name for the dynamic webhook certificate") + f.IntVar(&opts.WebhookPort, "webhook-port", env.WithDefaultInt("PORT", 8443), "The port the webhook endpoint binds to for validation and mutation of resources") f.IntVar(&opts.MetricsPort, "metrics-port", env.WithDefaultInt("METRICS_PORT", 8080), "The port the metric endpoint binds to for operating metrics about the controller itself") f.IntVar(&opts.HealthProbePort, "health-probe-port", env.WithDefaultInt("HEALTH_PROBE_PORT", 8081), "The port the health probe endpoint binds to for reporting controller health") f.IntVar(&opts.KubeClientQPS, "kube-client-qps", env.WithDefaultInt("KUBE_CLIENT_QPS", 200), "The smoothed rate of qps to kube-apiserver") @@ -69,7 +74,6 @@ func New() *Options { f.BoolVar(&opts.EnableProfiling, "enable-profiling", env.WithDefaultBool("ENABLE_PROFILING", false), "Enable the profiling on the metric endpoint") 
f.BoolVar(&opts.EnableLeaderElection, "leader-elect", env.WithDefaultBool("LEADER_ELECT", true), "Start leader election client and gain leadership before executing the main loop. Enable this when running replicated components for high availability.") f.Int64Var(&opts.MemoryLimit, "memory-limit", env.WithDefaultInt64("MEMORY_LIMIT", -1), "Memory limit on the container running the controller. The GC soft memory limit is set to 90% of this value.") - // AWS Specific f.StringVar(&opts.ClusterName, "cluster-name", env.WithDefaultString("CLUSTER_NAME", ""), "The kubernetes cluster name for resource discovery") f.StringVar(&opts.ClusterEndpoint, "cluster-endpoint", env.WithDefaultString("CLUSTER_ENDPOINT", ""), "The external kubernetes cluster endpoint for new nodes to connect with") @@ -79,7 +83,13 @@ func New() *Options { f.StringVar(&opts.AWSDefaultInstanceProfile, "aws-default-instance-profile", env.WithDefaultString("AWS_DEFAULT_INSTANCE_PROFILE", ""), "The default instance profile to use when provisioning nodes in AWS") f.BoolVar(&opts.AWSEnablePodENI, "aws-enable-pod-eni", env.WithDefaultBool("AWS_ENABLE_POD_ENI", false), "If true then instances that support pod ENI will report a vpc.amazonaws.com/pod-eni resource") f.BoolVar(&opts.AWSIsolatedVPC, "aws-isolated-vpc", env.WithDefaultBool("AWS_ISOLATED_VPC", false), "If true then assume we can't reach AWS services which don't have a VPC endpoint. This also has the effect of disabling look-ups to the AWS pricing endpoint.") - return opts + + if opts.MemoryLimit > 0 { + newLimit := int64(float64(opts.MemoryLimit) * 0.9) + debug.SetMemoryLimit(newLimit) + } + + return opts.MustParse() } // MustParse reads the user passed flags, environment variables, and default values. 
diff --git a/pkg/operator/profiling.go b/pkg/operator/profiling.go new file mode 100644 index 0000000000..b062204624 --- /dev/null +++ b/pkg/operator/profiling.go @@ -0,0 +1,40 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package operator + +import ( + "net/http" + "net/http/pprof" + + "github.com/samber/lo" + "sigs.k8s.io/controller-runtime/pkg/manager" +) + +func registerPprof(manager manager.Manager) { + for path, handler := range map[string]http.Handler{ + "/debug/pprof/": http.HandlerFunc(pprof.Index), + "/debug/pprof/cmdline": http.HandlerFunc(pprof.Cmdline), + "/debug/pprof/profile": http.HandlerFunc(pprof.Profile), + "/debug/pprof/symbol": http.HandlerFunc(pprof.Symbol), + "/debug/pprof/trace": http.HandlerFunc(pprof.Trace), + "/debug/pprof/allocs": pprof.Handler("allocs"), + "/debug/pprof/heap": pprof.Handler("heap"), + "/debug/pprof/block": pprof.Handler("block"), + "/debug/pprof/goroutine": pprof.Handler("goroutine"), + "/debug/pprof/threadcreate": pprof.Handler("threadcreate"), + } { + lo.Must0(manager.AddMetricsExtraHandler(path, handler), "setting up profiling") + } +} diff --git a/pkg/operator/settingsstore/settingsstore.go b/pkg/operator/settingsstore/settingsstore.go index 9e5ef2c7c2..39f65afeb7 100644 --- a/pkg/operator/settingsstore/settingsstore.go +++ b/pkg/operator/settingsstore/settingsstore.go @@ -41,7 +41,7 @@ type store struct { stores map[*config.Registration]*configmap.UntypedStore } -func WatchSettingsOrDie(ctx context.Context, clientSet 
*kubernetes.Clientset, cmw *informer.InformedWatcher, registrations ...*config.Registration) Store { +func WatchSettingsOrDie(ctx context.Context, kubernetesInterface kubernetes.Interface, cmw *informer.InformedWatcher, registrations ...*config.Registration) Store { ss := &store{ registrations: registrations, stores: map[*config.Registration]*configmap.UntypedStore{}, @@ -61,7 +61,7 @@ func WatchSettingsOrDie(ctx context.Context, clientSet *kubernetes.Clientset, cm // TODO: Remove this Get once we don't rely on this settingsStore for initialization // Attempt to get the ConfigMap since WatchWithDefault doesn't wait for Add event form API-server - cm, err := clientSet.CoreV1().ConfigMaps(cmw.Namespace).Get(ctx, registration.ConfigMapName, metav1.GetOptions{}) + cm, err := kubernetesInterface.CoreV1().ConfigMaps(cmw.Namespace).Get(ctx, registration.ConfigMapName, metav1.GetOptions{}) if err != nil { if errors.IsNotFound(err) { cm = &v1.ConfigMap{ diff --git a/pkg/operator/settingsstore/suite_test.go b/pkg/operator/settingsstore/suite_test.go index bcd45416d0..728f87b6ca 100644 --- a/pkg/operator/settingsstore/suite_test.go +++ b/pkg/operator/settingsstore/suite_test.go @@ -40,7 +40,7 @@ import ( var ctx context.Context var env *test.Environment var cmw *informer.InformedWatcher -var clientSet *kubernetes.Clientset +var kubernetesInterface kubernetes.Interface var ss settingsstore.Store var defaultConfigMap *v1.ConfigMap @@ -52,8 +52,8 @@ func TestAPIs(t *testing.T) { var _ = BeforeEach(func() { env = test.NewEnvironment(ctx, func(e *test.Environment) { - clientSet = kubernetes.NewForConfigOrDie(e.Config) - cmw = informer.NewInformedWatcher(clientSet, system.Namespace()) + kubernetesInterface = kubernetes.NewForConfigOrDie(e.Config) + cmw = informer.NewInformedWatcher(kubernetesInterface, system.Namespace()) defaultConfigMap = &v1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ @@ -73,7 +73,7 @@ var _ = AfterEach(func() { var _ = Describe("Operator Settings", func() { 
BeforeEach(func() { ExpectApplied(ctx, env.Client, defaultConfigMap) - ss = settingsstore.WatchSettingsOrDie(env.Ctx, clientSet, cmw, settings.Registration) + ss = settingsstore.WatchSettingsOrDie(env.Ctx, kubernetesInterface, cmw, settings.Registration) Expect(cmw.Start(env.Ctx.Done())).To(Succeed()) }) It("should have default values", func() { @@ -112,7 +112,7 @@ var _ = Describe("Multiple Settings", func() { ExpectApplied(ctx, env.Client, defaultConfigMap) }) It("should get operator settings and features from same configMap", func() { - ss = settingsstore.WatchSettingsOrDie(env.Ctx, clientSet, cmw, settings.Registration, fake.SettingsRegistration) + ss = settingsstore.WatchSettingsOrDie(env.Ctx, kubernetesInterface, cmw, settings.Registration, fake.SettingsRegistration) Expect(cmw.Start(env.Ctx.Done())).To(Succeed()) Eventually(func(g Gomega) { testCtx := ss.InjectSettings(ctx) @@ -121,7 +121,7 @@ var _ = Describe("Multiple Settings", func() { }).Should(Succeed()) }) It("should get operator settings and features from same configMap", func() { - ss = settingsstore.WatchSettingsOrDie(env.Ctx, clientSet, cmw, settings.Registration, fake.SettingsRegistration) + ss = settingsstore.WatchSettingsOrDie(env.Ctx, kubernetesInterface, cmw, settings.Registration, fake.SettingsRegistration) Expect(cmw.Start(env.Ctx.Done())).To(Succeed()) cm := defaultConfigMap.DeepCopy() @@ -145,7 +145,7 @@ var _ = Describe("Multiple Settings", func() { var _ = Describe("ConfigMap Doesn't Exist on Startup", func() { It("should default if the configMap doesn't exist on startup", func() { - ss = settingsstore.WatchSettingsOrDie(env.Ctx, clientSet, cmw, settings.Registration) + ss = settingsstore.WatchSettingsOrDie(env.Ctx, kubernetesInterface, cmw, settings.Registration) _ = cmw.Start(env.Ctx.Done()) Eventually(func(g Gomega) { @@ -156,7 +156,7 @@ var _ = Describe("ConfigMap Doesn't Exist on Startup", func() { }).Should(Succeed()) }) It("should start watching settings when ConfigMap is added", 
func() { - ss = settingsstore.WatchSettingsOrDie(env.Ctx, clientSet, cmw, settings.Registration) + ss = settingsstore.WatchSettingsOrDie(env.Ctx, kubernetesInterface, cmw, settings.Registration) _ = cmw.Start(env.Ctx.Done()) cm := defaultConfigMap.DeepCopy() diff --git a/pkg/webhooks/webhooks.go b/pkg/webhooks/webhooks.go index 3dbb7a9604..f2e6b650b1 100644 --- a/pkg/webhooks/webhooks.go +++ b/pkg/webhooks/webhooks.go @@ -16,89 +16,29 @@ package webhooks import ( "context" - "flag" - "fmt" - "runtime/debug" - "k8s.io/client-go/kubernetes" "knative.dev/pkg/configmap" - "knative.dev/pkg/configmap/informer" "knative.dev/pkg/controller" knativeinjection "knative.dev/pkg/injection" - "knative.dev/pkg/injection/sharedmain" "knative.dev/pkg/logging" - "knative.dev/pkg/system" - "knative.dev/pkg/webhook" "knative.dev/pkg/webhook/certificates" "knative.dev/pkg/webhook/configmaps" "knative.dev/pkg/webhook/resourcesemantics/defaulting" "knative.dev/pkg/webhook/resourcesemantics/validation" "github.com/aws/karpenter-core/pkg/apis" - "github.com/aws/karpenter-core/pkg/cloudprovider" - "github.com/aws/karpenter-core/pkg/operator/injection" - "github.com/aws/karpenter-core/pkg/utils/env" ) -type WebhookOpts struct { - KarpenterService string - WebhookPort int - MemoryLimit int64 -} - -var ( - component = "webhook" - opts = WebhookOpts{} -) - -func init() { - flag.StringVar(&opts.KarpenterService, "karpenter-service", env.WithDefaultString("KARPENTER_SERVICE", ""), "The Karpenter Service name for the dynamic webhook certificate") - flag.IntVar(&opts.WebhookPort, "port", env.WithDefaultInt("PORT", 8443), "The port the webhook endpoint binds to for validation and mutation of resources") - flag.Int64Var(&opts.MemoryLimit, "memory-limit", env.WithDefaultInt64("MEMORY_LIMIT", -1), "Memory limit on the container running the webhook. 
The GC soft memory limit is set to 90% of this value.") -} - -func Initialize(injectCloudProvider func(cloudprovider.Context) cloudprovider.CloudProvider) { - config := knativeinjection.ParseAndGetRESTConfigOrDie() - - // Set up logger and watch for changes to log level - clientSet := kubernetes.NewForConfigOrDie(config) - cmw := informer.NewInformedWatcher(clientSet, system.Namespace()) - ctx := injection.LoggingContextOrDie(component, config, cmw) - ctx = knativeinjection.WithNamespaceScope(ctx, system.Namespace()) - - ctx = webhook.WithOptions(ctx, webhook.Options{ - Port: opts.WebhookPort, - ServiceName: opts.KarpenterService, - SecretName: fmt.Sprintf("%s-cert", opts.KarpenterService), - }) - // TODO: This can be removed if we eventually decide that we need leader election. Having leader election has resulted in the webhook - // having issues described in https://github.com/aws/karpenter/issues/2562 so these issues need to be resolved if this line is removed - ctx = sharedmain.WithHADisabled(ctx) // Disable leader election for webhook - - // Register the cloud provider to attach vendor specific validation logic. 
- // TODO(https://github.com/aws/karpenter/issues/2052) - injectCloudProvider(cloudprovider.Context{ - Context: ctx, - ClientSet: clientSet, - WebhookOnly: true, - }) - - if opts.MemoryLimit > 0 { - newLimit := int64(float64(opts.MemoryLimit) * 0.9) - logging.FromContext(ctx).Infof("Setting GC memory limit to %d, container limit = %d", newLimit, opts.MemoryLimit) - debug.SetMemoryLimit(newLimit) - } - - // Controllers and webhook - sharedmain.MainWithConfig(ctx, "webhook", config, +func NewWebhooks() []knativeinjection.ControllerConstructor { + return []knativeinjection.ControllerConstructor{ certificates.NewController, - newCRDDefaultingWebhook, - newCRDValidationWebhook, - newConfigValidationController, - ) + NewCRDDefaultingWebhook, + NewCRDValidationWebhook, + NewConfigValidationWebhook, + } } -func newCRDDefaultingWebhook(ctx context.Context, w configmap.Watcher) *controller.Impl { +func NewCRDDefaultingWebhook(ctx context.Context, w configmap.Watcher) *controller.Impl { return defaulting.NewAdmissionController(ctx, "defaulting.webhook.provisioners.karpenter.sh", "/default-resource", @@ -108,7 +48,7 @@ func newCRDDefaultingWebhook(ctx context.Context, w configmap.Watcher) *controll ) } -func newCRDValidationWebhook(ctx context.Context, w configmap.Watcher) *controller.Impl { +func NewCRDValidationWebhook(ctx context.Context, w configmap.Watcher) *controller.Impl { return validation.NewAdmissionController(ctx, "validation.webhook.provisioners.karpenter.sh", "/validate-resource", @@ -118,7 +58,7 @@ func newCRDValidationWebhook(ctx context.Context, w configmap.Watcher) *controll ) } -func newConfigValidationController(ctx context.Context, cmw configmap.Watcher) *controller.Impl { +func NewConfigValidationWebhook(ctx context.Context, cmw configmap.Watcher) *controller.Impl { return configmaps.NewAdmissionController(ctx, "validation.webhook.config.karpenter.sh", "/config-validation",