diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..00b64247 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "go.testTimeout": "120s" +} \ No newline at end of file diff --git a/internal/controllers/synthesis/integration_test.go b/internal/controllers/synthesis/integration_test.go new file mode 100644 index 00000000..741a57c2 --- /dev/null +++ b/internal/controllers/synthesis/integration_test.go @@ -0,0 +1,294 @@ +package synthesis + +import ( + "fmt" + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/util/retry" + "sigs.k8s.io/controller-runtime/pkg/client" + + apiv1 "github.com/Azure/eno/api/v1" + "github.com/Azure/eno/internal/testutil" +) + +var minimalTestConfig = &Config{ + WrapperImage: "test-wrapper-image", + MaxRestarts: 3, + Timeout: time.Second * 2, +} + +func TestControllerHappyPath(t *testing.T) { + ctx := testutil.NewContext(t) + mgr := testutil.NewManager(t) + testutil.NewPodController(t, mgr.Manager, nil) + cli := mgr.GetClient() + + require.NoError(t, NewPodLifecycleController(mgr.Manager, minimalTestConfig)) + require.NoError(t, NewStatusController(mgr.Manager)) + require.NoError(t, NewRolloutController(mgr.Manager, time.Millisecond*10)) + mgr.Start(t) + + syn := &apiv1.Synthesizer{} + syn.Name = "test-syn" + syn.Spec.Image = "test-syn-image" + require.NoError(t, cli.Create(ctx, syn)) + + comp := &apiv1.Composition{} + comp.Name = "test-comp" + comp.Namespace = "default" + comp.Spec.Synthesizer.Name = syn.Name + require.NoError(t, cli.Create(ctx, comp)) + + t.Run("initial creation", func(t *testing.T) { + // It creates a pod to synthesize the composition + testutil.Eventually(t, func() bool { + list := &corev1.PodList{} + require.NoError(t, cli.List(ctx, list)) + return len(list.Items) > 0 + }) + + // The pod eventually completes and is deleted + testutil.Eventually(t, func() 
bool { + list := &corev1.PodList{} + require.NoError(t, cli.List(ctx, list)) + return len(list.Items) == 0 + }) + + // The pod eventually writes a resource slice count to the status + testutil.Eventually(t, func() bool { + require.NoError(t, cli.Get(ctx, client.ObjectKeyFromObject(comp), comp)) + return comp.Status.CurrentState != nil && comp.Status.CurrentState.ResourceSliceCount != nil + }) + }) + + t.Run("composition update", func(t *testing.T) { + // Updating the composition should cause re-synthesis + err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + require.NoError(t, cli.Get(ctx, client.ObjectKeyFromObject(comp), comp)) + comp.Spec.Inputs = []apiv1.InputRef{{Name: "anything"}} + return cli.Update(ctx, comp) + }) + require.NoError(t, err) + + latest := comp.Generation + testutil.Eventually(t, func() bool { + require.NoError(t, cli.Get(ctx, client.ObjectKeyFromObject(comp), comp)) + return comp.Status.CurrentState != nil && comp.Status.CurrentState.ObservedCompositionGeneration == latest + }) + + // The previous state is retained + if comp.Status.PreviousState == nil { + t.Error("state wasn't swapped to previous") + } else { + assert.Equal(t, comp.Generation-1, comp.Status.PreviousState.ObservedCompositionGeneration) + } + }) + + t.Run("synthesizer update", func(t *testing.T) { + err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + if err := cli.Get(ctx, client.ObjectKeyFromObject(syn), syn); err != nil { + return err + } + syn.Spec.Image = "updated-image" + return cli.Update(ctx, syn) + }) + require.NoError(t, err) + + testutil.Eventually(t, func() bool { + require.NoError(t, cli.Get(ctx, client.ObjectKeyFromObject(comp), comp)) + return comp.Status.CurrentState != nil && comp.Status.CurrentState.ObservedSynthesizerGeneration == syn.Generation + }) + + // The previous state is retained + if comp.Status.PreviousState == nil { + t.Error("state wasn't swapped to previous") + } else { + assert.Equal(t, syn.Generation-1, 
comp.Status.PreviousState.ObservedSynthesizerGeneration) + } + }) + + // The pod eventually completes and is deleted + testutil.Eventually(t, func() bool { + list := &corev1.PodList{} + require.NoError(t, cli.List(ctx, list)) + return len(list.Items) == 0 + }) +} + +func TestControllerFastCompositionUpdates(t *testing.T) { + ctx := testutil.NewContext(t) + mgr := testutil.NewManager(t) + cli := mgr.GetClient() + testutil.NewPodController(t, mgr.Manager, func(c *apiv1.Composition, s *apiv1.Synthesizer) []*apiv1.ResourceSlice { + // simulate real pods taking some random amount of time to generation + time.Sleep(time.Millisecond * time.Duration(rand.Int63n(300))) + return nil + }) + + require.NoError(t, NewPodLifecycleController(mgr.Manager, minimalTestConfig)) + require.NoError(t, NewStatusController(mgr.Manager)) + require.NoError(t, NewRolloutController(mgr.Manager, time.Millisecond*10)) + mgr.Start(t) + + syn := &apiv1.Synthesizer{} + syn.Name = "test-syn" + syn.Spec.Image = "test-syn-image" + require.NoError(t, cli.Create(ctx, syn)) + + comp := &apiv1.Composition{} + comp.Name = "test-comp" + comp.Namespace = "default" + comp.Spec.Synthesizer.Name = syn.Name + require.NoError(t, cli.Create(ctx, comp)) + + // Send a bunch of updates in a row + for i := 0; i < 10; i++ { + err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + err := cli.Get(ctx, client.ObjectKeyFromObject(comp), comp) + if client.IgnoreNotFound(err) != nil { + return err + } + comp.Spec.Inputs = []apiv1.InputRef{{ + Name: fmt.Sprintf("some-unique-value-%d", i), + }} + return cli.Update(ctx, comp) + }) + require.NoError(t, err) + } + + // It should eventually converge even though pods did not terminate in order (due to jitter in testutil.NewPodController) + latest := comp.Generation + testutil.Eventually(t, func() bool { + require.NoError(t, cli.Get(ctx, client.ObjectKeyFromObject(comp), comp)) + return comp.Status.CurrentState != nil && 
comp.Status.CurrentState.ObservedCompositionGeneration == latest + }) +} + +func TestControllerSynthesizerRollout(t *testing.T) { + ctx := testutil.NewContext(t) + mgr := testutil.NewManager(t) + testutil.NewPodController(t, mgr.Manager, nil) + cli := mgr.GetClient() + + require.NoError(t, NewPodLifecycleController(mgr.Manager, minimalTestConfig)) + require.NoError(t, NewStatusController(mgr.Manager)) + require.NoError(t, NewRolloutController(mgr.Manager, time.Hour*24)) // Rollout should not continue during this test + mgr.Start(t) + + syn := &apiv1.Synthesizer{} + syn.Name = "test-syn" + syn.Spec.Image = "test-syn-image" + require.NoError(t, cli.Create(ctx, syn)) + + comp1 := &apiv1.Composition{} + comp1.Name = "test-comp-1" + comp1.Namespace = "default" + comp1.Spec.Synthesizer.Name = syn.Name + require.NoError(t, cli.Create(ctx, comp1)) + + comp2 := &apiv1.Composition{} + comp2.Name = "test-comp-2" + comp2.Namespace = "default" + comp2.Spec.Synthesizer.Name = syn.Name + require.NoError(t, cli.Create(ctx, comp2)) + + // Wait for initial sync + testutil.Eventually(t, func() bool { + require.NoError(t, client.IgnoreNotFound(cli.Get(ctx, client.ObjectKeyFromObject(comp1), comp1))) + require.NoError(t, client.IgnoreNotFound(cli.Get(ctx, client.ObjectKeyFromObject(comp2), comp2))) + inSync1 := comp1.Status.CurrentState != nil && comp1.Status.CurrentState.ObservedSynthesizerGeneration == syn.Generation + inSync2 := comp2.Status.CurrentState != nil && comp2.Status.CurrentState.ObservedSynthesizerGeneration == syn.Generation + return inSync1 && inSync2 + }) + + err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + if err := cli.Get(ctx, client.ObjectKeyFromObject(syn), syn); err != nil { + return err + } + syn.Spec.Image = "updated-image" + return cli.Update(ctx, syn) + }) + require.NoError(t, err) + + // One of the compositions should be updated but not the other because we set a RolloutCooldown of 1hr + assertRolloutPending := func() { + 
testutil.Eventually(t, func() bool { + require.NoError(t, client.IgnoreNotFound(cli.Get(ctx, client.ObjectKeyFromObject(comp1), comp1))) + require.NoError(t, client.IgnoreNotFound(cli.Get(ctx, client.ObjectKeyFromObject(comp2), comp2))) + inSync1 := comp1.Status.CurrentState != nil && comp1.Status.CurrentState.ObservedSynthesizerGeneration == syn.Generation + inSync2 := comp2.Status.CurrentState != nil && comp2.Status.CurrentState.ObservedSynthesizerGeneration == syn.Generation + return (inSync1 && !inSync2) || (!inSync1 && inSync2) + }) + } + + // Make sure the state persists + assertRolloutPending() + time.Sleep(time.Millisecond * 50) + assertRolloutPending() +} + +func TestControllerSwitchingSynthesizers(t *testing.T) { + ctx := testutil.NewContext(t) + mgr := testutil.NewManager(t) + cli := mgr.GetClient() + testutil.NewPodController(t, mgr.Manager, func(c *apiv1.Composition, s *apiv1.Synthesizer) []*apiv1.ResourceSlice { + emptySlice := &apiv1.ResourceSlice{} + emptySlice.GenerateName = "test-" + emptySlice.Namespace = "default" + + // return two slices for the second test synthesizer, we'll assert on that later + if s.Name == "test-syn-2" { + return []*apiv1.ResourceSlice{emptySlice.DeepCopy(), emptySlice.DeepCopy()} + } + return []*apiv1.ResourceSlice{emptySlice.DeepCopy()} + }) + + require.NoError(t, NewPodLifecycleController(mgr.Manager, minimalTestConfig)) + require.NoError(t, NewStatusController(mgr.Manager)) + require.NoError(t, NewRolloutController(mgr.Manager, time.Millisecond*10)) + mgr.Start(t) + + syn1 := &apiv1.Synthesizer{} + syn1.Name = "test-syn-1" + syn1.Spec.Image = "test-syn-image" + require.NoError(t, cli.Create(ctx, syn1)) + + syn2 := &apiv1.Synthesizer{} + syn2.Name = "test-syn-2" + syn2.Spec.Image = "initial-image" + require.NoError(t, cli.Create(ctx, syn2)) + + comp := &apiv1.Composition{} + comp.Name = "test-comp" + comp.Namespace = "default" + comp.Spec.Synthesizer.Name = syn1.Name + require.NoError(t, cli.Create(ctx, comp)) + + 
t.Run("initial creation", func(t *testing.T) { + testutil.Eventually(t, func() bool { + require.NoError(t, client.IgnoreNotFound(cli.Get(ctx, client.ObjectKeyFromObject(comp), comp))) + return comp.Status.CurrentState != nil && comp.Status.CurrentState.ResourceSliceCount != nil && *comp.Status.CurrentState.ResourceSliceCount == 1 + }) + }) + + t.Run("update synthesizer name", func(t *testing.T) { + err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + if err := cli.Get(ctx, client.ObjectKeyFromObject(comp), comp); err != nil { + return err + } + comp.Spec.Synthesizer.Name = syn2.Name + return cli.Update(ctx, comp) + }) + require.NoError(t, err) + + testutil.Eventually(t, func() bool { + require.NoError(t, cli.Get(ctx, client.ObjectKeyFromObject(comp), comp)) + return comp.Status.CurrentState != nil && comp.Status.CurrentState.ResourceSliceCount != nil && *comp.Status.CurrentState.ResourceSliceCount == 2 + }) + }) +} diff --git a/internal/controllers/synthesis/lifecycle.go b/internal/controllers/synthesis/lifecycle.go new file mode 100644 index 00000000..9d64a22a --- /dev/null +++ b/internal/controllers/synthesis/lifecycle.go @@ -0,0 +1,198 @@ +package synthesis + +import ( + "context" + "fmt" + "time" + + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + apiv1 "github.com/Azure/eno/api/v1" + "github.com/Azure/eno/internal/manager" +) + +type Config struct { + WrapperImage string + JobSA string + MaxRestarts int32 + Timeout time.Duration +} + +type podLifecycleController struct { + config *Config + client client.Client +} + +// NewPodLifecycleController is responsible for creating and deleting pods as needed to synthesize compositions. +func NewPodLifecycleController(mgr ctrl.Manager, cfg *Config) error { + c := &podLifecycleController{ + config: cfg, + client: mgr.GetClient(), + } + return ctrl.NewControllerManagedBy(mgr). + For(&apiv1.Composition{}). 
+ Owns(&corev1.Pod{}). + WithLogConstructor(manager.NewLogConstructor(mgr, "podLifecycleController")). + Complete(c) +} + +func (c *podLifecycleController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := logr.FromContextOrDiscard(ctx) + + comp := &apiv1.Composition{} + err := c.client.Get(ctx, req.NamespacedName, comp) + if err != nil { + return ctrl.Result{}, client.IgnoreNotFound(fmt.Errorf("getting composition resource: %w", err)) + } + if comp.Spec.Synthesizer.Name == "" { + return ctrl.Result{}, nil + } + logger = logger.WithValues("composition", comp.Name, "compositionNamespace", comp.Namespace, "compositionGeneration", comp.Generation) + + syn := &apiv1.Synthesizer{} + syn.Name = comp.Spec.Synthesizer.Name + err = c.client.Get(ctx, client.ObjectKeyFromObject(syn), syn) + if err != nil { + return ctrl.Result{}, fmt.Errorf("getting synthesizer: %w", err) + } + logger = logger.WithValues("synthesizer", syn.Name, "synthesizerGeneration", syn.Generation) + + // Delete any unnecessary pods + pods := &corev1.PodList{} + err = c.client.List(ctx, pods, client.MatchingFields{ + manager.IdxPodsByComposition: comp.Name, + }) + if err != nil { + return ctrl.Result{}, fmt.Errorf("listing pods: %w", err) + } + logger, toDelete, ok := c.shouldDeletePod(logger, comp, syn, pods) + if !ok && toDelete == nil { + // The pod is still running. + // Poll periodically to check if has timed out. 
+ return ctrl.Result{RequeueAfter: c.config.Timeout}, nil + } + if !ok && toDelete != nil { + if err := c.client.Delete(ctx, toDelete); err != nil { + return ctrl.Result{}, fmt.Errorf("deleting pod: %w", err) + } + logger.Info("deleted pod", "podName", toDelete.Name) + return ctrl.Result{}, nil + } + + // Swap the state to prepare for resynthesis if needed + if comp.Status.CurrentState == nil || comp.Status.CurrentState.ObservedCompositionGeneration != comp.Generation { + swapStates(syn, comp) + if err := c.client.Status().Update(ctx, comp); err != nil { + return ctrl.Result{}, fmt.Errorf("swapping compisition state: %w", err) + } + logger.Info("swapped composition state because composition was modified since last synthesis") + return ctrl.Result{}, nil + } + + // No need to create a pod if everything is in sync + if comp.Status.CurrentState != nil && comp.Status.CurrentState.ResourceSliceCount != nil { + logger.V(1).Info("synthesis is complete - skipping creation") + return ctrl.Result{}, nil + } + + // If we made it this far it's safe to create a pod + pod := newPod(c.config, c.client.Scheme(), comp, syn) + err = c.client.Create(ctx, pod) + if err != nil { + return ctrl.Result{}, client.IgnoreAlreadyExists(fmt.Errorf("creating pod: %w", err)) + } + logger.Info("created pod", "podName", pod.Name) + + return ctrl.Result{}, nil +} + +func (c *podLifecycleController) shouldDeletePod(logger logr.Logger, comp *apiv1.Composition, syn *apiv1.Synthesizer, pods *corev1.PodList) (logr.Logger, *corev1.Pod, bool) { + if len(pods.Items) == 0 { + return logger, nil, true + } + + // Just in case we somehow created more than one pod (stale informer cache, etc.) 
- delete duplicates + var activeLatest bool + for _, pod := range pods.Items { + pod := pod + if pod.DeletionTimestamp != nil || !podDerivedFrom(comp, syn, &pod) { + continue + } + if activeLatest { + logger = logger.WithValues("reason", "Duplicate") + return logger, &pod, false + } + activeLatest = true + } + + // Only create pods when the previous one is deleting or non-existant + for _, pod := range pods.Items { + pod := pod + reason, shouldDelete := c.podStatusTerminal(&pod) + isCurrent := podDerivedFrom(comp, syn, &pod) + + // If the current pod is being deleted it's safe to create a new one if needed + // Avoid getting stuck by pods that fail to delete + if pod.DeletionTimestamp != nil && isCurrent { + return logger, nil, true + } + + // Pod exists but still has work to do + if isCurrent && !shouldDelete { + continue + } + + // Don't delete pods again + if pod.DeletionTimestamp != nil { + continue // already deleted + } + + if isCurrent && comp.Status.CurrentState != nil && comp.Status.CurrentState.PodCreation != nil { + logger = logger.WithValues("latency", time.Since(comp.Status.CurrentState.PodCreation.Time).Milliseconds()) + } + if shouldDelete { + logger = logger.WithValues("reason", reason) + } + if !isCurrent { + logger = logger.WithValues("reason", "Superseded") + } + return logger, &pod, false + } + return logger, nil, false +} + +func (c *podLifecycleController) podStatusTerminal(pod *corev1.Pod) (string, bool) { + if time.Since(pod.CreationTimestamp.Time) > c.config.Timeout { + return "Timeout", true + } + for _, cont := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) 
{ + if cont.RestartCount > c.config.MaxRestarts { + return "MaxRestartsExceeded", true + } + if cont.State.Terminated != nil && cont.State.Terminated.ExitCode == 0 { + return "Succeeded", true + } + if cont.State.Terminated == nil || cont.State.Terminated.ExitCode != 0 { + return "", false // has not completed yet + } + } + if len(pod.Status.ContainerStatuses) > 0 { + return "Unknown", true // shouldn't be possible + } + return "", false // status not initialized yet +} + +func swapStates(syn *apiv1.Synthesizer, comp *apiv1.Composition) { + // Only swap current->previous when the current synthesis has completed + // This avoids losing the prior state during rapid updates to the composition + resourceSliceCountSet := comp.Status.CurrentState != nil && comp.Status.CurrentState.ResourceSliceCount != nil + if resourceSliceCountSet { + comp.Status.PreviousState = comp.Status.CurrentState + } + comp.Status.CurrentState = &apiv1.Synthesis{ + ObservedCompositionGeneration: comp.Generation, + } +} diff --git a/internal/controllers/synthesis/pod.go b/internal/controllers/synthesis/pod.go new file mode 100644 index 00000000..c5d16e81 --- /dev/null +++ b/internal/controllers/synthesis/pod.go @@ -0,0 +1,106 @@ +package synthesis + +import ( + "fmt" + "strconv" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + apiv1 "github.com/Azure/eno/api/v1" +) + +func newPod(cfg *Config, scheme *runtime.Scheme, comp *apiv1.Composition, syn *apiv1.Synthesizer) *corev1.Pod { + const wrapperVolumeName = "wrapper" + + pod := &corev1.Pod{} + pod.GenerateName = "synthesis-" + pod.Namespace = comp.Namespace + pod.Finalizers = []string{"eno.azure.io/cleanup"} + pod.Labels = map[string]string{"app.kubernetes.io/managed-by": "eno"} + pod.Annotations = map[string]string{ + "eno.azure.io/composition-generation": strconv.FormatInt(comp.Generation, 10), + "eno.azure.io/synthesizer-generation": 
strconv.FormatInt(syn.Generation, 10), + } + if err := controllerutil.SetControllerReference(comp, pod, scheme); err != nil { + panic(fmt.Sprintf("unable to set owner reference: %s", err)) + } + + userID := int64(1000) + yes := true + pod.Spec = corev1.PodSpec{ + RestartPolicy: corev1.RestartPolicyOnFailure, + InitContainers: []corev1.Container{{ + Name: "setup", + Image: cfg.WrapperImage, + Command: []string{ + "/eno-wrapper", "--install=/wrapper/eno-wrapper", + }, + VolumeMounts: []corev1.VolumeMount{{ + Name: wrapperVolumeName, + MountPath: "/wrapper", + }}, + }}, + Containers: []corev1.Container{{ + Name: "synthesizer", + Image: syn.Spec.Image, + Command: []string{ + "/wrapper/eno-wrapper", "--generate", + }, + SecurityContext: &corev1.SecurityContext{ + Capabilities: &corev1.Capabilities{ + Drop: []corev1.Capability{"ALL"}, + }, + RunAsUser: &userID, + RunAsNonRoot: &yes, + }, + VolumeMounts: []corev1.VolumeMount{{ + Name: wrapperVolumeName, + MountPath: "/wrapper", + ReadOnly: true, + }}, + Env: []corev1.EnvVar{ + { + Name: "COMPOSITION_NAME", + Value: comp.Name, + }, + { + Name: "COMPOSITION_NAMESPACE", + Value: comp.Namespace, + }, + { + Name: "COMPOSITION_GENERATION", + Value: strconv.FormatInt(comp.Generation, 10), + }, + { + Name: "SYNTHESIZER_GENERATION", + Value: strconv.FormatInt(syn.Generation, 10), + }, + }, + }}, + Volumes: []corev1.Volume{{ + Name: wrapperVolumeName, + VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}, + }}, + } + + if cfg.JobSA != "" { + pod.Spec.ServiceAccountName = cfg.JobSA + } + + return pod +} + +func podDerivedFrom(comp *apiv1.Composition, syn *apiv1.Synthesizer, pod *corev1.Pod) bool { + if pod.Annotations == nil { + return false + } + + var ( + compGen, _ = strconv.ParseInt(pod.Annotations["eno.azure.io/composition-generation"], 10, 0) + synGen, _ = strconv.ParseInt(pod.Annotations["eno.azure.io/synthesizer-generation"], 10, 0) + ) + + return compGen == comp.Generation && synGen == syn.Generation 
+} diff --git a/internal/controllers/synthesis/rollout.go b/internal/controllers/synthesis/rollout.go new file mode 100644 index 00000000..be995177 --- /dev/null +++ b/internal/controllers/synthesis/rollout.go @@ -0,0 +1,104 @@ +package synthesis + +import ( + "context" + "fmt" + "math/rand" + "time" + + "github.com/go-logr/logr" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/util/retry" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + apiv1 "github.com/Azure/eno/api/v1" + "github.com/Azure/eno/internal/manager" +) + +type rolloutController struct { + client client.Client + cooldown time.Duration +} + +// NewRolloutController re-synthesizes compositions when their synthesizer has changed while honoring a cooldown period. +func NewRolloutController(mgr ctrl.Manager, cooldownPeriod time.Duration) error { + c := &rolloutController{ + client: mgr.GetClient(), + cooldown: cooldownPeriod, + } + return ctrl.NewControllerManagedBy(mgr). + For(&apiv1.Synthesizer{}). + WithLogConstructor(manager.NewLogConstructor(mgr, "rolloutController")). 
+ Complete(c) +} + +func (c *rolloutController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := logr.FromContextOrDiscard(ctx) + + syn := &apiv1.Synthesizer{} + err := c.client.Get(ctx, req.NamespacedName, syn) + if err != nil { + return ctrl.Result{}, client.IgnoreNotFound(fmt.Errorf("getting synthesizer: %w", err)) + } + + if syn.Status.LastRolloutTime != nil { + remainingCooldown := c.cooldown - time.Since(syn.Status.LastRolloutTime.Time) + if remainingCooldown > 0 { + logger.V(1).Info("waiting to roll out a synthesizer change until the cooldown period has passed", "latency", remainingCooldown.Milliseconds()) + return ctrl.Result{RequeueAfter: remainingCooldown}, nil + } + } + + compList := &apiv1.CompositionList{} + err = c.client.List(ctx, compList, client.MatchingFields{ + manager.IdxCompositionsBySynthesizer: syn.Name, + }) + if err != nil { + return ctrl.Result{}, fmt.Errorf("listing compositions: %w", err) + } + + // randomize list to avoid always rolling out changes in the same order + rand.Shuffle(len(compList.Items), func(i, j int) { compList.Items[i], compList.Items[j] = compList.Items[j], compList.Items[i] }) + + for _, comp := range compList.Items { + comp := comp + logger := logger.WithValues("compositionName", comp.Name, "compositionNamespace", comp.Namespace, "compositionGeneration", comp.Generation) + + if comp.Spec.Synthesizer.MinGeneration >= syn.Generation { + continue + } + + now := metav1.Now() + syn.Status.LastRolloutTime = &now + if err := c.client.Status().Update(ctx, syn); err != nil { + return ctrl.Result{}, fmt.Errorf("advancing last rollout time: %w", err) + } + logger.V(1).Info("advanced last rollout time") + + err = retry.RetryOnConflict(retry.DefaultBackoff, func() error { + if err := c.client.Get(ctx, client.ObjectKeyFromObject(&comp), &comp); err != nil { + return err + } + comp.Spec.Synthesizer.MinGeneration = syn.Generation + return c.client.Update(ctx, &comp) + }) + if err != nil { + return ctrl.Result{}, fmt.Errorf("swapping 
composition state: %w", err) + } + logger.Info("synthesizing composition because its synthesizer has changed since last synthesis") + return ctrl.Result{RequeueAfter: c.cooldown}, nil + } + + // Update the status to reflect the completed rollout + if syn.Status.CurrentGeneration != syn.Generation { + syn.Status.CurrentGeneration = syn.Generation + if err := c.client.Status().Update(ctx, syn); err != nil { + return ctrl.Result{}, fmt.Errorf("updating synthesizer's current generation: %w", err) + } + logger.Info("rollout is complete - updated synthesizer's current generation") + return ctrl.Result{}, nil + } + + return ctrl.Result{}, nil +} diff --git a/internal/controllers/synthesis/status.go b/internal/controllers/synthesis/status.go new file mode 100644 index 00000000..d58c3f99 --- /dev/null +++ b/internal/controllers/synthesis/status.go @@ -0,0 +1,101 @@ +package synthesis + +import ( + "context" + "fmt" + "strconv" + + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + apiv1 "github.com/Azure/eno/api/v1" + "github.com/Azure/eno/internal/manager" +) + +type statusController struct { + client client.Client +} + +// NewStatusController updates composition statuses as pods transition through states. +func NewStatusController(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&corev1.Pod{}). + WithLogConstructor(manager.NewLogConstructor(mgr, "statusController")). 
+ Complete(&statusController{ + client: mgr.GetClient(), + }) +} + +func (c *statusController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := logr.FromContextOrDiscard(ctx) + + pod := &corev1.Pod{} + err := c.client.Get(ctx, req.NamespacedName, pod) + if err != nil { + return ctrl.Result{}, client.IgnoreNotFound(fmt.Errorf("gettting pod: %w", err)) + } + if len(pod.OwnerReferences) == 0 || pod.OwnerReferences[0].Kind != "Composition" { + // This shouldn't be common as the informer watch filters on Eno-managed pods using a selector + logger.V(1).Info("skipping pod because it isn't owned by a composition") + return ctrl.Result{}, nil + } + if pod.Annotations == nil { + return ctrl.Result{}, nil + } + + comp := &apiv1.Composition{} + comp.Name = pod.OwnerReferences[0].Name + comp.Namespace = pod.Namespace + err = c.client.Get(ctx, client.ObjectKeyFromObject(comp), comp) + if err != nil { + return ctrl.Result{}, client.IgnoreNotFound(fmt.Errorf("getting composition resource: %w", err)) + } + if comp.Spec.Synthesizer.Name == "" { + return ctrl.Result{}, nil + } + logger = logger.WithValues("composition", comp.Name, "compositionNamespace", comp.Namespace, "compositionGeneration", comp.Generation) + + syn := &apiv1.Synthesizer{} + syn.Name = comp.Spec.Synthesizer.Name + err = c.client.Get(ctx, client.ObjectKeyFromObject(syn), syn) + if err != nil { + return ctrl.Result{}, fmt.Errorf("getting synthesizer: %w", err) + } + logger = logger.WithValues("synthesizer", syn.Name, "synthesizerGeneration", syn.Generation) + + // Update composition status + var ( + compGen, _ = strconv.ParseInt(pod.Annotations["eno.azure.io/composition-generation"], 10, 0) + synGen, _ = strconv.ParseInt(pod.Annotations["eno.azure.io/synthesizer-generation"], 10, 0) + ) + if statusIsOutOfSync(comp, compGen, synGen) { + comp.Status.CurrentState.PodCreation = &pod.CreationTimestamp + comp.Status.CurrentState.ObservedSynthesizerGeneration = synGen + + if err := 
c.client.Status().Update(ctx, comp); err != nil { + return ctrl.Result{}, fmt.Errorf("updating composition status: %w", err) + } + logger.Info("populated synthesis status to reflect pod") + return ctrl.Result{}, nil + } + + // Remove the finalizer + if controllerutil.RemoveFinalizer(pod, "eno.azure.io/cleanup") { + if err := c.client.Update(ctx, pod); err != nil { + return ctrl.Result{}, fmt.Errorf("removing pod finalizer: %w", err) + } + logger.Info("removed pod finalizer") + return ctrl.Result{}, nil + } + + return ctrl.Result{}, nil +} + +func statusIsOutOfSync(comp *apiv1.Composition, podCompGen, podSynGen int64) bool { + // TODO: Unit tests, make sure to cover the pod creation latching logic + return (comp.Status.CurrentState != nil && comp.Status.CurrentState.ObservedCompositionGeneration == podCompGen) && + (comp.Status.CurrentState.PodCreation == nil || comp.Status.CurrentState.ObservedSynthesizerGeneration != podSynGen) +} diff --git a/internal/manager/manager.go b/internal/manager/manager.go index 0b4ce532..7c8294c5 100644 --- a/internal/manager/manager.go +++ b/internal/manager/manager.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/rest" ctrl "sigs.k8s.io/controller-runtime" @@ -18,6 +19,8 @@ import ( const ( IdxSlicesByCompositionGeneration = ".metadata.ownerReferences.compositionGen" // see: NewSlicesByCompositionGenerationKey + IdxPodsByComposition = ".metadata.ownerReferences.composition" + IdxCompositionsBySynthesizer = ".spec.synthesizer" ) type Options struct { @@ -55,6 +58,26 @@ func New(logger logr.Logger, opts *Options) (ctrl.Manager, error) { return nil, err } + err = mgr.GetFieldIndexer().IndexField(context.Background(), &corev1.Pod{}, IdxPodsByComposition, func(o client.Object) []string { + pod := o.(*corev1.Pod) + owner := metav1.GetControllerOf(pod) + if owner == nil || owner.Kind != "Composition" { + return nil + } + return 
[]string{owner.Name} + }) + if err != nil { + return nil, err + } + + err = mgr.GetFieldIndexer().IndexField(context.Background(), &apiv1.Composition{}, IdxCompositionsBySynthesizer, func(o client.Object) []string { + comp := o.(*apiv1.Composition) + return []string{comp.Spec.Synthesizer.Name} + }) + if err != nil { + return nil, err + } + return mgr, nil } diff --git a/internal/testutil/testutil.go b/internal/testutil/testutil.go index 681b0cef..3b5e7c6a 100644 --- a/internal/testutil/testutil.go +++ b/internal/testutil/testutil.go @@ -17,7 +17,9 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/client/interceptor" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/envtest" + "sigs.k8s.io/controller-runtime/pkg/reconcile" apiv1 "github.com/Azure/eno/api/v1" "github.com/Azure/eno/internal/manager" @@ -109,3 +111,91 @@ func Eventually(t testing.TB, fn func() bool) { time.Sleep(time.Millisecond * 10) } } + +// NewPodController adds a controller to the manager that simulates the behavior of a synthesis pod. +// Useful for integration testing without kcm/kubelet. Slices returned from the given function will +// be associated with the composition by this function. 
+func NewPodController(t testing.TB, mgr ctrl.Manager, fn func(*apiv1.Composition, *apiv1.Synthesizer) []*apiv1.ResourceSlice) { + cli := mgr.GetClient() + podCtrl := reconcile.Func(func(ctx context.Context, r reconcile.Request) (reconcile.Result, error) { + comp := &apiv1.Composition{} + err := cli.Get(ctx, r.NamespacedName, comp) + if err != nil { + return reconcile.Result{}, err + } + if comp.Status.CurrentState == nil { + return reconcile.Result{}, nil // wait for controller to write initial status + } + + syn := &apiv1.Synthesizer{} + syn.Name = comp.Spec.Synthesizer.Name + err = cli.Get(ctx, client.ObjectKeyFromObject(syn), syn) + if err != nil { + return reconcile.Result{}, err + } + + var slices []*apiv1.ResourceSlice + if fn != nil { + slices = fn(comp, syn) + for _, slice := range slices { + cp := slice.DeepCopy() + cp.Spec.CompositionGeneration = comp.Generation + if err := controllerutil.SetControllerReference(comp, cp, cli.Scheme()); err != nil { + return reconcile.Result{}, err + } + if err := cli.Create(ctx, cp); err != nil { + return reconcile.Result{}, err + } + } + } + + pods := &corev1.PodList{} + err = cli.List(ctx, pods, client.MatchingFields{ + manager.IdxPodsByComposition: comp.Name, + }) + if err != nil { + return reconcile.Result{}, err + } + if len(pods.Items) == 0 { + return reconcile.Result{}, nil // no pods yet + } + + // Add resource slice count - the wrapper will do this in the real world + pod := pods.Items[0] + if comp.Status.CurrentState.ResourceSliceCount == nil { + count := int64(len(slices)) + comp.Status.CurrentState.ResourceSliceCount = &count + err = cli.Status().Update(ctx, comp) + if err != nil { + return reconcile.Result{}, err + } + t.Logf("updated resource slice count for %s", pod.Name) + return reconcile.Result{}, nil + } + + // Mark the pod as terminated to signal that synthesis is complete + if len(pod.Status.ContainerStatuses) == 0 { + pod.Status.ContainerStatuses = []corev1.ContainerStatus{{ + State: 
corev1.ContainerState{ + Terminated: &corev1.ContainerStateTerminated{ + ExitCode: 0, + }, + }, + }} + err = cli.Status().Update(ctx, &pod) + if err != nil { + return reconcile.Result{}, err + } + t.Logf("updated container status for %s", pod.Name) + return reconcile.Result{}, nil + } + + return reconcile.Result{}, nil + }) + + _, err := ctrl.NewControllerManagedBy(mgr). + For(&apiv1.Composition{}). + Owns(&corev1.Pod{}). + Build(podCtrl) + require.NoError(t, err) +}