Skip to content

Commit

Permalink
Add defaults to Settings if NotFound (kubernetes-sigs#28)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathan-innis authored Oct 24, 2022
1 parent a47b621 commit c2d6e50
Show file tree
Hide file tree
Showing 10 changed files with 155 additions and 43 deletions.
22 changes: 21 additions & 1 deletion pkg/apis/config/registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,32 @@ limitations under the License.

package config

import (
"fmt"

"knative.dev/pkg/configmap"
)

// Constructor should take the form func(*v1.ConfigMap) (T, error)
type Constructor interface{}

// Registration defines a ConfigMap registration to be watched by the settingsstore.Watcher
// and to be injected into the Reconcile() contexts of controllers
type Registration struct {
ConfigMapName string
Constructor Constructor
Constructor interface{}
DefaultData map[string]string
}

func (r Registration) Validate() error {
if r.ConfigMapName == "" {
return fmt.Errorf("configMap cannot be empty in SettingsStore registration")
}
if err := configmap.ValidateConstructor(r.Constructor); err != nil {
return fmt.Errorf("constructor validation failed in SettingsStore registration, %w", err)
}
if r.DefaultData == nil {
return fmt.Errorf("default value cannot be empty in SettingsStore registration")
}
return nil
}
31 changes: 22 additions & 9 deletions pkg/apis/config/settings/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ package settings

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/samber/lo"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/pkg/configmap"

"github.com/aws/karpenter-core/pkg/apis/config"
Expand All @@ -30,34 +33,44 @@ var ContextKey = Registration
var Registration = &config.Registration{
ConfigMapName: "karpenter-global-settings",
Constructor: NewSettingsFromConfigMap,
DefaultData: lo.Must(defaultSettings.Data()),
}

var defaultSettings = Settings{
BatchMaxDuration: time.Second * 10,
BatchIdleDuration: time.Second * 1,
BatchMaxDuration: metav1.Duration{Duration: time.Second * 10},
BatchIdleDuration: metav1.Duration{Duration: time.Second * 1},
}

type Settings struct {
BatchMaxDuration time.Duration
BatchIdleDuration time.Duration
BatchMaxDuration metav1.Duration `json:"batchMaxDuration"`
BatchIdleDuration metav1.Duration `json:"batchIdleDuration"`
}

func (s Settings) Data() (map[string]string, error) {
d := map[string]string{}

if err := json.Unmarshal(lo.Must(json.Marshal(defaultSettings)), &d); err != nil {
return d, fmt.Errorf("unmarshalling json data, %w", err)
}
return d, nil
}

// NewSettingsFromConfigMap creates a Settings from the supplied ConfigMap
func NewSettingsFromConfigMap(cm *v1.ConfigMap) (Settings, error) {
s := defaultSettings

if err := configmap.Parse(cm.Data,
AsPositiveDuration("batchMaxDuration", &s.BatchMaxDuration),
AsPositiveDuration("batchIdleDuration", &s.BatchIdleDuration),
AsPositiveMetaDuration("batchMaxDuration", &s.BatchMaxDuration),
AsPositiveMetaDuration("batchIdleDuration", &s.BatchIdleDuration),
); err != nil {
// Failing to parse means that there is some error in the Settings, so we should crash
panic(fmt.Sprintf("parsing config data, %v", err))
}
return s, nil
}

// AsPositiveDuration parses the value at key as a time.Duration into the target, if it exists.
func AsPositiveDuration(key string, target *time.Duration) configmap.ParseFunc {
// AsPositiveMetaDuration parses the value at key as a time.Duration into the target, if it exists.
func AsPositiveMetaDuration(key string, target *metav1.Duration) configmap.ParseFunc {
return func(data map[string]string) error {
if raw, ok := data[key]; ok {
val, err := time.ParseDuration(raw)
Expand All @@ -67,7 +80,7 @@ func AsPositiveDuration(key string, target *time.Duration) configmap.ParseFunc {
if val <= 0 {
return fmt.Errorf("duration value is not positive %q: %q", key, val)
}
*target = val
*target = metav1.Duration{Duration: val}
}
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/controllers/provisioning/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ func (b *Batcher) Wait(ctx context.Context) {
return
}

timeout := time.NewTimer(settings.FromContext(ctx).BatchMaxDuration)
idle := time.NewTimer(settings.FromContext(ctx).BatchIdleDuration)
timeout := time.NewTimer(settings.FromContext(ctx).BatchMaxDuration.Duration)
idle := time.NewTimer(settings.FromContext(ctx).BatchIdleDuration.Duration)
for {
select {
case <-b.trigger:
Expand All @@ -83,7 +83,7 @@ func (b *Batcher) Wait(ctx context.Context) {
if !idle.Stop() {
<-idle.C
}
idle.Reset(settings.FromContext(ctx).BatchIdleDuration)
idle.Reset(settings.FromContext(ctx).BatchIdleDuration.Duration)
case <-b.immediate:
return
case <-timeout.C:
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/state/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func NewCluster(ctx context.Context, clk clock.Clock, client client.Client, cp c
// The nominationPeriod is how long we consider a node as 'likely to be used' after a pending pod was
// nominated for it. This time can very depending on the batching window size + time spent scheduling
// so we try to adjust based off the window size.
nominationPeriod := 2 * settings.FromContext(ctx).BatchMaxDuration
nominationPeriod := 2 * settings.FromContext(ctx).BatchMaxDuration.Duration
if nominationPeriod < 10*time.Second {
nominationPeriod = 10 * time.Second
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/state/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ var _ = Describe("Node Resource Level", func() {
})
It("should trigger node nomination eviction observers", func() {
// Reduce the nomination timeframe for a quicker test
ctx = settings.ToContext(ctx, settings.Settings{BatchMaxDuration: time.Second, BatchIdleDuration: time.Second})
ctx = settings.ToContext(ctx, settings.Settings{BatchMaxDuration: metav1.Duration{Duration: time.Second}, BatchIdleDuration: metav1.Duration{Duration: time.Second}})
cluster = state.NewCluster(ctx, fakeClock, env.Client, cloudProvider)

node := test.Node(test.NodeOptions{
Expand Down
18 changes: 9 additions & 9 deletions pkg/operator/controller/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ var _ = BeforeEach(func() {
env = test.NewEnvironment(ctx, func(e *test.Environment) {
clientSet := kubernetes.NewForConfigOrDie(e.Config)
cmw = informer.NewInformedWatcher(clientSet, system.Namespace())
ss = settingsstore.WatchSettings(e.Ctx, cmw, settings.Registration)
ss = settingsstore.WatchSettingsOrDie(e.Ctx, clientSet, cmw, settings.Registration)

defaultConfigMap = &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -78,8 +78,8 @@ var _ = Describe("Core Settings", func() {
It("should inject default settings into Reconcile loop", func() {
ExpectApplied(ctx, env.Client, defaultConfigMap.DeepCopy())
expected := settings.Settings{
BatchMaxDuration: time.Second * 10,
BatchIdleDuration: time.Second * 1,
BatchMaxDuration: metav1.Duration{Duration: time.Second * 10},
BatchIdleDuration: metav1.Duration{Duration: time.Second * 1},
}

fakeController := &FakeController{
Expand All @@ -96,13 +96,13 @@ var _ = Describe("Core Settings", func() {
})
It("should inject custom settings into Reconcile loop", func() {
expected := settings.Settings{
BatchMaxDuration: time.Second * 30,
BatchIdleDuration: time.Second * 5,
BatchMaxDuration: metav1.Duration{Duration: time.Second * 30},
BatchIdleDuration: metav1.Duration{Duration: time.Second * 5},
}
cm := defaultConfigMap.DeepCopy()
cm.Data = map[string]string{
"batchMaxDuration": expected.BatchMaxDuration.String(),
"batchIdleDuration": expected.BatchIdleDuration.String(),
"batchMaxDuration": expected.BatchMaxDuration.Duration.String(),
"batchIdleDuration": expected.BatchIdleDuration.Duration.String(),
}
ExpectApplied(ctx, env.Client, cm)

Expand All @@ -121,8 +121,8 @@ var _ = Describe("Core Settings", func() {
})

func ExpectSettingsMatch(g Gomega, a settings.Settings, b settings.Settings) {
g.Expect(a.BatchMaxDuration == b.BatchMaxDuration &&
a.BatchIdleDuration == b.BatchIdleDuration).To(BeTrue())
g.Expect(a.BatchMaxDuration.Duration == b.BatchMaxDuration.Duration &&
a.BatchIdleDuration.Duration == b.BatchIdleDuration.Duration).To(BeTrue())
}

func ExpectOperatorSettingsInjected(expected settings.Settings) ReconcileAssertion {
Expand Down
14 changes: 13 additions & 1 deletion pkg/operator/settingsstore/fake/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ package fake

import (
"context"
"encoding/json"
"fmt"

"github.com/samber/lo"
v1 "k8s.io/api/core/v1"
"knative.dev/pkg/configmap"

Expand All @@ -27,14 +29,24 @@ import (
var SettingsRegistration = &config.Registration{
ConfigMapName: "karpenter-global-settings",
Constructor: NewFakeSettingsFromConfigMap,
DefaultData: lo.Must(defaultSettings.Data()),
}

var defaultSettings = Settings{
TestArg: "default",
}

type Settings struct {
TestArg string
TestArg string `json:"testArg"`
}

func (s Settings) Data() (map[string]string, error) {
d := map[string]string{}

if err := json.Unmarshal(lo.Must(json.Marshal(defaultSettings)), &d); err != nil {
return d, err
}
return d, nil
}

func NewFakeSettingsFromConfigMap(cm *v1.ConfigMap) (Settings, error) {
Expand Down
33 changes: 31 additions & 2 deletions pkg/operator/settingsstore/settingsstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@ package settingsstore

import (
"context"
"fmt"

"github.com/samber/lo"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"knative.dev/pkg/configmap"
"knative.dev/pkg/configmap/informer"
"knative.dev/pkg/logging"
Expand All @@ -36,20 +41,44 @@ type store struct {
stores map[*config.Registration]*configmap.UntypedStore
}

func WatchSettings(ctx context.Context, cmw *informer.InformedWatcher, registrations ...*config.Registration) Store {
func WatchSettingsOrDie(ctx context.Context, clientSet *kubernetes.Clientset, cmw *informer.InformedWatcher, registrations ...*config.Registration) Store {
ss := &store{
registrations: registrations,
stores: map[*config.Registration]*configmap.UntypedStore{},
}
for _, registration := range registrations {
if err := registration.Validate(); err != nil {
panic(fmt.Sprintf("Validating settings registration, %v", err))
}

ss.stores[registration] = configmap.NewUntypedStore(
registration.ConfigMapName,
logging.FromContext(ctx),
configmap.Constructors{
registration.ConfigMapName: registration.Constructor,
},
)
ss.stores[registration].WatchConfigs(cmw)

// TODO: Remove this Get once we don't rely on this settingsStore for initialization
// Attempt to get the ConfigMap since WatchWithDefault doesn't wait for Add event form API-server
cm, err := clientSet.CoreV1().ConfigMaps(cmw.Namespace).Get(ctx, registration.ConfigMapName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
cm = &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: registration.ConfigMapName,
Namespace: cmw.Namespace,
},
Data: registration.DefaultData,
}
} else {
panic(fmt.Sprintf("Getting settings %v, %v", registration.ConfigMapName, err))
}
}

// TODO: Move this to ss.stores[registration].WatchConfigs(cmw) when the UntypedStores
// implements a default mechanism
cmw.WatchWithDefault(*cm, ss.stores[registration].OnConfigChanged)
}
return ss
}
Expand Down
Loading

0 comments on commit c2d6e50

Please sign in to comment.