From aacab3eec92b2778d65d9c74581240f6eebd1d73 Mon Sep 17 00:00:00 2001 From: Jordan Olshevski Date: Tue, 2 Jan 2024 15:19:06 -0600 Subject: [PATCH] Perf optimizations / fixes (#26) Adds a simple performance test manifest and implements various fixes / optimizations found while experimenting with the test. --------- Co-authored-by: Jordan Olshevski --- cmd/eno-controller/main.go | 9 ++- cmd/eno-reconciler/main.go | 9 ++- dev/deploy.yaml | 3 + examples/loadtest.yaml | 36 +++++++++ go.mod | 3 + go.sum | 9 +++ .../reconciliation/discoverycache.go | 65 ++++++++++++---- .../reconciliation/integration_test.go | 78 +++++++++++++++++++ internal/controllers/synthesis/exec.go | 3 +- internal/controllers/synthesis/exec_test.go | 26 +++++++ internal/manager/manager.go | 13 ++++ internal/reconstitution/reconstituter.go | 29 +++---- 12 files changed, 250 insertions(+), 33 deletions(-) create mode 100644 examples/loadtest.yaml diff --git a/cmd/eno-controller/main.go b/cmd/eno-controller/main.go index ad5ae34b..af3f8055 100644 --- a/cmd/eno-controller/main.go +++ b/cmd/eno-controller/main.go @@ -8,6 +8,7 @@ import ( "github.com/go-logr/zapr" "go.uber.org/zap" + "go.uber.org/zap/zapcore" ctrl "sigs.k8s.io/controller-runtime" "github.com/Azure/eno/internal/controllers/synthesis" @@ -28,14 +29,20 @@ func run() error { var ( rolloutCooldown time.Duration synthesisTimeout time.Duration + debugLogging bool synconf = &synthesis.Config{} ) flag.DurationVar(&synconf.Timeout, "synthesis-pod-timeout", time.Minute, "Maximum lifespan of synthesizer pods") flag.DurationVar(&synthesisTimeout, "synthesis-timeout", time.Second*30, "Timeout when executing synthesizer binaries") flag.DurationVar(&rolloutCooldown, "rollout-cooldown", time.Second*30, "Minimum period of time between each ensuing composition update after a synthesizer is updated") + flag.BoolVar(&debugLogging, "debug", true, "Enable debug logging") flag.Parse() - zl, err := zap.NewProduction() + zapCfg := zap.NewProductionConfig() + if debugLogging { + zapCfg.Level = zap.NewAtomicLevelAt(zapcore.DebugLevel) + } + zl, err := zapCfg.Build() if err != nil { return err } diff --git a/cmd/eno-reconciler/main.go b/cmd/eno-reconciler/main.go index 5e74247b..66064d44 100644 --- a/cmd/eno-reconciler/main.go +++ b/cmd/eno-reconciler/main.go @@ -8,6 +8,7 @@ import ( "github.com/go-logr/zapr" "go.uber.org/zap" + "go.uber.org/zap/zapcore" ctrl "sigs.k8s.io/controller-runtime" "github.com/Azure/eno/internal/controllers/reconciliation" @@ -28,6 +29,7 @@ func run() error { rediscoverWhenNotFound bool writeBatchInterval time.Duration discoveryMaxRPS float32 + debugLogging bool mgrOpts = &manager.Options{ Rest: ctrl.GetConfigOrDie(), @@ -35,10 +37,15 @@ func run() error { ) flag.BoolVar(&rediscoverWhenNotFound, "rediscover-when-not-found", true, "Invalidate discovery cache when any type is not found in the openapi spec. Set this to false on <= k8s 1.14") flag.DurationVar(&writeBatchInterval, "write-batch-interval", time.Second*5, "The max throughput of composition status updates") + flag.BoolVar(&debugLogging, "debug", true, "Enable debug logging") mgrOpts.Bind(flag.CommandLine) flag.Parse() - zl, err := zap.NewProduction() + zapCfg := zap.NewProductionConfig() + if debugLogging { + zapCfg.Level = zap.NewAtomicLevelAt(zapcore.DebugLevel) + } + zl, err := zapCfg.Build() if err != nil { return err } diff --git a/dev/deploy.yaml b/dev/deploy.yaml index 91efb476..aae5a399 100644 --- a/dev/deploy.yaml +++ b/dev/deploy.yaml @@ -39,3 +39,6 @@ spec: containers: - name: eno-reconciler image: $REGISTRY/eno-reconciler:$TAG + env: + - name: PPROF_ADDR + value: ":8888" diff --git a/examples/loadtest.yaml b/examples/loadtest.yaml new file mode 100644 index 00000000..dd9dae7e --- /dev/null +++ b/examples/loadtest.yaml @@ -0,0 +1,36 @@ +apiVersion: eno.azure.io/v1 +kind: Synthesizer +metadata: + name: load-test-synth +spec: + image: docker.io/ubuntu:latest + command: + - /bin/bash + - -c + - | + n=600 + echo -n '[' + for i in $(seq $n); do + echo -n "{ + \"apiVersion\": \"v1\", + \"kind\": \"ConfigMap\", + \"metadata\": { \"name\": \"test-cm-${i}\", \"namespace\": \"default\", \"annotations\": { \"eno.azure.io/reconcile-interval\": \"10s\" } }, + \"data\": { \"foo\": \"bar\" } + }" + if [[ $i == $n ]]; then + echo "]" + else + echo "," + fi + done + + +--- + +apiVersion: eno.azure.io/v1 +kind: Composition +metadata: + name: load-test +spec: + synthesizer: + name: load-test-synth diff --git a/go.mod b/go.mod index 0ca899ed..9f449e37 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/chzyer/readline v1.5.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/evanphx/json-patch v5.6.0+incompatible // indirect @@ -35,8 +36,10 @@ require ( github.com/golang/protobuf v1.5.3 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/gofuzz v1.2.0 // indirect + github.com/google/pprof v0.0.0-20231229205709-960ae82b1e42 // indirect github.com/google/uuid v1.3.1 // indirect github.com/gorilla/websocket v1.5.0 // indirect + github.com/ianlancetaylor/demangle v0.0.0-20230524184225-eabc099b10ab // indirect github.com/imdario/mergo v0.3.16 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect diff --git a/go.sum b/go.sum index eff2a789..d7b378c4 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,10 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/chzyer/logex v1.2.1/go.mod h1:JLbx6lG2kDbNRFnfkgvh4eRJRPX1QCoOIWomwysCBrQ= +github.com/chzyer/readline v1.5.1 h1:upd/6fQk4src78LMRzh5vItIt361/o4uq553V8B5sGI= +github.com/chzyer/readline v1.5.1/go.mod h1:Eh+b79XXUwfKfcPLepksvw2tcLE/Ct21YObkaSkeBlk= +github.com/chzyer/test v1.0.0/go.mod h1:2JlltgoNkt4TW/z9V/IzDdFaMTM2JPIi26O1pF38GC8= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -49,11 +53,15 @@ github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/ github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJYCmNdQXq6neHJOYx3V6jnqNEec= +github.com/google/pprof v0.0.0-20231229205709-960ae82b1e42 h1:dHLYa5D8/Ta0aLR2XcPsrkpAgGeFs6thhMcQK0oQ0n8= +github.com/google/pprof v0.0.0-20231229205709-960ae82b1e42/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik= github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/ianlancetaylor/demangle v0.0.0-20230524184225-eabc099b10ab h1:BA4a7pe6ZTd9F8kXETBoijjFJ/ntaa//1wiH9BZu4zU= +github.com/ianlancetaylor/demangle v0.0.0-20230524184225-eabc099b10ab/go.mod h1:gx7rwoVhcfuVKG5uya9Hs3Sxj7EIvldVofAWIUtGouw= github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4= github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= @@ -147,6 +155,7 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= diff --git a/internal/controllers/reconciliation/discoverycache.go b/internal/controllers/reconciliation/discoverycache.go index d4f858ac..ff748bd4 100644 --- a/internal/controllers/reconciliation/discoverycache.go +++ b/internal/controllers/reconciliation/discoverycache.go @@ -5,6 +5,8 @@ import ( "sync" "github.com/go-logr/logr" + openapi_v2 "github.com/google/gnostic-models/openapiv2" + "gopkg.in/yaml.v2" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/discovery" @@ -17,10 +19,11 @@ import ( // discoveryCache is useful to prevent excessive QPS to the discovery APIs while // still allowing dynamic refresh of the openapi spec on cache misses. type discoveryCache struct { - mut sync.Mutex - client discovery.DiscoveryInterface - fillWhenNotFound bool - current openapi.Resources + mut sync.Mutex + client discovery.DiscoveryInterface + fillWhenNotFound bool + currentResources openapi.Resources + currentSupportedTypes map[schema.GroupVersionKind]struct{} } func newDicoveryCache(rc *rest.Config, qps float32, fillWhenNotFound bool) (*discoveryCache, error) { @@ -33,7 +36,7 @@ func newDicoveryCache(rc *rest.Config, qps float32, fillWhenNotFound bool) (*dis } disc.UseLegacyDiscovery = true // don't bother with aggregated APIs since they may be unavailable - d := &discoveryCache{client: disc, fillWhenNotFound: fillWhenNotFound} + d := &discoveryCache{client: disc, fillWhenNotFound: fillWhenNotFound, currentSupportedTypes: map[schema.GroupVersionKind]struct{}{}} return d, nil } @@ -45,16 +48,16 @@ func (d *discoveryCache) Get(ctx context.Context, gvk schema.GroupVersionKind) ( // Older versions of Kubernetes don't include CRDs in the openapi spec, so on those versions we cannot invalidate the cache if a resource is not found. // However, on newer versions we expect every resource to exist in the spec so retries are safe and often necessary. for i := 0; i < 2; i++ { - if d.current == nil { + if d.currentResources == nil { logger.V(1).Info("filling discovery cache") if err := d.fillUnlocked(ctx); err != nil { return nil, err } } - model := d.current.LookupResource(gvk) + model := d.currentResources.LookupResource(gvk) if model == nil && d.fillWhenNotFound { - d.current = nil // invalidate cache - retrieve fresh schema on next attempt + d.currentResources = nil // invalidate cache - retrieve fresh schema on next attempt continue } return d.checkSupportUnlocked(ctx, gvk, model) @@ -71,7 +74,8 @@ func (d *discoveryCache) fillUnlocked(ctx context.Context) error { if err != nil { return err } - d.current = resources + d.currentResources = resources + d.currentSupportedTypes = buildSupportedTypesMap(doc) return nil } @@ -82,11 +86,46 @@ func (d *discoveryCache) checkSupportUnlocked(ctx context.Context, gvk schema.Gr return nil, nil } - for _, c := range d.current.GetConsumes(gvk, "PATCH") { - if c == string(types.StrategicMergePatchType) { - return model, nil - } + if _, ok := d.currentSupportedTypes[gvk]; ok { + return model, nil } return nil, nil // doesn't support strategic merge } + +func buildSupportedTypesMap(doc *openapi_v2.Document) map[schema.GroupVersionKind]struct{} { + // This is copied and adapted from the kubectl openapi package + // Originally it walked the entire tree for every lookup, we have optimized it down to a single map lookup. + m := make(map[schema.GroupVersionKind]struct{}) + for _, path := range doc.GetPaths().GetPath() { + for _, ex := range path.GetValue().GetPatch().GetVendorExtension() { + if ex.GetValue().GetYaml() == "" || + ex.GetName() != "x-kubernetes-group-version-kind" { + continue + } + + var value map[string]string + err := yaml.Unmarshal([]byte(ex.GetValue().GetYaml()), &value) + if err != nil { + continue + } + + gvk := schema.GroupVersionKind{ + Group: value["group"], + Version: value["version"], + Kind: value["kind"], + } + var supported bool + for _, c := range path.GetValue().GetPatch().GetConsumes() { + if c == string(types.StrategicMergePatchType) { + supported = true + break + } + } + if supported { + m[gvk] = struct{}{} + } + } + } + return m +} diff --git a/internal/controllers/reconciliation/integration_test.go b/internal/controllers/reconciliation/integration_test.go index 89d4017b..c30a7c63 100644 --- a/internal/controllers/reconciliation/integration_test.go +++ b/internal/controllers/reconciliation/integration_test.go @@ -294,3 +294,81 @@ func getPhase(obj client.Object) string { } return anno["test-phase"] } + +func TestReconcileInterval(t *testing.T) { + scheme := runtime.NewScheme() + corev1.SchemeBuilder.AddToScheme(scheme) + testv1.SchemeBuilder.AddToScheme(scheme) + + ctx := testutil.NewContext(t) + mgr := testutil.NewManager(t) + upstream := mgr.GetClient() + downstream := mgr.DownstreamClient + + // Register supporting controllers + rm, err := reconstitution.New(mgr.Manager, time.Millisecond) + require.NoError(t, err) + require.NoError(t, synthesis.NewRolloutController(mgr.Manager, time.Millisecond)) + require.NoError(t, synthesis.NewStatusController(mgr.Manager)) + require.NoError(t, synthesis.NewPodLifecycleController(mgr.Manager, &synthesis.Config{ + Timeout: time.Second * 5, + })) + require.NoError(t, synthesis.NewExecController(mgr.Manager, time.Second, &testutil.ExecConn{Hook: func(s *apiv1.Synthesizer) []client.Object { + obj := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-obj", + Namespace: "default", + Annotations: map[string]string{ + "eno.azure.io/reconcile-interval": "250ms", + }, + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{{ + Name: "first", + Port: 1234, + Protocol: corev1.ProtocolTCP, + }}, + }, + } + + gvks, _, err := scheme.ObjectKinds(obj) + require.NoError(t, err) + obj.GetObjectKind().SetGroupVersionKind(gvks[0]) + return []client.Object{obj} + }})) + + // Test subject + err = New(rm, mgr.DownstreamRestConfig, 5, testutil.AtLeastVersion(t, 15)) + require.NoError(t, err) + mgr.Start(t) + + // Any syn/comp will do since we faked out the synthesizer pod + syn := &apiv1.Synthesizer{} + syn.Name = "test-syn" + syn.Spec.Image = "create" + require.NoError(t, upstream.Create(ctx, syn)) + + comp := &apiv1.Composition{} + comp.Name = "test-comp" + comp.Namespace = "default" + comp.Spec.Synthesizer.Name = syn.Name + require.NoError(t, upstream.Create(ctx, comp)) + + // Wait for service to be created + obj := &corev1.Service{} + testutil.Eventually(t, func() bool { + obj.SetName("test-obj") + obj.SetNamespace("default") + err = downstream.Get(context.Background(), client.ObjectKeyFromObject(obj), obj) + return err == nil + }) + + // Update the service from outside of Eno + obj.Spec.Ports[0].Port = 2345 + + // The service should eventually converge with the desired state + testutil.Eventually(t, func() bool { + err = downstream.Get(context.Background(), client.ObjectKeyFromObject(obj), obj) + return err == nil && obj.Spec.Ports[0].Port == 1234 + }) +} diff --git a/internal/controllers/synthesis/exec.go b/internal/controllers/synthesis/exec.go index fd47db13..39c486aa 100644 --- a/internal/controllers/synthesis/exec.go +++ b/internal/controllers/synthesis/exec.go @@ -216,13 +216,14 @@ func buildResourceSlices(comp *apiv1.Composition, previous []*apiv1.ResourceSlic refs := map[resourceRef]struct{}{} manifests := []apiv1.Manifest{} for i, output := range outputs { + reconcileInterval := consumeReconcileIntervalAnnotation(output) js, err := output.MarshalJSON() if err != nil { return nil, reconcile.TerminalError(fmt.Errorf("encoding output %d: %w", i, err)) } manifests = append(manifests, apiv1.Manifest{ Manifest: string(js), - ReconcileInterval: consumeReconcileIntervalAnnotation(output), + ReconcileInterval: reconcileInterval, }) refs[newResourceRef(output)] = struct{}{} } diff --git a/internal/controllers/synthesis/exec_test.go b/internal/controllers/synthesis/exec_test.go index 57e9abd7..8fe29615 100644 --- a/internal/controllers/synthesis/exec_test.go +++ b/internal/controllers/synthesis/exec_test.go @@ -2,6 +2,7 @@ package synthesis import ( "testing" + "time" apiv1 "github.com/Azure/eno/api/v1" "github.com/Azure/eno/internal/testutil" @@ -62,6 +63,31 @@ func TestBuildResourceSlicesTombstonesBasics(t *testing.T) { require.Len(t, slices, 0) } +func TestBuildResourceSlicesReconcileInterval(t *testing.T) { + outputs := []*unstructured.Unstructured{{ + Object: map[string]interface{}{ + "kind": "Test", + "apiVersion": "mygroup/v1", + "metadata": map[string]interface{}{ + "name": "test-resource", + "namespace": "test-ns", + "annotations": map[string]interface{}{ + "eno.azure.io/reconcile-interval": "10s", + }, + }, + }, + }} + + // The reconcile interval is passed from the resource itself to its manifest representation + slices, err := buildResourceSlices(&apiv1.Composition{}, []*apiv1.ResourceSlice{}, outputs, 100000) + require.NoError(t, err) + require.Len(t, slices, 1) + require.Len(t, slices[0].Spec.Resources, 1) + require.NotNil(t, slices[0].Spec.Resources[0].ReconcileInterval) + assert.Equal(t, time.Second*10, slices[0].Spec.Resources[0].ReconcileInterval.Duration) // it's in the manifest + assert.Equal(t, "{\"apiVersion\":\"mygroup/v1\",\"kind\":\"Test\",\"metadata\":{\"annotations\":{},\"name\":\"test-resource\",\"namespace\":\"test-ns\"}}\n", slices[0].Spec.Resources[0].Manifest) // it's not in the resource itself +} + func TestBuildResourceSlicesTombstonesVersionSemantics(t *testing.T) { outputs := []*unstructured.Unstructured{{ Object: map[string]interface{}{ diff --git a/internal/manager/manager.go b/internal/manager/manager.go index 972264b2..acee45cd 100644 --- a/internal/manager/manager.go +++ b/internal/manager/manager.go @@ -4,6 +4,10 @@ import ( "context" "flag" "fmt" + "os" + + "net/http" + _ "net/http/pprof" "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" @@ -34,6 +38,15 @@ const ( ManagerLabelValue = "eno" ) +func init() { + go func() { + if addr := os.Getenv("PPROF_ADDR"); addr != "" { + err := http.ListenAndServe(addr, nil) + panic(fmt.Sprintf("unable to serve pprof listener: %s", err)) + } + }() +} + type Options struct { Rest *rest.Config Namespace string diff --git a/internal/reconstitution/reconstituter.go b/internal/reconstitution/reconstituter.go index a03f59b7..7fe05404 100644 --- a/internal/reconstitution/reconstituter.go +++ b/internal/reconstitution/reconstituter.go @@ -61,32 +61,37 @@ func (r *reconstituter) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R ctx = logr.NewContext(ctx, logger) // We populate the cache with both the previous and current syntheses - err = r.populateCache(ctx, comp, comp.Status.PreviousState) + prevReqs, err := r.populateCache(ctx, comp, comp.Status.PreviousState) if err != nil { return ctrl.Result{}, fmt.Errorf("processing previous state: %w", err) } - err = r.populateCache(ctx, comp, comp.Status.CurrentState) + currentReqs, err := r.populateCache(ctx, comp, comp.Status.CurrentState) if err != nil { return ctrl.Result{}, fmt.Errorf("processing current state: %w", err) } + for _, req := range append(prevReqs, currentReqs...) { + for _, queue := range r.queues { + queue.Add(req) + } + } r.cache.Purge(ctx, req.NamespacedName, comp) return ctrl.Result{}, nil } -func (r *reconstituter) populateCache(ctx context.Context, comp *apiv1.Composition, synthesis *apiv1.Synthesis) error { +func (r *reconstituter) populateCache(ctx context.Context, comp *apiv1.Composition, synthesis *apiv1.Synthesis) ([]*Request, error) { logger := logr.FromContextOrDiscard(ctx) if synthesis == nil || !synthesis.Synthesized { // synthesis is still in progress - return nil + return nil, nil } compNSN := types.NamespacedName{Namespace: comp.Namespace, Name: comp.Name} logger = logger.WithValues("synthesisCompositionGeneration", synthesis.ObservedCompositionGeneration) ctx = logr.NewContext(ctx, logger) if r.cache.HasSynthesis(ctx, compNSN, synthesis) { - return nil + return nil, nil } slices := make([]apiv1.ResourceSlice, len(synthesis.ResourceSlices)) @@ -96,20 +101,10 @@ func (r *reconstituter) populateCache(ctx context.Context, comp *apiv1.Compositi slice.Namespace = comp.Namespace err := r.client.Get(ctx, client.ObjectKeyFromObject(&slice), &slice) if err != nil { - return fmt.Errorf("unable to get resource slice: %w", err) + return nil, fmt.Errorf("unable to get resource slice: %w", err) } slices[i] = slice } - reqs, err := r.cache.Fill(ctx, compNSN, synthesis, slices) - if err != nil { - return err - } - for _, req := range reqs { - for _, queue := range r.queues { - queue.Add(req) - } - } - - return nil + return r.cache.Fill(ctx, compNSN, synthesis, slices) }