From b583600685abec19c4c217e1790a0b991c831043 Mon Sep 17 00:00:00 2001 From: Jordan Olshevski Date: Mon, 20 Nov 2023 11:29:17 -0600 Subject: [PATCH] Add manager/queue processor --- internal/reconstitution/api.go | 14 ++++++ internal/reconstitution/manager.go | 55 +++++++++++++++++++++++ internal/reconstitution/queueprocessor.go | 54 ++++++++++++++++++++++ internal/reconstitution/reconstituter.go | 2 - 4 files changed, 123 insertions(+), 2 deletions(-) create mode 100644 internal/reconstitution/manager.go create mode 100644 internal/reconstitution/queueprocessor.go diff --git a/internal/reconstitution/api.go b/internal/reconstitution/api.go index a724e154..4f625f5d 100644 --- a/internal/reconstitution/api.go +++ b/internal/reconstitution/api.go @@ -1,14 +1,28 @@ package reconstitution import ( + "context" "time" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" apiv1 "github.com/Azure/eno/api/v1" ) +// Reconciler is implemented by types that can reconcile individual, reconstituted resources. +type Reconciler interface { + Name() string + Reconcile(ctx context.Context, req *Request) (ctrl.Result, error) +} + +// Client provides read/write access to a collection of reconstituted resources. +type Client interface { + Get(ctx context.Context, ref *ResourceRef, gen int64) (*Resource, bool) + PatchStatusAsync(ctx context.Context, req *ManifestRef, patchFn StatusPatchFn) +} + type StatusPatchFn func(*apiv1.ResourceState) bool // ManifestRef references a particular resource manifest within a resource slice. diff --git a/internal/reconstitution/manager.go b/internal/reconstitution/manager.go new file mode 100644 index 00000000..1956b6e5 --- /dev/null +++ b/internal/reconstitution/manager.go @@ -0,0 +1,55 @@ +package reconstitution + +import ( + "time" + + "k8s.io/client-go/util/workqueue" + ctrl "sigs.k8s.io/controller-runtime" +) + +// New creates a new Manager, which is responsible for "reconstituting" resources +// i.e. allowing controllers to treat them as individual resources instead of their storage representation (ResourceSlice). +func New(mgr ctrl.Manager, writeBatchInterval time.Duration) (Manager, error) { + m := &reconcilerManager{ + Manager: mgr, + } + m.writeBuffer = newWriteBuffer(mgr.GetClient(), writeBatchInterval, 1) + mgr.Add(m.writeBuffer) + + var err error + m.reconstituter, err = newReconstituter(mgr) + if err != nil { + return nil, err + } + + return m, nil +} + +type Manager interface { + GetClient() Client + Add(rec Reconciler) error +} + +type reconcilerManager struct { + ctrl.Manager + *reconstituter + *writeBuffer +} + +func (m *reconcilerManager) GetClient() Client { return m } + +func (m *reconcilerManager) Add(rec Reconciler) error { + rateLimiter := workqueue.DefaultControllerRateLimiter() + queue := workqueue.NewRateLimitingQueueWithConfig(rateLimiter, workqueue.RateLimitingQueueConfig{ + Name: rec.Name(), + }) + qp := &queueProcessor{ + Client: m.Manager.GetClient(), + Queue: queue, + Recon: m.reconstituter, + Handler: rec, + Logger: m.Manager.GetLogger().WithValues("controller", rec.Name()), + } + m.reconstituter.AddQueue(queue) + return m.Manager.Add(qp) +} diff --git a/internal/reconstitution/queueprocessor.go b/internal/reconstitution/queueprocessor.go new file mode 100644 index 00000000..e8b75bc7 --- /dev/null +++ b/internal/reconstitution/queueprocessor.go @@ -0,0 +1,54 @@ +package reconstitution + +import ( + "context" + + "github.com/go-logr/logr" + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type queueProcessor struct { + Client client.Client + Queue workqueue.RateLimitingInterface + Recon *reconstituter + Handler Reconciler + Logger logr.Logger +} + +func (q *queueProcessor) Start(ctx context.Context) error { + go func() { + <-ctx.Done() + q.Queue.ShutDown() + }() + for q.processQueueItem(ctx) { + } + return nil +} + +func (q *queueProcessor) processQueueItem(ctx context.Context) bool { + item, shutdown := q.Queue.Get() + if shutdown { + return false + } + defer q.Queue.Done(item) + + req := item.(*Request) + logger := q.Logger.WithValues("compositionName", req.Composition.Name, "compositionNamespace", req.Composition.Namespace, "resourceKind", req.ResourceRef.Kind, "resourceName", req.ResourceRef.Name, "resourceNamespace", req.ResourceRef.Namespace) + ctx = logr.NewContext(ctx, logger) + + result, err := q.Handler.Reconcile(ctx, req) + if err != nil { + q.Queue.AddRateLimited(item) + logger.Error(err, "error while processing queue item") + return true + } + if result.RequeueAfter != 0 { + q.Queue.Forget(item) + q.Queue.AddAfter(item, result.RequeueAfter) + return true + } + + q.Queue.Forget(item) + return true +} diff --git a/internal/reconstitution/reconstituter.go b/internal/reconstitution/reconstituter.go index fb136615..cc122cbe 100644 --- a/internal/reconstitution/reconstituter.go +++ b/internal/reconstitution/reconstituter.go @@ -16,8 +16,6 @@ import ( "github.com/go-logr/logr" ) -const indexName = ".metadata.owner" - // reconstituter reconstitutes individual resources out of resource slices. // Similar to an informer but with extra logic to handle expanding the slice resources. type reconstituter struct {