Skip to content

Commit

Permalink
Housekeeping, logging improvements, race condition fixes (#21)
Browse files Browse the repository at this point in the history
- Clean up log messages to make it easier to follow race conditions
- Send resource version in patches to fix race
- Correct bugs in the synthesizer pod lifecycle to fix race
- Remove explicit ordering from reconciliation test by evaluating
rollout status when compositions change

---------

Co-authored-by: Jordan Olshevski <[email protected]>
  • Loading branch information
jveski and Jordan Olshevski authored Dec 14, 2023
1 parent de7a787 commit 2bfb8dc
Show file tree
Hide file tree
Showing 12 changed files with 127 additions and 85 deletions.
38 changes: 28 additions & 10 deletions internal/controllers/reconciliation/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package reconciliation

import (
"context"
"encoding/json"
"errors"
"fmt"
"time"

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -55,17 +56,17 @@ func New(mgr *reconstitution.Manager, downstream *rest.Config, discoveryRPS floa
func (c *Controller) Name() string { return "reconciliationController" }

func (c *Controller) Reconcile(ctx context.Context, req *reconstitution.Request) (ctrl.Result, error) {
logger := logr.FromContextOrDiscard(ctx)
comp := &apiv1.Composition{}
err := c.client.Get(ctx, req.Composition, comp)
if err != nil {
return ctrl.Result{}, fmt.Errorf("getting composition: %w", err)
}

if comp.Status.CurrentState == nil {
// we don't log here because it would be too noisy
return ctrl.Result{}, nil
return ctrl.Result{}, nil // nothing to do
}
logger := logr.FromContextOrDiscard(ctx).WithValues("synthesizerName", comp.Spec.Synthesizer.Name, "synthesizerGeneration", comp.Status.CurrentState.ObservedSynthesizerGeneration)
ctx = logr.NewContext(ctx, logger)

// Find the current and (optionally) previous desired states in the cache
currentGen := comp.Status.CurrentState.ObservedCompositionGeneration
Expand Down Expand Up @@ -103,7 +104,6 @@ func (c *Controller) Reconcile(ctx context.Context, req *reconstitution.Request)
if err := c.reconcileResource(ctx, prev, resource, current); err != nil {
return ctrl.Result{}, err
}
logger.V(1).Info("sync'd resource")

c.resourceClient.PatchStatusAsync(ctx, &req.Manifest, func(rs *apiv1.ResourceState) bool {
if rs.Reconciled {
Expand Down Expand Up @@ -135,6 +135,7 @@ func (c *Controller) reconcileResource(ctx context.Context, prev, resource *reco
}

// Compute a merge patch
prevRV := current.GetResourceVersion()
patch, patchType, err := c.buildPatch(ctx, prev, resource, current)
if err != nil {
return fmt.Errorf("building patch: %w", err)
Expand All @@ -143,20 +144,20 @@ func (c *Controller) reconcileResource(ctx context.Context, prev, resource *reco
logger.V(1).Info("skipping empty patch")
return nil
}
patch, err = mungePatch(patch, current.GetResourceVersion())
if err != nil {
return fmt.Errorf("adding resource version: %w", err)
}
err = c.upstreamClient.Patch(ctx, current, client.RawPatch(patchType, patch))
if err != nil {
return fmt.Errorf("applying patch: %w", err)
}
logger.V(0).Info("patched resource", "patchType", string(patchType), "resourceVersion", current.GetResourceVersion())
logger.V(0).Info("patched resource", "patchType", string(patchType), "resourceVersion", current.GetResourceVersion(), "previousResourceVersion", prevRV)

return nil
}

func (c *Controller) buildPatch(ctx context.Context, prev, resource *reconstitution.Resource, current *unstructured.Unstructured) ([]byte, types.PatchType, error) {
// We need to remove the creation timestamp since the other versions of the resource we're merging against won't have it.
// It's safe to mutate in this case because resource has already been copied by the cache.
current.SetCreationTimestamp(metav1.NewTime(time.Time{}))

var prevManifest []byte
if prev != nil {
prevManifest = []byte(prev.Manifest)
Expand All @@ -180,3 +181,20 @@ func (c *Controller) buildPatch(ctx context.Context, prev, resource *reconstitut
patch, err := strategicpatch.CreateThreeWayMergePatch(prevManifest, []byte(resource.Manifest), currentJS, patchmeta, true)
return patch, types.StrategicMergePatchType, err
}

// mungePatch rewrites a JSON patch document so that it carries the given
// resource version. The patch is decoded into a generic map, wrapped in an
// unstructured object so the metadata accessors can be used on it, and then
// re-encoded.
//
// patch is the JSON-encoded patch body; rv is the resource version to stamp
// onto the patch metadata. The returned bytes are the re-marshalled patch.
// Any JSON decoding/encoding error or accessor error is returned unchanged.
func mungePatch(patch []byte, rv string) ([]byte, error) {
	var fields map[string]interface{}
	if decodeErr := json.Unmarshal(patch, &fields); decodeErr != nil {
		return nil, decodeErr
	}

	// Wrap the decoded map so metadata can be edited through the standard
	// object accessor; the wrapper shares the same underlying map.
	wrapped := unstructured.Unstructured{Object: fields}
	accessor, accessErr := meta.Accessor(&wrapped)
	if accessErr != nil {
		return nil, accessErr
	}

	accessor.SetResourceVersion(rv)
	// Reset the creation timestamp to its zero value.
	// NOTE(review): presumably this keeps creationTimestamp out of the patch
	// sent to the API server — confirm against the unstructured accessor.
	accessor.SetCreationTimestamp(metav1.Time{})

	return json.Marshal(fields)
}
2 changes: 0 additions & 2 deletions internal/controllers/reconciliation/discoverycache.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,9 @@ func (d *discoveryCache) checkSupportUnlocked(ctx context.Context, gvk schema.Gr

for _, c := range d.current.GetConsumes(gvk, "PATCH") {
if c == string(types.StrategicMergePatchType) {
logger.V(1).Info("using strategic merge")
return model, nil
}
}

logger.V(1).Info("not using strategic merge because it is not supported by the resource")
return nil, nil // doesn't support strategic merge
}
25 changes: 11 additions & 14 deletions internal/controllers/reconciliation/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func TestCRUD(t *testing.T) {
obj, err := test.Get(downstream)
require.NoError(t, err)

updatedObj := test.ApplyExternalUpdate(t, obj)
updatedObj := test.ApplyExternalUpdate(t, obj.DeepCopyObject().(client.Object))
updatedObj = setPhase(updatedObj, "external-update")
if err := downstream.Update(ctx, updatedObj); err != nil {
return err
Expand All @@ -214,9 +214,18 @@ func TestCRUD(t *testing.T) {
}

func (c *crudTestCase) WaitForPhase(t *testing.T, downstream client.Client, phase string) {
var lastRV string
testutil.Eventually(t, func() bool {
obj, err := c.Get(downstream)
return err == nil && getPhase(obj) == phase
currentPhase := getPhase(obj)
currentRV := obj.GetResourceVersion()
if lastRV == "" {
t.Logf("initial resource version %s was observed in phase %s", currentRV, currentPhase)
} else if currentRV != lastRV {
t.Logf("resource version transitioned from %s to %s in phase %s", lastRV, currentRV, currentPhase)
}
lastRV = currentRV
return err == nil && currentPhase == phase
})
}

Expand All @@ -236,18 +245,6 @@ func setImage(t *testing.T, upstream client.Client, syn *apiv1.Synthesizer, comp
return upstream.Update(context.Background(), syn)
})
require.NoError(t, err)

// Also pin the composition to >= this synthesizer version.
// This is necessary to avoid deadlock in cases where incoherent cache causes the composition not to be updated on this tick of the rollout loop.
// It isn't a problem in production because we don't expect serialized behavior from the rollout controller.
err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
if err := upstream.Get(context.Background(), client.ObjectKeyFromObject(comp), comp); err != nil {
return err
}
comp.Spec.Synthesizer.MinGeneration = syn.Generation
return upstream.Update(context.Background(), comp)
})
require.NoError(t, err)
}

func newSliceBuilder(t *testing.T, scheme *runtime.Scheme, test *crudTestCase) func(c *apiv1.Composition, s *apiv1.Synthesizer) []*apiv1.ResourceSlice {
Expand Down
18 changes: 8 additions & 10 deletions internal/controllers/synthesis/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
var minimalTestConfig = &Config{
WrapperImage: "test-wrapper-image",
MaxRestarts: 3,
Timeout: time.Second * 2,
Timeout: time.Second * 6,
}

func TestControllerHappyPath(t *testing.T) {
Expand Down Expand Up @@ -107,12 +107,11 @@ func TestControllerHappyPath(t *testing.T) {
})

// The pod eventually completes and is deleted
// TODO: Why does this fail?
// testutil.Eventually(t, func() bool {
// list := &corev1.PodList{}
// require.NoError(t, cli.List(ctx, list))
// return len(list.Items) == 0
// })
testutil.Eventually(t, func() bool {
list := &corev1.PodList{}
require.NoError(t, cli.List(ctx, list))
return len(list.Items) == 0
})
}

func TestControllerFastCompositionUpdates(t *testing.T) {
Expand Down Expand Up @@ -254,12 +253,12 @@ func TestControllerSwitchingSynthesizers(t *testing.T) {

syn1 := &apiv1.Synthesizer{}
syn1.Name = "test-syn-1"
syn1.Spec.Image = "test-syn-image"
syn1.Spec.Image = "initial-image"
require.NoError(t, cli.Create(ctx, syn1))

syn2 := &apiv1.Synthesizer{}
syn2.Name = "test-syn-2"
syn2.Spec.Image = "initial-image"
syn2.Spec.Image = "updated-image"
require.NoError(t, cli.Create(ctx, syn2))

comp := &apiv1.Composition{}
Expand All @@ -285,7 +284,6 @@ func TestControllerSwitchingSynthesizers(t *testing.T) {
})
require.NoError(t, err)

// TODO: This test is a tiny bit flaky
testutil.Eventually(t, func() bool {
require.NoError(t, cli.Get(ctx, client.ObjectKeyFromObject(comp), comp))
return comp.Status.CurrentState != nil && len(comp.Status.CurrentState.ResourceSlices) == 2
Expand Down
16 changes: 8 additions & 8 deletions internal/controllers/synthesis/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,15 @@ func (c *podLifecycleController) Reconcile(ctx context.Context, req ctrl.Request
if comp.Spec.Synthesizer.Name == "" {
return ctrl.Result{}, nil
}
logger = logger.WithValues("composition", comp.Name, "compositionNamespace", comp.Namespace, "compositionGeneration", comp.Generation)
logger = logger.WithValues("compositionName", comp.Name, "compositionNamespace", comp.Namespace, "compositionGeneration", comp.Generation)

syn := &apiv1.Synthesizer{}
syn.Name = comp.Spec.Synthesizer.Name
err = c.client.Get(ctx, client.ObjectKeyFromObject(syn), syn)
if err != nil {
return ctrl.Result{}, fmt.Errorf("getting synthesizer: %w", err)
}
logger = logger.WithValues("synthesizer", syn.Name, "synthesizerGeneration", syn.Generation)
logger = logger.WithValues("synthesizerName", syn.Name, "synthesizerGeneration", syn.Generation)

// Delete any unnecessary pods
pods := &corev1.PodList{}
Expand All @@ -78,7 +78,7 @@ func (c *podLifecycleController) Reconcile(ctx context.Context, req ctrl.Request
if err := c.client.Delete(ctx, toDelete); err != nil {
return ctrl.Result{}, fmt.Errorf("deleting pod: %w", err)
}
logger.Info("deleted pod", "podName", toDelete.Name)
logger.Info("deleted synthesizer pod", "podName", toDelete.Name)
return ctrl.Result{}, nil
}

Expand All @@ -88,12 +88,12 @@ func (c *podLifecycleController) Reconcile(ctx context.Context, req ctrl.Request
if err := c.client.Status().Update(ctx, comp); err != nil {
return ctrl.Result{}, fmt.Errorf("swapping compisition state: %w", err)
}
logger.Info("swapped composition state because composition was modified since last synthesis")
logger.Info("start to synthesize")
return ctrl.Result{}, nil
}

// No need to create a pod if everything is in sync
if comp.Status.CurrentState != nil && comp.Status.CurrentState.ResourceSlices != nil {
if comp.Status.CurrentState != nil && comp.Status.CurrentState.Synthesized {
return ctrl.Result{}, nil
}

Expand All @@ -103,7 +103,7 @@ func (c *podLifecycleController) Reconcile(ctx context.Context, req ctrl.Request
if err != nil {
return ctrl.Result{}, client.IgnoreAlreadyExists(fmt.Errorf("creating pod: %w", err))
}
logger.Info("created pod", "podName", pod.Name)
logger.Info("created synthesizer pod", "podName", pod.Name)

return ctrl.Result{}, nil
}
Expand All @@ -117,7 +117,7 @@ func (c *podLifecycleController) shouldDeletePod(logger logr.Logger, comp *apiv1
var activeLatest bool
for _, pod := range pods.Items {
pod := pod
if pod.DeletionTimestamp != nil || !podDerivedFrom(comp, syn, &pod) {
if pod.DeletionTimestamp != nil || !podDerivedFrom(comp, &pod) {
continue
}
if activeLatest {
Expand All @@ -131,7 +131,7 @@ func (c *podLifecycleController) shouldDeletePod(logger logr.Logger, comp *apiv1
for _, pod := range pods.Items {
pod := pod
reason, shouldDelete := c.podStatusTerminal(&pod)
isCurrent := podDerivedFrom(comp, syn, &pod)
isCurrent := podDerivedFrom(comp, &pod)

// If the current pod is being deleted it's safe to create a new one if needed
// Avoid getting stuck by pods that fail to delete
Expand Down
10 changes: 3 additions & 7 deletions internal/controllers/synthesis/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,11 @@ func newPod(cfg *Config, scheme *runtime.Scheme, comp *apiv1.Composition, syn *a
return pod
}

func podDerivedFrom(comp *apiv1.Composition, syn *apiv1.Synthesizer, pod *corev1.Pod) bool {
func podDerivedFrom(comp *apiv1.Composition, pod *corev1.Pod) bool {
if pod.Annotations == nil {
return false
}

var (
compGen, _ = strconv.ParseInt(pod.Annotations["eno.azure.io/composition-generation"], 10, 0)
synGen, _ = strconv.ParseInt(pod.Annotations["eno.azure.io/synthesizer-generation"], 10, 0)
)

return compGen == comp.Generation && synGen == syn.Generation
compGen, _ := strconv.ParseInt(pod.Annotations["eno.azure.io/composition-generation"], 10, 0)
return compGen == comp.Generation
}
23 changes: 16 additions & 7 deletions internal/controllers/synthesis/rollout.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func NewRolloutController(mgr ctrl.Manager, cooldownPeriod time.Duration) error
}
return ctrl.NewControllerManagedBy(mgr).
For(&apiv1.Synthesizer{}).
Watches(&apiv1.Composition{}, manager.NewCompositionToSynthesizerHandler(c.client)).
WithLogConstructor(manager.NewLogConstructor(mgr, "rolloutController")).
Complete(c)
}
Expand All @@ -45,11 +46,10 @@ func (c *rolloutController) Reconcile(ctx context.Context, req ctrl.Request) (ct
}
logger = logger.WithValues("synthesizerName", syn.Name, "synthesizerNamespace", syn.Namespace, "synthesizerGeneration", syn.Generation)

if syn.Status.LastRolloutTime != nil {
if syn.Status.LastRolloutTime != nil && syn.Status.CurrentGeneration != syn.Generation {
remainingCooldown := c.cooldown - time.Since(syn.Status.LastRolloutTime.Time)
if remainingCooldown > 0 {
logger.V(1).Info("waiting to roll out a synthesizer change until the cooldown period has passed", "latency", remainingCooldown.Milliseconds())
return ctrl.Result{RequeueAfter: remainingCooldown}, nil
return ctrl.Result{RequeueAfter: remainingCooldown}, nil // not ready to continue rollout yet
}
}

Expand All @@ -62,6 +62,7 @@ func (c *rolloutController) Reconcile(ctx context.Context, req ctrl.Request) (ct
}

// randomize list to avoid always rolling out changes in the same order
// TODO: Consider a more efficient approach here
rand.Shuffle(len(compList.Items), func(i, j int) { compList.Items[i] = compList.Items[j] })

for _, comp := range compList.Items {
Expand All @@ -77,7 +78,6 @@ func (c *rolloutController) Reconcile(ctx context.Context, req ctrl.Request) (ct
if err := c.client.Status().Update(ctx, syn); err != nil {
return ctrl.Result{}, fmt.Errorf("advancing last rollout time: %w", err)
}
logger.V(1).Info("advanced last rollout time")

err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
if err := c.client.Get(ctx, client.ObjectKeyFromObject(&comp), &comp); err != nil {
Expand All @@ -89,18 +89,27 @@ func (c *rolloutController) Reconcile(ctx context.Context, req ctrl.Request) (ct
if err != nil {
return ctrl.Result{}, fmt.Errorf("swapping compisition state: %w", err)
}
logger.Info("synthesizing composition because its synthesizer has changed since last synthesis")
logger.Info("advancing synthesizer rollout process")
return ctrl.Result{RequeueAfter: c.cooldown}, nil
}

// Update the status to reflect the completed rollout
if syn.Status.CurrentGeneration != syn.Generation {
previousTime := syn.Status.LastRolloutTime
now := metav1.Now()
syn.Status.LastRolloutTime = &now
syn.Status.CurrentGeneration = syn.Generation
if err := c.client.Status().Update(ctx, syn); err != nil {
return ctrl.Result{}, fmt.Errorf("updating synthesizer's current generation: %w", err)
}
logger.Info("rollout is complete - updated synthesizer's current generation")
return ctrl.Result{}, nil // TODO: Consider leaving this loop open in case new compositions fell through the cracks earlier

if previousTime != nil {
logger = logger.WithValues("latency", now.Sub(previousTime.Time).Milliseconds())
}
if len(compList.Items) > 0 { // log doesn't make sense if the synthesizer wasn't actually rolled out
logger.Info("finished rolling out latest synthesizer version")
}
return ctrl.Result{}, nil
}

return ctrl.Result{}, nil
Expand Down
Loading

0 comments on commit 2bfb8dc

Please sign in to comment.