Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Housekeeping, logging improvements, race condition fixes #21

Merged
merged 22 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 28 additions & 10 deletions internal/controllers/reconciliation/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package reconciliation

import (
"context"
"encoding/json"
"errors"
"fmt"
"time"

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -55,17 +56,17 @@ func New(mgr *reconstitution.Manager, downstream *rest.Config, discoveryRPS floa
func (c *Controller) Name() string { return "reconciliationController" }

func (c *Controller) Reconcile(ctx context.Context, req *reconstitution.Request) (ctrl.Result, error) {
logger := logr.FromContextOrDiscard(ctx)
comp := &apiv1.Composition{}
err := c.client.Get(ctx, req.Composition, comp)
if err != nil {
return ctrl.Result{}, fmt.Errorf("getting composition: %w", err)
}

if comp.Status.CurrentState == nil {
// we don't log here because it would be too noisy
return ctrl.Result{}, nil
return ctrl.Result{}, nil // nothing to do
}
logger := logr.FromContextOrDiscard(ctx).WithValues("synthesizerName", comp.Spec.Synthesizer.Name, "synthesizerGeneration", comp.Status.CurrentState.ObservedSynthesizerGeneration)
ctx = logr.NewContext(ctx, logger)

// Find the current and (optionally) previous desired states in the cache
currentGen := comp.Status.CurrentState.ObservedCompositionGeneration
Expand Down Expand Up @@ -103,7 +104,6 @@ func (c *Controller) Reconcile(ctx context.Context, req *reconstitution.Request)
if err := c.reconcileResource(ctx, prev, resource, current); err != nil {
return ctrl.Result{}, err
}
logger.V(1).Info("sync'd resource")

c.resourceClient.PatchStatusAsync(ctx, &req.Manifest, func(rs *apiv1.ResourceState) bool {
if rs.Reconciled {
Expand Down Expand Up @@ -135,6 +135,7 @@ func (c *Controller) reconcileResource(ctx context.Context, prev, resource *reco
}

// Compute a merge patch
prevRV := current.GetResourceVersion()
patch, patchType, err := c.buildPatch(ctx, prev, resource, current)
if err != nil {
return fmt.Errorf("building patch: %w", err)
Expand All @@ -143,20 +144,20 @@ func (c *Controller) reconcileResource(ctx context.Context, prev, resource *reco
logger.V(1).Info("skipping empty patch")
return nil
}
patch, err = mungePatch(patch, current.GetResourceVersion())
if err != nil {
return fmt.Errorf("adding resource version: %w", err)
}
err = c.upstreamClient.Patch(ctx, current, client.RawPatch(patchType, patch))
if err != nil {
return fmt.Errorf("applying patch: %w", err)
}
logger.V(0).Info("patched resource", "patchType", string(patchType), "resourceVersion", current.GetResourceVersion())
logger.V(0).Info("patched resource", "patchType", string(patchType), "resourceVersion", current.GetResourceVersion(), "previousResourceVersion", prevRV)

return nil
}

func (c *Controller) buildPatch(ctx context.Context, prev, resource *reconstitution.Resource, current *unstructured.Unstructured) ([]byte, types.PatchType, error) {
// We need to remove the creation timestamp since the other versions of the resource we're merging against won't have it.
// It's safe to mutate in this case because resource has already been copied by the cache.
current.SetCreationTimestamp(metav1.NewTime(time.Time{}))

var prevManifest []byte
if prev != nil {
prevManifest = []byte(prev.Manifest)
Expand All @@ -180,3 +181,20 @@ func (c *Controller) buildPatch(ctx context.Context, prev, resource *reconstitut
patch, err := strategicpatch.CreateThreeWayMergePatch(prevManifest, []byte(resource.Manifest), currentJS, patchmeta, true)
return patch, types.StrategicMergePatchType, err
}

func mungePatch(patch []byte, rv string) ([]byte, error) {
var patchMap map[string]interface{}
err := json.Unmarshal(patch, &patchMap)
if err != nil {
return nil, err
}
u := unstructured.Unstructured{Object: patchMap}
a, err := meta.Accessor(&u)
if err != nil {
return nil, err
}
a.SetResourceVersion(rv)
a.SetCreationTimestamp(metav1.Time{})

return json.Marshal(patchMap)
}
2 changes: 0 additions & 2 deletions internal/controllers/reconciliation/discoverycache.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,9 @@ func (d *discoveryCache) checkSupportUnlocked(ctx context.Context, gvk schema.Gr

for _, c := range d.current.GetConsumes(gvk, "PATCH") {
if c == string(types.StrategicMergePatchType) {
logger.V(1).Info("using strategic merge")
return model, nil
}
}

logger.V(1).Info("not using strategic merge because it is not supported by the resource")
return nil, nil // doesn't support strategic merge
}
25 changes: 11 additions & 14 deletions internal/controllers/reconciliation/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func TestCRUD(t *testing.T) {
obj, err := test.Get(downstream)
require.NoError(t, err)

updatedObj := test.ApplyExternalUpdate(t, obj)
updatedObj := test.ApplyExternalUpdate(t, obj.DeepCopyObject().(client.Object))
updatedObj = setPhase(updatedObj, "external-update")
if err := downstream.Update(ctx, updatedObj); err != nil {
return err
Expand All @@ -214,9 +214,18 @@ func TestCRUD(t *testing.T) {
}

func (c *crudTestCase) WaitForPhase(t *testing.T, downstream client.Client, phase string) {
var lastRV string
testutil.Eventually(t, func() bool {
obj, err := c.Get(downstream)
return err == nil && getPhase(obj) == phase
currentPhase := getPhase(obj)
currentRV := obj.GetResourceVersion()
if lastRV == "" {
t.Logf("initial resource version %s was observed in phase %s", currentRV, currentPhase)
} else if currentRV != lastRV {
t.Logf("resource version transitioned from %s to %s in phase %s", lastRV, currentRV, currentPhase)
}
lastRV = currentRV
return err == nil && currentPhase == phase
})
}

Expand All @@ -236,18 +245,6 @@ func setImage(t *testing.T, upstream client.Client, syn *apiv1.Synthesizer, comp
return upstream.Update(context.Background(), syn)
})
require.NoError(t, err)

// Also pin the composition to >= this synthesizer version.
// This is necessary to avoid deadlock in cases where incoherent cache causes the composition not to be updated on this tick of the rollout loop.
// It isn't a problem in production because we don't expect serialized behavior from the rollout controller.
err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
if err := upstream.Get(context.Background(), client.ObjectKeyFromObject(comp), comp); err != nil {
return err
}
comp.Spec.Synthesizer.MinGeneration = syn.Generation
return upstream.Update(context.Background(), comp)
})
require.NoError(t, err)
}

func newSliceBuilder(t *testing.T, scheme *runtime.Scheme, test *crudTestCase) func(c *apiv1.Composition, s *apiv1.Synthesizer) []*apiv1.ResourceSlice {
Expand Down
18 changes: 8 additions & 10 deletions internal/controllers/synthesis/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
var minimalTestConfig = &Config{
WrapperImage: "test-wrapper-image",
MaxRestarts: 3,
Timeout: time.Second * 2,
Timeout: time.Second * 6,
}

func TestControllerHappyPath(t *testing.T) {
Expand Down Expand Up @@ -107,12 +107,11 @@ func TestControllerHappyPath(t *testing.T) {
})

// The pod eventually completes and is deleted
// TODO: Why does this fail?
// testutil.Eventually(t, func() bool {
// list := &corev1.PodList{}
// require.NoError(t, cli.List(ctx, list))
// return len(list.Items) == 0
// })
testutil.Eventually(t, func() bool {
list := &corev1.PodList{}
require.NoError(t, cli.List(ctx, list))
return len(list.Items) == 0
})
}

func TestControllerFastCompositionUpdates(t *testing.T) {
Expand Down Expand Up @@ -254,12 +253,12 @@ func TestControllerSwitchingSynthesizers(t *testing.T) {

syn1 := &apiv1.Synthesizer{}
syn1.Name = "test-syn-1"
syn1.Spec.Image = "test-syn-image"
syn1.Spec.Image = "initial-image"
require.NoError(t, cli.Create(ctx, syn1))

syn2 := &apiv1.Synthesizer{}
syn2.Name = "test-syn-2"
syn2.Spec.Image = "initial-image"
syn2.Spec.Image = "updated-image"
require.NoError(t, cli.Create(ctx, syn2))

comp := &apiv1.Composition{}
Expand All @@ -285,7 +284,6 @@ func TestControllerSwitchingSynthesizers(t *testing.T) {
})
require.NoError(t, err)

// TODO: This test is a tiny bit flaky
testutil.Eventually(t, func() bool {
require.NoError(t, cli.Get(ctx, client.ObjectKeyFromObject(comp), comp))
return comp.Status.CurrentState != nil && len(comp.Status.CurrentState.ResourceSlices) == 2
Expand Down
16 changes: 8 additions & 8 deletions internal/controllers/synthesis/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,15 @@ func (c *podLifecycleController) Reconcile(ctx context.Context, req ctrl.Request
if comp.Spec.Synthesizer.Name == "" {
return ctrl.Result{}, nil
}
logger = logger.WithValues("composition", comp.Name, "compositionNamespace", comp.Namespace, "compositionGeneration", comp.Generation)
logger = logger.WithValues("compositionName", comp.Name, "compositionNamespace", comp.Namespace, "compositionGeneration", comp.Generation)

syn := &apiv1.Synthesizer{}
syn.Name = comp.Spec.Synthesizer.Name
err = c.client.Get(ctx, client.ObjectKeyFromObject(syn), syn)
if err != nil {
return ctrl.Result{}, fmt.Errorf("getting synthesizer: %w", err)
}
logger = logger.WithValues("synthesizer", syn.Name, "synthesizerGeneration", syn.Generation)
logger = logger.WithValues("synthesizerName", syn.Name, "synthesizerGeneration", syn.Generation)

// Delete any unnecessary pods
pods := &corev1.PodList{}
Expand All @@ -78,7 +78,7 @@ func (c *podLifecycleController) Reconcile(ctx context.Context, req ctrl.Request
if err := c.client.Delete(ctx, toDelete); err != nil {
return ctrl.Result{}, fmt.Errorf("deleting pod: %w", err)
}
logger.Info("deleted pod", "podName", toDelete.Name)
logger.Info("deleted synthesizer pod", "podName", toDelete.Name)
return ctrl.Result{}, nil
}

Expand All @@ -88,12 +88,12 @@ func (c *podLifecycleController) Reconcile(ctx context.Context, req ctrl.Request
if err := c.client.Status().Update(ctx, comp); err != nil {
return ctrl.Result{}, fmt.Errorf("swapping compisition state: %w", err)
}
logger.Info("swapped composition state because composition was modified since last synthesis")
logger.Info("start to synthesize")
return ctrl.Result{}, nil
}

// No need to create a pod if everything is in sync
if comp.Status.CurrentState != nil && comp.Status.CurrentState.ResourceSlices != nil {
if comp.Status.CurrentState != nil && comp.Status.CurrentState.Synthesized {
return ctrl.Result{}, nil
}

Expand All @@ -103,7 +103,7 @@ func (c *podLifecycleController) Reconcile(ctx context.Context, req ctrl.Request
if err != nil {
return ctrl.Result{}, client.IgnoreAlreadyExists(fmt.Errorf("creating pod: %w", err))
}
logger.Info("created pod", "podName", pod.Name)
logger.Info("created synthesizer pod", "podName", pod.Name)

return ctrl.Result{}, nil
}
Expand All @@ -117,7 +117,7 @@ func (c *podLifecycleController) shouldDeletePod(logger logr.Logger, comp *apiv1
var activeLatest bool
for _, pod := range pods.Items {
pod := pod
if pod.DeletionTimestamp != nil || !podDerivedFrom(comp, syn, &pod) {
if pod.DeletionTimestamp != nil || !podDerivedFrom(comp, &pod) {
continue
}
if activeLatest {
Expand All @@ -131,7 +131,7 @@ func (c *podLifecycleController) shouldDeletePod(logger logr.Logger, comp *apiv1
for _, pod := range pods.Items {
pod := pod
reason, shouldDelete := c.podStatusTerminal(&pod)
isCurrent := podDerivedFrom(comp, syn, &pod)
isCurrent := podDerivedFrom(comp, &pod)

// If the current pod is being deleted it's safe to create a new one if needed
// Avoid getting stuck by pods that fail to delete
Expand Down
10 changes: 3 additions & 7 deletions internal/controllers/synthesis/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,11 @@ func newPod(cfg *Config, scheme *runtime.Scheme, comp *apiv1.Composition, syn *a
return pod
}

func podDerivedFrom(comp *apiv1.Composition, syn *apiv1.Synthesizer, pod *corev1.Pod) bool {
func podDerivedFrom(comp *apiv1.Composition, pod *corev1.Pod) bool {
if pod.Annotations == nil {
return false
}

var (
compGen, _ = strconv.ParseInt(pod.Annotations["eno.azure.io/composition-generation"], 10, 0)
synGen, _ = strconv.ParseInt(pod.Annotations["eno.azure.io/synthesizer-generation"], 10, 0)
)

return compGen == comp.Generation && synGen == syn.Generation
compGen, _ := strconv.ParseInt(pod.Annotations["eno.azure.io/composition-generation"], 10, 0)
return compGen == comp.Generation
}
23 changes: 16 additions & 7 deletions internal/controllers/synthesis/rollout.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func NewRolloutController(mgr ctrl.Manager, cooldownPeriod time.Duration) error
}
return ctrl.NewControllerManagedBy(mgr).
For(&apiv1.Synthesizer{}).
Watches(&apiv1.Composition{}, manager.NewCompositionToSynthesizerHandler(c.client)).
WithLogConstructor(manager.NewLogConstructor(mgr, "rolloutController")).
Complete(c)
}
Expand All @@ -45,11 +46,10 @@ func (c *rolloutController) Reconcile(ctx context.Context, req ctrl.Request) (ct
}
logger = logger.WithValues("synthesizerName", syn.Name, "synthesizerNamespace", syn.Namespace, "synthesizerGeneration", syn.Generation)

if syn.Status.LastRolloutTime != nil {
if syn.Status.LastRolloutTime != nil && syn.Status.CurrentGeneration != syn.Generation {
remainingCooldown := c.cooldown - time.Since(syn.Status.LastRolloutTime.Time)
if remainingCooldown > 0 {
logger.V(1).Info("waiting to roll out a synthesizer change until the cooldown period has passed", "latency", remainingCooldown.Milliseconds())
return ctrl.Result{RequeueAfter: remainingCooldown}, nil
return ctrl.Result{RequeueAfter: remainingCooldown}, nil // not ready to continue rollout yet
}
}

Expand All @@ -62,6 +62,7 @@ func (c *rolloutController) Reconcile(ctx context.Context, req ctrl.Request) (ct
}

// randomize list to avoid always rolling out changes in the same order
// TODO: Consider a more efficient approach here
rand.Shuffle(len(compList.Items), func(i, j int) { compList.Items[i] = compList.Items[j] })

for _, comp := range compList.Items {
Expand All @@ -77,7 +78,6 @@ func (c *rolloutController) Reconcile(ctx context.Context, req ctrl.Request) (ct
if err := c.client.Status().Update(ctx, syn); err != nil {
return ctrl.Result{}, fmt.Errorf("advancing last rollout time: %w", err)
}
logger.V(1).Info("advanced last rollout time")

err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
if err := c.client.Get(ctx, client.ObjectKeyFromObject(&comp), &comp); err != nil {
Expand All @@ -89,18 +89,27 @@ func (c *rolloutController) Reconcile(ctx context.Context, req ctrl.Request) (ct
if err != nil {
return ctrl.Result{}, fmt.Errorf("swapping compisition state: %w", err)
}
logger.Info("synthesizing composition because its synthesizer has changed since last synthesis")
logger.Info("advancing synthesizer rollout process")
return ctrl.Result{RequeueAfter: c.cooldown}, nil
}

// Update the status to reflect the completed rollout
if syn.Status.CurrentGeneration != syn.Generation {
previousTime := syn.Status.LastRolloutTime
now := metav1.Now()
syn.Status.LastRolloutTime = &now
syn.Status.CurrentGeneration = syn.Generation
if err := c.client.Status().Update(ctx, syn); err != nil {
return ctrl.Result{}, fmt.Errorf("updating synthesizer's current generation: %w", err)
}
logger.Info("rollout is complete - updated synthesizer's current generation")
return ctrl.Result{}, nil // TODO: Consider leaving this loop open in case new compositions fell through the cracks earlier

if previousTime != nil {
logger = logger.WithValues("latency", now.Sub(previousTime.Time).Milliseconds())
}
if len(compList.Items) > 0 { // log doesn't make sense if the synthesizer wasn't actually rolled out
logger.Info("finished rolling out latest synthesizer version")
}
return ctrl.Result{}, nil
}

return ctrl.Result{}, nil
Expand Down
Loading
Loading