From c2d6e5014c66ffd2ec5d9c0453ee1b98b184cfd2 Mon Sep 17 00:00:00 2001 From: Jonathan Innis Date: Mon, 24 Oct 2022 15:00:40 -0500 Subject: [PATCH] Add defaults to Settings if NotFound (#28) --- pkg/apis/config/registration.go | 22 ++++++- pkg/apis/config/settings/settings.go | 31 +++++++--- pkg/controllers/provisioning/batcher.go | 6 +- pkg/controllers/state/cluster.go | 2 +- pkg/controllers/state/suite_test.go | 2 +- pkg/operator/controller/suite_test.go | 18 +++--- pkg/operator/settingsstore/fake/settings.go | 14 ++++- pkg/operator/settingsstore/settingsstore.go | 33 ++++++++++- pkg/operator/settingsstore/suite_test.go | 64 ++++++++++++++++----- pkg/test/settings.go | 6 +- 10 files changed, 155 insertions(+), 43 deletions(-) diff --git a/pkg/apis/config/registration.go b/pkg/apis/config/registration.go index de27e32c37..82626e8e4d 100644 --- a/pkg/apis/config/registration.go +++ b/pkg/apis/config/registration.go @@ -14,6 +14,12 @@ limitations under the License. package config +import ( + "fmt" + + "knative.dev/pkg/configmap" +) + // Constructor should take the form func(*v1.ConfigMap) (T, error) type Constructor interface{} @@ -21,5 +27,19 @@ type Constructor interface{} // and to be injected into the Reconcile() contexts of controllers type Registration struct { ConfigMapName string - Constructor Constructor + Constructor interface{} + DefaultData map[string]string +} + +func (r Registration) Validate() error { + if r.ConfigMapName == "" { + return fmt.Errorf("configMap cannot be empty in SettingsStore registration") + } + if err := configmap.ValidateConstructor(r.Constructor); err != nil { + return fmt.Errorf("constructor validation failed in SettingsStore registration, %w", err) + } + if r.DefaultData == nil { + return fmt.Errorf("default value cannot be empty in SettingsStore registration") + } + return nil } diff --git a/pkg/apis/config/settings/settings.go b/pkg/apis/config/settings/settings.go index 153ab8a9ff..6d0455ac4e 100644 --- a/pkg/apis/config/settings/settings.go +++ b/pkg/apis/config/settings/settings.go @@ -16,10 +16,13 @@ package settings import ( "context" + "encoding/json" "fmt" "time" + "github.com/samber/lo" v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/pkg/configmap" "github.com/aws/karpenter-core/pkg/apis/config" @@ -30,16 +33,26 @@ var ContextKey = Registration var Registration = &config.Registration{ ConfigMapName: "karpenter-global-settings", Constructor: NewSettingsFromConfigMap, + DefaultData: lo.Must(defaultSettings.Data()), } var defaultSettings = Settings{ - BatchMaxDuration: time.Second * 10, - BatchIdleDuration: time.Second * 1, + BatchMaxDuration: metav1.Duration{Duration: time.Second * 10}, + BatchIdleDuration: metav1.Duration{Duration: time.Second * 1}, } type Settings struct { - BatchMaxDuration time.Duration - BatchIdleDuration time.Duration + BatchMaxDuration metav1.Duration `json:"batchMaxDuration"` + BatchIdleDuration metav1.Duration `json:"batchIdleDuration"` +} + +func (s Settings) Data() (map[string]string, error) { + d := map[string]string{} + + if err := json.Unmarshal(lo.Must(json.Marshal(defaultSettings)), &d); err != nil { + return d, fmt.Errorf("unmarshalling json data, %w", err) + } + return d, nil } // NewSettingsFromConfigMap creates a Settings from the supplied ConfigMap @@ -47,8 +60,8 @@ func NewSettingsFromConfigMap(cm *v1.ConfigMap) (Settings, error) { s := defaultSettings if err := configmap.Parse(cm.Data, - AsPositiveDuration("batchMaxDuration", &s.BatchMaxDuration), - AsPositiveDuration("batchIdleDuration", &s.BatchIdleDuration), + AsPositiveMetaDuration("batchMaxDuration", &s.BatchMaxDuration), + AsPositiveMetaDuration("batchIdleDuration", &s.BatchIdleDuration), ); err != nil { // Failing to parse means that there is some error in the Settings, so we should crash panic(fmt.Sprintf("parsing config data, %v", err)) @@ -56,8 +69,8 @@ func NewSettingsFromConfigMap(cm *v1.ConfigMap) (Settings, error) { return s, nil } -// AsPositiveDuration parses the value at key as a time.Duration into the target, if it exists. -func AsPositiveDuration(key string, target *time.Duration) configmap.ParseFunc { +// AsPositiveMetaDuration parses the value at key as a time.Duration into the target, if it exists. +func AsPositiveMetaDuration(key string, target *metav1.Duration) configmap.ParseFunc { return func(data map[string]string) error { if raw, ok := data[key]; ok { val, err := time.ParseDuration(raw) @@ -67,7 +80,7 @@ func AsPositiveDuration(key string, target *time.Duration) configmap.ParseFunc { if val <= 0 { return fmt.Errorf("duration value is not positive %q: %q", key, val) } - *target = val + *target = metav1.Duration{Duration: val} } return nil } diff --git a/pkg/controllers/provisioning/batcher.go b/pkg/controllers/provisioning/batcher.go index b1b6f64ea5..d498bf1468 100644 --- a/pkg/controllers/provisioning/batcher.go +++ b/pkg/controllers/provisioning/batcher.go @@ -71,8 +71,8 @@ func (b *Batcher) Wait(ctx context.Context) { return } - timeout := time.NewTimer(settings.FromContext(ctx).BatchMaxDuration) - idle := time.NewTimer(settings.FromContext(ctx).BatchIdleDuration) + timeout := time.NewTimer(settings.FromContext(ctx).BatchMaxDuration.Duration) + idle := time.NewTimer(settings.FromContext(ctx).BatchIdleDuration.Duration) for { select { case <-b.trigger: @@ -83,7 +83,7 @@ func (b *Batcher) Wait(ctx context.Context) { if !idle.Stop() { <-idle.C } - idle.Reset(settings.FromContext(ctx).BatchIdleDuration) + idle.Reset(settings.FromContext(ctx).BatchIdleDuration.Duration) case <-b.immediate: return case <-timeout.C: diff --git a/pkg/controllers/state/cluster.go b/pkg/controllers/state/cluster.go index a38a7c1bad..00ce52df22 100644 --- a/pkg/controllers/state/cluster.go +++ b/pkg/controllers/state/cluster.go @@ -76,7 +76,7 @@ func NewCluster(ctx context.Context, clk clock.Clock, client client.Client, cp c // The nominationPeriod is how long we consider a node as 'likely to be used' after a pending pod was // nominated for it. This time can very depending on the batching window size + time spent scheduling // so we try to adjust based off the window size. - nominationPeriod := 2 * settings.FromContext(ctx).BatchMaxDuration + nominationPeriod := 2 * settings.FromContext(ctx).BatchMaxDuration.Duration if nominationPeriod < 10*time.Second { nominationPeriod = 10 * time.Second } diff --git a/pkg/controllers/state/suite_test.go b/pkg/controllers/state/suite_test.go index 2539f5d6a3..870efc96f1 100644 --- a/pkg/controllers/state/suite_test.go +++ b/pkg/controllers/state/suite_test.go @@ -532,7 +532,7 @@ var _ = Describe("Node Resource Level", func() { }) It("should trigger node nomination eviction observers", func() { // Reduce the nomination timeframe for a quicker test - ctx = settings.ToContext(ctx, settings.Settings{BatchMaxDuration: time.Second, BatchIdleDuration: time.Second}) + ctx = settings.ToContext(ctx, settings.Settings{BatchMaxDuration: metav1.Duration{Duration: time.Second}, BatchIdleDuration: metav1.Duration{Duration: time.Second}}) cluster = state.NewCluster(ctx, fakeClock, env.Client, cloudProvider) node := test.Node(test.NodeOptions{ diff --git a/pkg/operator/controller/suite_test.go b/pkg/operator/controller/suite_test.go index 3dff8188fa..f227950280 100644 --- a/pkg/operator/controller/suite_test.go +++ b/pkg/operator/controller/suite_test.go @@ -55,7 +55,7 @@ var _ = BeforeEach(func() { env = test.NewEnvironment(ctx, func(e *test.Environment) { clientSet := kubernetes.NewForConfigOrDie(e.Config) cmw = informer.NewInformedWatcher(clientSet, system.Namespace()) - ss = settingsstore.WatchSettings(e.Ctx, cmw, settings.Registration) + ss = settingsstore.WatchSettingsOrDie(e.Ctx, clientSet, cmw, settings.Registration) defaultConfigMap = &v1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ @@ -78,8 +78,8 @@ var _ = Describe("Core Settings", func() { It("should inject default settings into Reconcile loop", func() { ExpectApplied(ctx, env.Client, defaultConfigMap.DeepCopy()) expected := settings.Settings{ - BatchMaxDuration: time.Second * 10, - BatchIdleDuration: time.Second * 1, + BatchMaxDuration: metav1.Duration{Duration: time.Second * 10}, + BatchIdleDuration: metav1.Duration{Duration: time.Second * 1}, } fakeController := &FakeController{ @@ -96,13 +96,13 @@ var _ = Describe("Core Settings", func() { }) It("should inject custom settings into Reconcile loop", func() { expected := settings.Settings{ - BatchMaxDuration: time.Second * 30, - BatchIdleDuration: time.Second * 5, + BatchMaxDuration: metav1.Duration{Duration: time.Second * 30}, + BatchIdleDuration: metav1.Duration{Duration: time.Second * 5}, } cm := defaultConfigMap.DeepCopy() cm.Data = map[string]string{ - "batchMaxDuration": expected.BatchMaxDuration.String(), - "batchIdleDuration": expected.BatchIdleDuration.String(), + "batchMaxDuration": expected.BatchMaxDuration.Duration.String(), + "batchIdleDuration": expected.BatchIdleDuration.Duration.String(), } ExpectApplied(ctx, env.Client, cm) @@ -121,8 +121,8 @@ var _ = Describe("Core Settings", func() { }) func ExpectSettingsMatch(g Gomega, a settings.Settings, b settings.Settings) { - g.Expect(a.BatchMaxDuration == b.BatchMaxDuration && - a.BatchIdleDuration == b.BatchIdleDuration).To(BeTrue()) + g.Expect(a.BatchMaxDuration.Duration == b.BatchMaxDuration.Duration && + a.BatchIdleDuration.Duration == b.BatchIdleDuration.Duration).To(BeTrue()) } func ExpectOperatorSettingsInjected(expected settings.Settings) ReconcileAssertion { diff --git a/pkg/operator/settingsstore/fake/settings.go b/pkg/operator/settingsstore/fake/settings.go index f81fcdb1a0..7d01f4f9da 100644 --- a/pkg/operator/settingsstore/fake/settings.go +++ b/pkg/operator/settingsstore/fake/settings.go @@ -16,8 +16,10 @@ package fake import ( "context" + "encoding/json" "fmt" + "github.com/samber/lo" v1 "k8s.io/api/core/v1" "knative.dev/pkg/configmap" @@ -27,6 +29,7 @@ import ( var SettingsRegistration = &config.Registration{ ConfigMapName: "karpenter-global-settings", Constructor: NewFakeSettingsFromConfigMap, + DefaultData: lo.Must(defaultSettings.Data()), } var defaultSettings = Settings{ @@ -34,7 +37,16 @@ var defaultSettings = Settings{ } type Settings struct { - TestArg string + TestArg string `json:"testArg"` +} + +func (s Settings) Data() (map[string]string, error) { + d := map[string]string{} + + if err := json.Unmarshal(lo.Must(json.Marshal(defaultSettings)), &d); err != nil { + return d, err + } + return d, nil } func NewFakeSettingsFromConfigMap(cm *v1.ConfigMap) (Settings, error) { diff --git a/pkg/operator/settingsstore/settingsstore.go b/pkg/operator/settingsstore/settingsstore.go index b4271570b2..9e5ef2c7c2 100644 --- a/pkg/operator/settingsstore/settingsstore.go +++ b/pkg/operator/settingsstore/settingsstore.go @@ -16,8 +16,13 @@ package settingsstore import ( "context" + "fmt" "github.com/samber/lo" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" "knative.dev/pkg/configmap" "knative.dev/pkg/configmap/informer" "knative.dev/pkg/logging" @@ -36,12 +41,16 @@ type store struct { stores map[*config.Registration]*configmap.UntypedStore } -func WatchSettings(ctx context.Context, cmw *informer.InformedWatcher, registrations ...*config.Registration) Store { +func WatchSettingsOrDie(ctx context.Context, clientSet *kubernetes.Clientset, cmw *informer.InformedWatcher, registrations ...*config.Registration) Store { ss := &store{ registrations: registrations, stores: map[*config.Registration]*configmap.UntypedStore{}, } for _, registration := range registrations { + if err := registration.Validate(); err != nil { + panic(fmt.Sprintf("Validating settings registration, %v", err)) + } + ss.stores[registration] = configmap.NewUntypedStore( registration.ConfigMapName, logging.FromContext(ctx), @@ -49,7 +58,27 @@ func WatchSettings(ctx context.Context, cmw *informer.InformedWatcher, registrat registration.ConfigMapName: registration.Constructor, }, ) - ss.stores[registration].WatchConfigs(cmw) + + // TODO: Remove this Get once we don't rely on this settingsStore for initialization + // Attempt to get the ConfigMap since WatchWithDefault doesn't wait for Add event form API-server + cm, err := clientSet.CoreV1().ConfigMaps(cmw.Namespace).Get(ctx, registration.ConfigMapName, metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + cm = &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: registration.ConfigMapName, + Namespace: cmw.Namespace, + }, + Data: registration.DefaultData, + } + } else { + panic(fmt.Sprintf("Getting settings %v, %v", registration.ConfigMapName, err)) + } + } + + // TODO: Move this to ss.stores[registration].WatchConfigs(cmw) when the UntypedStores + // implements a default mechanism + cmw.WatchWithDefault(*cm, ss.stores[registration].OnConfigChanged) } return ss } diff --git a/pkg/operator/settingsstore/suite_test.go b/pkg/operator/settingsstore/suite_test.go index fe669c6040..bcd45416d0 100644 --- a/pkg/operator/settingsstore/suite_test.go +++ b/pkg/operator/settingsstore/suite_test.go @@ -40,6 +40,7 @@ import ( var ctx context.Context var env *test.Environment var cmw *informer.InformedWatcher +var clientSet *kubernetes.Clientset var ss settingsstore.Store var defaultConfigMap *v1.ConfigMap @@ -51,7 +52,7 @@ func TestAPIs(t *testing.T) { var _ = BeforeEach(func() { env = test.NewEnvironment(ctx, func(e *test.Environment) { - clientSet := kubernetes.NewForConfigOrDie(e.Config) + clientSet = kubernetes.NewForConfigOrDie(e.Config) cmw = informer.NewInformedWatcher(clientSet, system.Namespace()) defaultConfigMap = &v1.ConfigMap{ @@ -60,35 +61,35 @@ var _ = BeforeEach(func() { Namespace: system.Namespace(), }, } - ExpectApplied(ctx, e.Client, defaultConfigMap) }) Expect(env.Start()).To(Succeed()) }) var _ = AfterEach(func() { - Expect(env.Client.Delete(ctx, defaultConfigMap.DeepCopy())).To(Succeed()) + ExpectDeleted(ctx, env.Client, defaultConfigMap.DeepCopy()) Expect(env.Stop()).To(Succeed()) }) var _ = Describe("Operator Settings", func() { BeforeEach(func() { - ss = settingsstore.WatchSettings(env.Ctx, cmw, settings.Registration) + ExpectApplied(ctx, env.Client, defaultConfigMap) + ss = settingsstore.WatchSettingsOrDie(env.Ctx, clientSet, cmw, settings.Registration) Expect(cmw.Start(env.Ctx.Done())).To(Succeed()) }) It("should have default values", func() { Eventually(func(g Gomega) { testCtx := ss.InjectSettings(ctx) s := settings.FromContext(testCtx) - g.Expect(s.BatchIdleDuration).To(Equal(1 * time.Second)) - g.Expect(s.BatchMaxDuration).To(Equal(10 * time.Second)) + g.Expect(s.BatchIdleDuration.Duration).To(Equal(1 * time.Second)) + g.Expect(s.BatchMaxDuration.Duration).To(Equal(10 * time.Second)) }).Should(Succeed()) }) It("should update if values are changed", func() { Eventually(func(g Gomega) { testCtx := ss.InjectSettings(ctx) s := settings.FromContext(testCtx) - g.Expect(s.BatchIdleDuration).To(Equal(1 * time.Second)) - g.Expect(s.BatchMaxDuration).To(Equal(10 * time.Second)) + g.Expect(s.BatchIdleDuration.Duration).To(Equal(1 * time.Second)) + g.Expect(s.BatchMaxDuration.Duration).To(Equal(10 * time.Second)) }) cm := defaultConfigMap.DeepCopy() cm.Data = map[string]string{ @@ -100,15 +101,18 @@ var _ = Describe("Operator Settings", func() { Eventually(func(g Gomega) { testCtx := ss.InjectSettings(ctx) s := settings.FromContext(testCtx) - g.Expect(s.BatchIdleDuration).To(Equal(2 * time.Second)) - g.Expect(s.BatchMaxDuration).To(Equal(15 * time.Second)) + g.Expect(s.BatchIdleDuration.Duration).To(Equal(2 * time.Second)) + g.Expect(s.BatchMaxDuration.Duration).To(Equal(15 * time.Second)) }).Should(Succeed()) }) }) var _ = Describe("Multiple Settings", func() { + BeforeEach(func() { + ExpectApplied(ctx, env.Client, defaultConfigMap) + }) It("should get operator settings and features from same configMap", func() { - ss = settingsstore.WatchSettings(env.Ctx, cmw, settings.Registration, fake.SettingsRegistration) + ss = settingsstore.WatchSettingsOrDie(env.Ctx, clientSet, cmw, settings.Registration, fake.SettingsRegistration) Expect(cmw.Start(env.Ctx.Done())).To(Succeed()) Eventually(func(g Gomega) { testCtx := ss.InjectSettings(ctx) @@ -117,7 +121,7 @@ var _ = Describe("Multiple Settings", func() { }).Should(Succeed()) }) It("should get operator settings and features from same configMap", func() { - ss = settingsstore.WatchSettings(env.Ctx, cmw, settings.Registration, fake.SettingsRegistration) + ss = settingsstore.WatchSettingsOrDie(env.Ctx, clientSet, cmw, settings.Registration, fake.SettingsRegistration) Expect(cmw.Start(env.Ctx.Done())).To(Succeed()) cm := defaultConfigMap.DeepCopy() @@ -132,9 +136,41 @@ var _ = Describe("Multiple Settings", func() { testCtx := ss.InjectSettings(ctx) s := settings.FromContext(testCtx) fs := fake.SettingsFromContext(testCtx) - g.Expect(s.BatchIdleDuration).To(Equal(2 * time.Second)) - g.Expect(s.BatchMaxDuration).To(Equal(15 * time.Second)) + g.Expect(s.BatchIdleDuration.Duration).To(Equal(2 * time.Second)) + g.Expect(s.BatchMaxDuration.Duration).To(Equal(15 * time.Second)) g.Expect(fs.TestArg).To(Equal("my-value")) }).Should(Succeed()) }) }) + +var _ = Describe("ConfigMap Doesn't Exist on Startup", func() { + It("should default if the configMap doesn't exist on startup", func() { + ss = settingsstore.WatchSettingsOrDie(env.Ctx, clientSet, cmw, settings.Registration) + _ = cmw.Start(env.Ctx.Done()) + + Eventually(func(g Gomega) { + testCtx := ss.InjectSettings(ctx) + s := settings.FromContext(testCtx) + g.Expect(s.BatchIdleDuration.Duration).To(Equal(1 * time.Second)) + g.Expect(s.BatchMaxDuration.Duration).To(Equal(10 * time.Second)) + }).Should(Succeed()) + }) + It("should start watching settings when ConfigMap is added", func() { + ss = settingsstore.WatchSettingsOrDie(env.Ctx, clientSet, cmw, settings.Registration) + _ = cmw.Start(env.Ctx.Done()) + + cm := defaultConfigMap.DeepCopy() + cm.Data = map[string]string{ + "batchIdleDuration": "2s", + "batchMaxDuration": "15s", + } + ExpectApplied(ctx, env.Client, cm) + + Eventually(func(g Gomega) { + testCtx := ss.InjectSettings(ctx) + s := settings.FromContext(testCtx) + g.Expect(s.BatchIdleDuration.Duration).To(Equal(2 * time.Second)) + g.Expect(s.BatchMaxDuration.Duration).To(Equal(15 * time.Second)) + }).Should(Succeed()) + }) +}) diff --git a/pkg/test/settings.go b/pkg/test/settings.go index 23e5eeeb8f..49db05c8e5 100644 --- a/pkg/test/settings.go +++ b/pkg/test/settings.go @@ -18,6 +18,8 @@ import ( "context" "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/aws/karpenter-core/pkg/apis/config/settings" ) @@ -29,7 +31,7 @@ func (ss SettingsStore) InjectSettings(ctx context.Context) context.Context { func Settings() settings.Settings { return settings.Settings{ - BatchMaxDuration: time.Second * 10, - BatchIdleDuration: time.Second, + BatchMaxDuration: metav1.Duration{Duration: time.Second * 10}, + BatchIdleDuration: metav1.Duration{Duration: time.Second}, } }