From 09af28fcea18fd5497b25ec75347ace9bde1ffa4 Mon Sep 17 00:00:00 2001 From: Jordan Olshevski Date: Fri, 1 Dec 2023 17:02:12 -0600 Subject: [PATCH] refactoring --- cmd/eno-reconciler/main.go | 2 +- .../controllers/reconciliation/controller.go | 56 ++++++++----------- .../reconciliation/controller_test.go | 24 ++++---- internal/controllers/synthesis/status.go | 2 +- internal/reconstitution/reconstituter.go | 1 - 5 files changed, 36 insertions(+), 49 deletions(-) diff --git a/cmd/eno-reconciler/main.go b/cmd/eno-reconciler/main.go index c8ab887c..89d8ad25 100644 --- a/cmd/eno-reconciler/main.go +++ b/cmd/eno-reconciler/main.go @@ -79,7 +79,7 @@ func run() error { if err != nil { return err } - if _, err := reconciliation.New(reMgr, nil, 1, true); err != nil { // TODO + if err := reconciliation.New(reMgr, nil, 1, true); err != nil { // TODO return err } diff --git a/internal/controllers/reconciliation/controller.go b/internal/controllers/reconciliation/controller.go index e9dfe4de..68126368 100644 --- a/internal/controllers/reconciliation/controller.go +++ b/internal/controllers/reconciliation/controller.go @@ -4,7 +4,9 @@ import ( "context" "errors" "fmt" + "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -29,26 +31,25 @@ type Controller struct { discovery *discoveryCache } -func New(mgr *reconstitution.Manager, downstream *rest.Config, discoveryRPS float32, rediscoverWhenNotFound bool) (*Controller, error) { // TODO: REmove return +func New(mgr *reconstitution.Manager, downstream *rest.Config, discoveryRPS float32, rediscoverWhenNotFound bool) error { upstreamClient, err := client.New(downstream, client.Options{ Scheme: runtime.NewScheme(), // empty scheme since we shouldn't rely on compile-time types }) if err != nil { - return nil, err + return err } disc, err := newDicoveryCache(downstream, discoveryRPS, rediscoverWhenNotFound) if err != nil { - return nil, err + return err } - c := &Controller{ + return mgr.Add(&Controller{ client: mgr.Manager.GetClient(), resourceClient: mgr.GetClient(), upstreamClient: upstreamClient, discovery: disc, - } - return c, mgr.Add(c) + }) } func (c *Controller) Name() string { return "reconciliationController" } @@ -62,15 +63,15 @@ func (c *Controller) Reconcile(ctx context.Context, req *reconstitution.Request) } if comp.Status.CurrentState == nil { - logger.V(1).Info("composition has not yet been synthesized") + // we don't log here because it would be too noisy return ctrl.Result{}, nil } - currentGen := comp.Status.CurrentState.ObservedCompositionGeneration // Find the current and (optionally) previous desired states in the cache + currentGen := comp.Status.CurrentState.ObservedCompositionGeneration resource, found := c.resourceClient.Get(ctx, &req.ResourceRef, currentGen) if !found { - logger.V(0).Info("resource not found - dropping") + logger.V(1).Info("resource not found - waiting for cache to be filled") return ctrl.Result{}, nil } @@ -79,7 +80,7 @@ func (c *Controller) Reconcile(ctx context.Context, req *reconstitution.Request) var ok bool prev, ok = c.resourceClient.Get(ctx, &req.ResourceRef, comp.Status.PreviousState.ObservedCompositionGeneration) if !ok { - logger.V(0).Info("previous resource not found - dropping") // TODO: error? + logger.V(1).Info("previous resource not found - waiting for cache to be filled") return ctrl.Result{}, nil } } else { @@ -133,20 +134,22 @@ func (c *Controller) reconcileResource(ctx context.Context, prev, resource *reco // return nil // } - // Create + // Always create the resource when it doesn't exist if current.GetResourceVersion() == "" { - logger.V(0).Info("creating resource") err := c.upstreamClient.Create(ctx, resource.Object) if err != nil { return fmt.Errorf("creating resource: %w", err) } + logger.V(0).Info("created resource") return nil } - // Patch + // Replace the entire resource when a previous state isn't provided since we can't compute a merge patch without it if prev == nil { - return errors.New("TODO do a put instead?") + return errors.New("TODO IMPLEMENT ME") } + + // Compute a merge patch patch, patchType, err := c.buildPatch(ctx, prev, resource, current) if err != nil { return fmt.Errorf("building patch: %w", err) @@ -155,35 +158,26 @@ func (c *Controller) reconcileResource(ctx context.Context, prev, resource *reco logger.V(1).Info("skipping empty patch") return nil } - - logger.V(0).Info("patching resource", "patch", string(patch), "patchType", string(patchType), "currentResourceVersion", current.GetResourceVersion()) err = c.upstreamClient.Patch(ctx, current, client.RawPatch(patchType, patch)) if err != nil { return fmt.Errorf("applying patch: %w", err) } + logger.V(0).Info("patched resource", "patch", string(patch), "patchType", string(patchType), "resourceVersion", current.GetResourceVersion()) return nil } func (c *Controller) buildPatch(ctx context.Context, prev, resource *reconstitution.Resource, current *unstructured.Unstructured) ([]byte, types.PatchType, error) { + // We need to remove the creation timestamp since the other versions of the resource we're merging against won't have it. + // It's safe to mutate in this case because resource has already been copied by the cache. + current.SetCreationTimestamp(metav1.NewTime(time.Time{})) + currentJS, err := current.MarshalJSON() if err != nil { return nil, "", fmt.Errorf("building json representation of desired state: %w", err) } - prev.Object.SetResourceVersion(current.GetResourceVersion()) - prevJS, err := prev.Object.MarshalJSON() - if err != nil { - panic(err) // TODO - } - - resource.Object.SetResourceVersion(current.GetResourceVersion()) - resourceJS, err := resource.Object.MarshalJSON() - if err != nil { - panic(err) // TODO - } - - model, err := c.discovery.Get(ctx, current.GroupVersionKind()) // TODO: Change back? + model, err := c.discovery.Get(ctx, resource.Object.GroupVersionKind()) if err != nil { return nil, "", fmt.Errorf("getting merge metadata: %w", err) } @@ -192,9 +186,7 @@ func (c *Controller) buildPatch(ctx context.Context, prev, resource *reconstitut return patch, types.MergePatchType, err } - println("TODO PATCH META", string(prevJS), string(resourceJS), string(currentJS)) - patchmeta := strategicpatch.NewPatchMetaFromOpenAPI(model) - patch, err := strategicpatch.CreateThreeWayMergePatch([]byte(prevJS), []byte(resourceJS), currentJS, patchmeta, true) + patch, err := strategicpatch.CreateThreeWayMergePatch([]byte(prev.Manifest), []byte(resource.Manifest), currentJS, patchmeta, true) return patch, types.StrategicMergePatchType, err } diff --git a/internal/controllers/reconciliation/controller_test.go b/internal/controllers/reconciliation/controller_test.go index 0bc2bce8..eb3f5e25 100644 --- a/internal/controllers/reconciliation/controller_test.go +++ b/internal/controllers/reconciliation/controller_test.go @@ -40,10 +40,9 @@ func TestControllerBasics(t *testing.T) { }, Spec: corev1.ServiceSpec{ Ports: []corev1.ServicePort{{ - Name: "first", - Port: 1234, - Protocol: corev1.ProtocolTCP, - TargetPort: intstr.FromInt(1234), // TODO: Shouldn't be necessary + Name: "first", + Port: 1234, + Protocol: corev1.ProtocolTCP, }}, }, }, @@ -59,10 +58,9 @@ func TestControllerBasics(t *testing.T) { ApplyExternalUpdate: func(t *testing.T, obj client.Object) client.Object { svc := obj.(*corev1.Service).DeepCopy() svc.Spec.Ports = []corev1.ServicePort{{ - Name: "second", - Port: 2345, - Protocol: corev1.ProtocolTCP, - TargetPort: intstr.FromInt(2345), + Name: "second", + Port: 2345, + Protocol: corev1.ProtocolTCP, }} return svc }, @@ -73,10 +71,9 @@ func TestControllerBasics(t *testing.T) { }, Spec: corev1.ServiceSpec{ Ports: []corev1.ServicePort{{ - Name: "third", - Port: 3456, - Protocol: corev1.ProtocolTCP, - TargetPort: intstr.FromInt(3456), + Name: "third", + Port: 3456, + Protocol: corev1.ProtocolTCP, }}, }, }, @@ -142,7 +139,6 @@ func TestControllerBasics(t *testing.T) { js, err := json.Marshal(obj) require.NoError(t, err) - t.Logf("resource json %s", js) slice := &apiv1.ResourceSlice{} slice.GenerateName = "test-" @@ -154,7 +150,7 @@ func TestControllerBasics(t *testing.T) { // Test subject // Only enable rediscoverWhenNotFound on k8s versions that can support it. - _, err = New(rm, mgr.DownstreamRestConfig, 5, testutil.AtLeastVersion(t, 15)) + err = New(rm, mgr.DownstreamRestConfig, 5, testutil.AtLeastVersion(t, 15)) require.NoError(t, err) mgr.Start(t) diff --git a/internal/controllers/synthesis/status.go b/internal/controllers/synthesis/status.go index d58c3f99..f6a74ca5 100644 --- a/internal/controllers/synthesis/status.go +++ b/internal/controllers/synthesis/status.go @@ -78,7 +78,7 @@ func (c *statusController) Reconcile(ctx context.Context, req ctrl.Request) (ctr if err := c.client.Status().Update(ctx, comp); err != nil { return ctrl.Result{}, fmt.Errorf("updating composition status: %w", err) } - logger.Info("populated synthesis status to reflect pod") + logger.Info("added synthesizer pod status to its composition resource") return ctrl.Result{}, nil } diff --git a/internal/reconstitution/reconstituter.go b/internal/reconstitution/reconstituter.go index 052ae110..bb5b8eb1 100644 --- a/internal/reconstitution/reconstituter.go +++ b/internal/reconstitution/reconstituter.go @@ -87,7 +87,6 @@ func (r *reconstituter) populateCache(ctx context.Context, comp *apiv1.Compositi logger = logger.WithValues("synthesisGen", synthesis.ObservedCompositionGeneration) ctx = logr.NewContext(ctx, logger) if r.cache.HasSynthesis(ctx, compNSN, synthesis) { - logger.V(1).Info("this synthesis has already been cached") return nil }