Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
jveski committed Dec 1, 2023
1 parent 491b1e5 commit 09af28f
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 49 deletions.
2 changes: 1 addition & 1 deletion cmd/eno-reconciler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func run() error {
if err != nil {
return err
}
if _, err := reconciliation.New(reMgr, nil, 1, true); err != nil { // TODO
if err := reconciliation.New(reMgr, nil, 1, true); err != nil { // TODO
return err
}

Expand Down
56 changes: 24 additions & 32 deletions internal/controllers/reconciliation/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"context"
"errors"
"fmt"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -29,26 +31,25 @@ type Controller struct {
discovery *discoveryCache
}

func New(mgr *reconstitution.Manager, downstream *rest.Config, discoveryRPS float32, rediscoverWhenNotFound bool) (*Controller, error) { // TODO: REmove return
func New(mgr *reconstitution.Manager, downstream *rest.Config, discoveryRPS float32, rediscoverWhenNotFound bool) error {
upstreamClient, err := client.New(downstream, client.Options{
Scheme: runtime.NewScheme(), // empty scheme since we shouldn't rely on compile-time types
})
if err != nil {
return nil, err
return err
}

disc, err := newDicoveryCache(downstream, discoveryRPS, rediscoverWhenNotFound)
if err != nil {
return nil, err
return err
}

c := &Controller{
return mgr.Add(&Controller{
client: mgr.Manager.GetClient(),
resourceClient: mgr.GetClient(),
upstreamClient: upstreamClient,
discovery: disc,
}
return c, mgr.Add(c)
})
}

func (c *Controller) Name() string { return "reconciliationController" }
Expand All @@ -62,15 +63,15 @@ func (c *Controller) Reconcile(ctx context.Context, req *reconstitution.Request)
}

if comp.Status.CurrentState == nil {
logger.V(1).Info("composition has not yet been synthesized")
// we don't log here because it would be too noisy
return ctrl.Result{}, nil
}
currentGen := comp.Status.CurrentState.ObservedCompositionGeneration

// Find the current and (optionally) previous desired states in the cache
currentGen := comp.Status.CurrentState.ObservedCompositionGeneration
resource, found := c.resourceClient.Get(ctx, &req.ResourceRef, currentGen)
if !found {
logger.V(0).Info("resource not found - dropping")
logger.V(1).Info("resource not found - waiting for cache to be filled")
return ctrl.Result{}, nil
}

Expand All @@ -79,7 +80,7 @@ func (c *Controller) Reconcile(ctx context.Context, req *reconstitution.Request)
var ok bool
prev, ok = c.resourceClient.Get(ctx, &req.ResourceRef, comp.Status.PreviousState.ObservedCompositionGeneration)
if !ok {
logger.V(0).Info("previous resource not found - dropping") // TODO: error?
logger.V(1).Info("previous resource not found - waiting for cache to be filled")
return ctrl.Result{}, nil
}
} else {
Expand Down Expand Up @@ -133,20 +134,22 @@ func (c *Controller) reconcileResource(ctx context.Context, prev, resource *reco
// return nil
// }

// Create
// Always create the resource when it doesn't exist
if current.GetResourceVersion() == "" {
logger.V(0).Info("creating resource")
err := c.upstreamClient.Create(ctx, resource.Object)
if err != nil {
return fmt.Errorf("creating resource: %w", err)
}
logger.V(0).Info("created resource")
return nil
}

// Patch
// Replace the entire resource when a previous state isn't provided since we can't compute a merge patch without it
if prev == nil {
return errors.New("TODO do a put instead?")
return errors.New("TODO IMPLEMENT ME")
}

// Compute a merge patch
patch, patchType, err := c.buildPatch(ctx, prev, resource, current)
if err != nil {
return fmt.Errorf("building patch: %w", err)
Expand All @@ -155,35 +158,26 @@ func (c *Controller) reconcileResource(ctx context.Context, prev, resource *reco
logger.V(1).Info("skipping empty patch")
return nil
}

logger.V(0).Info("patching resource", "patch", string(patch), "patchType", string(patchType), "currentResourceVersion", current.GetResourceVersion())
err = c.upstreamClient.Patch(ctx, current, client.RawPatch(patchType, patch))
if err != nil {
return fmt.Errorf("applying patch: %w", err)
}
logger.V(0).Info("patched resource", "patch", string(patch), "patchType", string(patchType), "resourceVersion", current.GetResourceVersion())

return nil
}

func (c *Controller) buildPatch(ctx context.Context, prev, resource *reconstitution.Resource, current *unstructured.Unstructured) ([]byte, types.PatchType, error) {
// We need to remove the creation timestamp since the other versions of the resource we're merging against won't have it.
// It's safe to mutate in this case because resource has already been copied by the cache.
current.SetCreationTimestamp(metav1.NewTime(time.Time{}))

currentJS, err := current.MarshalJSON()
if err != nil {
return nil, "", fmt.Errorf("building json representation of desired state: %w", err)
}

prev.Object.SetResourceVersion(current.GetResourceVersion())
prevJS, err := prev.Object.MarshalJSON()
if err != nil {
panic(err) // TODO
}

resource.Object.SetResourceVersion(current.GetResourceVersion())
resourceJS, err := resource.Object.MarshalJSON()
if err != nil {
panic(err) // TODO
}

model, err := c.discovery.Get(ctx, current.GroupVersionKind()) // TODO: Change back?
model, err := c.discovery.Get(ctx, resource.Object.GroupVersionKind())
if err != nil {
return nil, "", fmt.Errorf("getting merge metadata: %w", err)
}
Expand All @@ -192,9 +186,7 @@ func (c *Controller) buildPatch(ctx context.Context, prev, resource *reconstitut
return patch, types.MergePatchType, err
}

println("TODO PATCH META", string(prevJS), string(resourceJS), string(currentJS))

patchmeta := strategicpatch.NewPatchMetaFromOpenAPI(model)
patch, err := strategicpatch.CreateThreeWayMergePatch([]byte(prevJS), []byte(resourceJS), currentJS, patchmeta, true)
patch, err := strategicpatch.CreateThreeWayMergePatch([]byte(prev.Manifest), []byte(resource.Manifest), currentJS, patchmeta, true)
return patch, types.StrategicMergePatchType, err
}
24 changes: 10 additions & 14 deletions internal/controllers/reconciliation/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,9 @@ func TestControllerBasics(t *testing.T) {
},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{{
Name: "first",
Port: 1234,
Protocol: corev1.ProtocolTCP,
TargetPort: intstr.FromInt(1234), // TODO: Shouldn't be necessary
Name: "first",
Port: 1234,
Protocol: corev1.ProtocolTCP,
}},
},
},
Expand All @@ -59,10 +58,9 @@ func TestControllerBasics(t *testing.T) {
ApplyExternalUpdate: func(t *testing.T, obj client.Object) client.Object {
svc := obj.(*corev1.Service).DeepCopy()
svc.Spec.Ports = []corev1.ServicePort{{
Name: "second",
Port: 2345,
Protocol: corev1.ProtocolTCP,
TargetPort: intstr.FromInt(2345),
Name: "second",
Port: 2345,
Protocol: corev1.ProtocolTCP,
}}
return svc
},
Expand All @@ -73,10 +71,9 @@ func TestControllerBasics(t *testing.T) {
},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{{
Name: "third",
Port: 3456,
Protocol: corev1.ProtocolTCP,
TargetPort: intstr.FromInt(3456),
Name: "third",
Port: 3456,
Protocol: corev1.ProtocolTCP,
}},
},
},
Expand Down Expand Up @@ -142,7 +139,6 @@ func TestControllerBasics(t *testing.T) {

js, err := json.Marshal(obj)
require.NoError(t, err)
t.Logf("resource json %s", js)

slice := &apiv1.ResourceSlice{}
slice.GenerateName = "test-"
Expand All @@ -154,7 +150,7 @@ func TestControllerBasics(t *testing.T) {

// Test subject
// Only enable rediscoverWhenNotFound on k8s versions that can support it.
_, err = New(rm, mgr.DownstreamRestConfig, 5, testutil.AtLeastVersion(t, 15))
err = New(rm, mgr.DownstreamRestConfig, 5, testutil.AtLeastVersion(t, 15))
require.NoError(t, err)
mgr.Start(t)

Expand Down
2 changes: 1 addition & 1 deletion internal/controllers/synthesis/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (c *statusController) Reconcile(ctx context.Context, req ctrl.Request) (ctr
if err := c.client.Status().Update(ctx, comp); err != nil {
return ctrl.Result{}, fmt.Errorf("updating composition status: %w", err)
}
logger.Info("populated synthesis status to reflect pod")
logger.Info("added synthesizer pod status to its composition resource")
return ctrl.Result{}, nil
}

Expand Down
1 change: 0 additions & 1 deletion internal/reconstitution/reconstituter.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ func (r *reconstituter) populateCache(ctx context.Context, comp *apiv1.Compositi
logger = logger.WithValues("synthesisGen", synthesis.ObservedCompositionGeneration)
ctx = logr.NewContext(ctx, logger)
if r.cache.HasSynthesis(ctx, compNSN, synthesis) {
logger.V(1).Info("this synthesis has already been cached")
return nil
}

Expand Down

0 comments on commit 09af28f

Please sign in to comment.