diff --git a/api/v1/composition.go b/api/v1/composition.go index 3adf6ec2..d0e929b8 100644 --- a/api/v1/composition.go +++ b/api/v1/composition.go @@ -59,10 +59,13 @@ type Synthesis struct { // metadata.generation of the Composition at the time of synthesis. ObservedGeneration int64 `json:"observedGeneration,omitempty"` + // metadata.generation of the Synthesizer at the time of synthesis. + ObservedSynthesizerGeneration *int64 `json:"observedSynthesizerGeneration,omitempty"` + // Number of resulting resource slices. Since they are immutable, this provides adequate timing signal to avoid stale informer caches. - ResourceSliceCount int64 `json:"resourceSliceCount,omitempty"` + ResourceSliceCount *int64 `json:"resourceSliceCount,omitempty"` - Ready bool `json:"ready,omitempty"` - Synced bool `json:"synced,omitempty"` - PodCreation metav1.Time `json:"podCreation,omitempty"` + Ready bool `json:"ready,omitempty"` + Synced bool `json:"synced,omitempty"` + PodCreation *metav1.Time `json:"podCreation,omitempty"` } diff --git a/api/v1/config/crd/eno.azure.io_compositions.yaml b/api/v1/config/crd/eno.azure.io_compositions.yaml index e9a5dff2..1b04b3d2 100644 --- a/api/v1/config/crd/eno.azure.io_compositions.yaml +++ b/api/v1/config/crd/eno.azure.io_compositions.yaml @@ -75,6 +75,11 @@ spec: of synthesis. format: int64 type: integer + observedSynthesizerGeneration: + description: metadata.generation of the Synthesizer at the time + of synthesis. + format: int64 + type: integer podCreation: format: date-time type: string @@ -98,6 +103,11 @@ spec: of synthesis. format: int64 type: integer + observedSynthesizerGeneration: + description: metadata.generation of the Synthesizer at the time + of synthesis. + format: int64 + type: integer podCreation: format: date-time type: string diff --git a/api/v1/config/crd/eno.azure.io_synthesizers.yaml b/api/v1/config/crd/eno.azure.io_synthesizers.yaml index a90e2aac..471d8c75 100644 --- a/api/v1/config/crd/eno.azure.io_synthesizers.yaml +++ b/api/v1/config/crd/eno.azure.io_synthesizers.yaml @@ -45,6 +45,12 @@ spec: complete. format: int64 type: integer + lastRolloutTime: + description: LastRolloutTime is the timestamp of the last pod creation + caused by a change to this resource. Should not be updated due to + Composotion changes. Used to calculate rollout cooldown period. + format: date-time + type: string type: object type: object served: true diff --git a/api/v1/synthesizer.go b/api/v1/synthesizer.go index 231233a9..116f365c 100644 --- a/api/v1/synthesizer.go +++ b/api/v1/synthesizer.go @@ -1,6 +1,8 @@ package v1 -import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) // +kubebuilder:object:root=true type SynthesizerList struct { @@ -29,6 +31,11 @@ type SynthesizerStatus struct { // The metadata.generation of this resource at the oldest version currently used by any Generations. // This will equal the current generation when slow rollout of an update to the Generations is complete. CurrentGeneration int64 `json:"currentGeneration,omitempty"` + + // LastRolloutTime is the timestamp of the last pod creation caused by a change to this resource. + // Should not be updated due to Composotion changes. + // Used to calculate rollout cooldown period. + LastRolloutTime *metav1.Time `json:"lastRolloutTime,omitempty"` } type SynthesizerRef struct { diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index ff317a45..b9fdbc6f 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -308,7 +308,20 @@ func (in *ResourceState) DeepCopy() *ResourceState { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Synthesis) DeepCopyInto(out *Synthesis) { *out = *in - in.PodCreation.DeepCopyInto(&out.PodCreation) + if in.ObservedSynthesizerGeneration != nil { + in, out := &in.ObservedSynthesizerGeneration, &out.ObservedSynthesizerGeneration + *out = new(int64) + **out = **in + } + if in.ResourceSliceCount != nil { + in, out := &in.ResourceSliceCount, &out.ResourceSliceCount + *out = new(int64) + **out = **in + } + if in.PodCreation != nil { + in, out := &in.PodCreation, &out.PodCreation + *out = (*in).DeepCopy() + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Synthesis. @@ -327,7 +340,7 @@ func (in *Synthesizer) DeepCopyInto(out *Synthesizer) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) out.Spec = in.Spec - out.Status = in.Status + in.Status.DeepCopyInto(&out.Status) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Synthesizer. @@ -413,6 +426,10 @@ func (in *SynthesizerSpec) DeepCopy() *SynthesizerSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SynthesizerStatus) DeepCopyInto(out *SynthesizerStatus) { *out = *in + if in.LastRolloutTime != nil { + in, out := &in.LastRolloutTime, &out.LastRolloutTime + *out = (*in).DeepCopy() + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SynthesizerStatus. diff --git a/internal/manager/manager.go b/internal/manager/manager.go new file mode 100644 index 00000000..57da0a56 --- /dev/null +++ b/internal/manager/manager.go @@ -0,0 +1,64 @@ +package manager + +import ( + "context" + "fmt" + + "github.com/go-logr/logr" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/rest" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + apiv1 "github.com/Azure/eno/api/v1" +) + +const ( + IdxSlicesByCompositionGeneration = ".metadata.ownerReferences.compositionGen" // see: NewSlicesByCompositionGenerationKey +) + +func New(cfg *rest.Config, logger logr.Logger) (ctrl.Manager, error) { + mgr, err := ctrl.NewManager(cfg, manager.Options{ + Logger: logger, + }) + if err != nil { + return nil, err + } + + err = apiv1.SchemeBuilder.AddToScheme(mgr.GetScheme()) + if err != nil { + return nil, err + } + + err = mgr.GetFieldIndexer().IndexField(context.Background(), &apiv1.ResourceSlice{}, IdxSlicesByCompositionGeneration, func(o client.Object) []string { + slice := o.(*apiv1.ResourceSlice) + owner := metav1.GetControllerOf(slice) + if owner == nil || owner.Kind != "Composition" { + return nil + } + return []string{NewSlicesByCompositionGenerationKey(owner.Name, slice.Spec.CompositionGeneration)} + }) + if err != nil { + return nil, err + } + + return mgr, nil +} + +func NewLogConstructor(mgr ctrl.Manager, controllerName string) func(*reconcile.Request) logr.Logger { + return func(req *reconcile.Request) logr.Logger { + l := mgr.GetLogger().WithValues("controller", controllerName) + if req != nil { + l.WithValues("requestName", req.Name, "requestNamespace", req.Namespace) + } + return l + } +} + +// NewSlicesByCompositionGenerationKey documents the key structure used by IdxSlicesByCompositionGeneration. +func NewSlicesByCompositionGenerationKey(compName string, compGeneration int64) string { + // keys will not collide because k8s doesn't allow slashes in names + return fmt.Sprintf("%s/%d", compName, compGeneration) +} diff --git a/internal/reconstitution/reconstituter.go b/internal/reconstitution/reconstituter.go index e174b332..5565c433 100644 --- a/internal/reconstitution/reconstituter.go +++ b/internal/reconstitution/reconstituter.go @@ -6,13 +6,13 @@ import ( "sync/atomic" k8serrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/workqueue" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" apiv1 "github.com/Azure/eno/api/v1" + "github.com/Azure/eno/internal/manager" "github.com/go-logr/logr" ) @@ -28,30 +28,16 @@ type reconstituter struct { } func newReconstituter(mgr ctrl.Manager) (*reconstituter, error) { - // Index resource slices by the specific synthesis they originate from - err := mgr.GetFieldIndexer().IndexField(context.Background(), &apiv1.ResourceSlice{}, indexName, func(o client.Object) []string { - slice := o.(*apiv1.ResourceSlice) - owner := metav1.GetControllerOf(slice) - if owner == nil || owner.Kind != "Composition" { - return nil - } - // keys will not collide because k8s doesn't allow slashes in names - return []string{fmt.Sprintf("%s/%d", owner.Name, slice.Spec.CompositionGeneration)} - }) - if err != nil { - return nil, err - } - r := &reconstituter{ cache: newCache(mgr.GetClient()), client: mgr.GetClient(), } - _, err = ctrl.NewControllerManagedBy(mgr). + return r, ctrl.NewControllerManagedBy(mgr). Named("reconstituter"). For(&apiv1.Composition{}). Owns(&apiv1.ResourceSlice{}). - Build(r) - return r, err + WithLogConstructor(manager.NewLogConstructor(mgr, "reconstituter")). + Complete(r) } func (r *reconstituter) AddQueue(queue workqueue.Interface) { @@ -96,6 +82,10 @@ func (r *reconstituter) populateCache(ctx context.Context, comp *apiv1.Compositi if synthesis == nil { return nil } + if synthesis.ResourceSliceCount == nil { + logger.V(1).Info("resource synthesis is not complete - waiting to fill cache") + return nil + } compNSN := types.NamespacedName{Namespace: comp.Namespace, Name: comp.Name} logger = logger.WithValues("synthesisGen", synthesis.ObservedGeneration) @@ -107,14 +97,14 @@ func (r *reconstituter) populateCache(ctx context.Context, comp *apiv1.Compositi slices := &apiv1.ResourceSliceList{} err := r.client.List(ctx, slices, client.InNamespace(comp.Namespace), client.MatchingFields{ - indexName: fmt.Sprintf("%s/%d", comp.Name, synthesis.ObservedGeneration), + manager.IdxSlicesByCompositionGeneration: manager.NewSlicesByCompositionGenerationKey(comp.Name, synthesis.ObservedGeneration), }) if err != nil { return fmt.Errorf("listing resource slices: %w", err) } logger.V(1).Info(fmt.Sprintf("found %d slices for this synthesis", len(slices.Items))) - if int64(len(slices.Items)) != synthesis.ResourceSliceCount { + if int64(len(slices.Items)) != *synthesis.ResourceSliceCount { logger.V(1).Info("stale informer - waiting for sync") return nil } diff --git a/internal/reconstitution/reconstituter_test.go b/internal/reconstitution/reconstituter_test.go index f026b982..ce6efbf6 100644 --- a/internal/reconstitution/reconstituter_test.go +++ b/internal/reconstitution/reconstituter_test.go @@ -30,9 +30,10 @@ func TestReconstituterIntegration(t *testing.T) { comp.Namespace = "default" require.NoError(t, client.Create(ctx, comp)) + one := int64(1) comp.Status.CurrentState = &apiv1.Synthesis{ ObservedGeneration: comp.Generation, - ResourceSliceCount: 1, + ResourceSliceCount: &one, } require.NoError(t, client.Status().Update(ctx, comp)) diff --git a/internal/reconstitution/writebuffer.go b/internal/reconstitution/writebuffer.go index 1a33f2ca..6b47f847 100644 --- a/internal/reconstitution/writebuffer.go +++ b/internal/reconstitution/writebuffer.go @@ -25,7 +25,6 @@ type asyncStatusUpdate struct { // updates over a short period of time and applying them in a single update request. type writeBuffer struct { client client.Client - logger logr.Logger // queue items are per-slice. // the state map collects multiple updates per slice to be dispatched by next queue item. @@ -34,10 +33,9 @@ type writeBuffer struct { queue workqueue.RateLimitingInterface } -func newWriteBuffer(cli client.Client, logger logr.Logger, batchInterval time.Duration, burst int) *writeBuffer { +func newWriteBuffer(cli client.Client, batchInterval time.Duration, burst int) *writeBuffer { return &writeBuffer{ client: cli, - logger: logger.WithValues("controller", "writeBuffer"), state: make(map[types.NamespacedName][]*asyncStatusUpdate), queue: workqueue.NewRateLimitingQueueWithConfig( &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Every(batchInterval), burst)}, @@ -80,14 +78,14 @@ func (w *writeBuffer) processQueueItem(ctx context.Context) bool { defer w.queue.Done(item) sliceNSN := item.(types.NamespacedName) + logger := logr.FromContextOrDiscard(ctx).WithValues("slice", sliceNSN, "controller", "writeBuffer") + ctx = logr.NewContext(ctx, logger) + w.mut.Lock() updates := w.state[sliceNSN] delete(w.state, sliceNSN) w.mut.Unlock() - logger := w.logger.WithValues("slice", sliceNSN) - ctx = logr.NewContext(ctx, logger) - if len(updates) == 0 { logger.V(0).Info("dropping queue item because no updates were found for this slice (this is suspicious)") } diff --git a/internal/reconstitution/writebuffer_test.go b/internal/reconstitution/writebuffer_test.go index 37207c2f..a3caf4b0 100644 --- a/internal/reconstitution/writebuffer_test.go +++ b/internal/reconstitution/writebuffer_test.go @@ -7,7 +7,6 @@ import ( "testing" "time" - "github.com/go-logr/logr/testr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/types" @@ -19,9 +18,9 @@ import ( ) func TestWriteBufferBasics(t *testing.T) { - ctx := context.Background() + ctx := testutil.NewContext(t) cli := testutil.NewClient(t) - w := newWriteBuffer(cli, testr.New(t), 0, 1) + w := newWriteBuffer(cli, 0, 1) // One resource slice w/ len of 3 slice := &apiv1.ResourceSlice{} @@ -49,7 +48,7 @@ func TestWriteBufferBasics(t *testing.T) { } func TestWriteBufferBatching(t *testing.T) { - ctx := context.Background() + ctx := testutil.NewContext(t) var updateCalls atomic.Int32 cli := testutil.NewClientWithInterceptors(t, &interceptor.Funcs{ SubResourceUpdate: func(ctx context.Context, client client.Client, subResourceName string, obj client.Object, opts ...client.SubResourceUpdateOption) error { @@ -57,7 +56,7 @@ func TestWriteBufferBatching(t *testing.T) { return client.SubResource(subResourceName).Update(ctx, obj, opts...) }, }) - w := newWriteBuffer(cli, testr.New(t), time.Millisecond*2, 1) + w := newWriteBuffer(cli, time.Millisecond*2, 1) // One resource slice w/ len of 3 slice := &apiv1.ResourceSlice{} @@ -87,9 +86,9 @@ func TestWriteBufferBatching(t *testing.T) { } func TestWriteBufferNoUpdates(t *testing.T) { - ctx := context.Background() + ctx := testutil.NewContext(t) cli := testutil.NewClient(t) - w := newWriteBuffer(cli, testr.New(t), 0, 1) + w := newWriteBuffer(cli, 0, 1) // One resource slice w/ len of 3 slice := &apiv1.ResourceSlice{} @@ -113,9 +112,9 @@ func TestWriteBufferNoUpdates(t *testing.T) { } func TestWriteBufferMissingSlice(t *testing.T) { - ctx := context.Background() + ctx := testutil.NewContext(t) cli := testutil.NewClient(t) - w := newWriteBuffer(cli, testr.New(t), 0, 1) + w := newWriteBuffer(cli, 0, 1) req := &ManifestRef{} req.Slice.Name = "test-slice-1" // this doesn't exist @@ -128,14 +127,14 @@ func TestWriteBufferMissingSlice(t *testing.T) { } func TestWriteBufferNoChange(t *testing.T) { - ctx := context.Background() + ctx := testutil.NewContext(t) cli := testutil.NewClientWithInterceptors(t, &interceptor.Funcs{ SubResourceUpdate: func(ctx context.Context, client client.Client, subResourceName string, obj client.Object, opts ...client.SubResourceUpdateOption) error { t.Fatal("should not have sent any status updates") return nil }, }) - w := newWriteBuffer(cli, testr.New(t), 0, 1) + w := newWriteBuffer(cli, 0, 1) // One resource slice slice := &apiv1.ResourceSlice{} @@ -155,13 +154,13 @@ func TestWriteBufferNoChange(t *testing.T) { } func TestWriteBufferUpdateError(t *testing.T) { - ctx := context.Background() + ctx := testutil.NewContext(t) cli := testutil.NewClientWithInterceptors(t, &interceptor.Funcs{ SubResourceUpdate: func(ctx context.Context, client client.Client, subResourceName string, obj client.Object, opts ...client.SubResourceUpdateOption) error { return errors.New("could be any error") }, }) - w := newWriteBuffer(cli, testr.New(t), 0, 1) + w := newWriteBuffer(cli, 0, 1) // One resource slice w/ len of 3 slice := &apiv1.ResourceSlice{} diff --git a/internal/testutil/testutil.go b/internal/testutil/testutil.go index baada427..c670b849 100644 --- a/internal/testutil/testutil.go +++ b/internal/testutil/testutil.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "path/filepath" + goruntime "runtime" "testing" "time" @@ -17,9 +18,9 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/client/interceptor" "sigs.k8s.io/controller-runtime/pkg/envtest" - "sigs.k8s.io/controller-runtime/pkg/manager" apiv1 "github.com/Azure/eno/api/v1" + "github.com/Azure/eno/internal/manager" ) func NewClient(t testing.TB) client.Client { @@ -47,12 +48,16 @@ func NewContext(t *testing.T) context.Context { t.Cleanup(func() { cancel() }) - return logr.NewContext(ctx, testr.NewWithOptions(t, testr.Options{Verbosity: 99})) + return logr.NewContext(ctx, testr.NewWithOptions(t, testr.Options{Verbosity: 2})) } func NewManager(t *testing.T) *Manager { + _, b, _, _ := goruntime.Caller(0) + root := filepath.Join(filepath.Dir(b), "..", "..") + env := &envtest.Environment{ - CRDDirectoryPaths: []string{filepath.Join("..", "..", "api", "v1", "config", "crd")}, + CRDDirectoryPaths: []string{filepath.Join(root, "api", "v1", "config", "crd")}, + ErrorIfCRDPathMissing: true, } t.Cleanup(func() { err := env.Stop() @@ -64,13 +69,7 @@ func NewManager(t *testing.T) *Manager { cfg, err := env.Start() require.NoError(t, err) - mgr, err := ctrl.NewManager(cfg, manager.Options{ - Logger: testr.New(t), - BaseContext: func() context.Context { return NewContext(t) }, - }) - require.NoError(t, err) - - err = apiv1.SchemeBuilder.AddToScheme(mgr.GetScheme()) + mgr, err := manager.New(cfg, logr.FromContextOrDiscard(NewContext(t))) require.NoError(t, err) return &Manager{