diff --git a/.github/workflows/unit.yaml b/.github/workflows/unit.yaml index d13dfcf5..2d514c6c 100644 --- a/.github/workflows/unit.yaml +++ b/.github/workflows/unit.yaml @@ -11,5 +11,5 @@ jobs: with: go-version: '1.21' - name: Test - run: go test -v ./... + run: KUBEBUILDER_ASSETS=$(go run sigs.k8s.io/controller-runtime/tools/setup-envtest@latest use -p path 1.28.x) go test -v ./... diff --git a/internal/reconstitution/reconstituter.go b/internal/reconstitution/reconstituter.go new file mode 100644 index 00000000..e174b332 --- /dev/null +++ b/internal/reconstitution/reconstituter.go @@ -0,0 +1,133 @@ +package reconstitution + +import ( + "context" + "fmt" + "sync/atomic" + + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/workqueue" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + apiv1 "github.com/Azure/eno/api/v1" + "github.com/go-logr/logr" +) + +const indexName = ".metadata.owner" + +// reconstituter reconstitutes individual resources out of resource slices. +// Similar to an informer but with extra logic to handle expanding the slice resources. +type reconstituter struct { + *cache // embedded because caching is logically part of the reconstituter's functionality + client client.Client + queues []workqueue.Interface + started atomic.Bool +} + +func newReconstituter(mgr ctrl.Manager) (*reconstituter, error) { + // Index resource slices by the specific synthesis they originate from + err := mgr.GetFieldIndexer().IndexField(context.Background(), &apiv1.ResourceSlice{}, indexName, func(o client.Object) []string { + slice := o.(*apiv1.ResourceSlice) + owner := metav1.GetControllerOf(slice) + if owner == nil || owner.Kind != "Composition" { + return nil + } + // keys will not collide because k8s doesn't allow slashes in names + return []string{fmt.Sprintf("%s/%d", owner.Name, slice.Spec.CompositionGeneration)} + }) + if err != nil { + return nil, err + } + + r := &reconstituter{ + cache: newCache(mgr.GetClient()), + client: mgr.GetClient(), + } + _, err = ctrl.NewControllerManagedBy(mgr). + Named("reconstituter"). + For(&apiv1.Composition{}). + Owns(&apiv1.ResourceSlice{}). + Build(r) + return r, err +} + +func (r *reconstituter) AddQueue(queue workqueue.Interface) { + if r.started.Load() { + panic("AddQueue must be called before any resources are reconciled") + } + r.queues = append(r.queues, queue) +} + +func (r *reconstituter) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + r.started.Store(true) + logger := logr.FromContextOrDiscard(ctx) + logger.V(1).WithValues("composition", req).Info("caching composition") + + comp := &apiv1.Composition{} + err := r.client.Get(ctx, req.NamespacedName, comp) + if k8serrors.IsNotFound(err) { + r.cache.Purge(ctx, req.NamespacedName, nil) + return ctrl.Result{}, nil + } + if err != nil { + return ctrl.Result{}, fmt.Errorf("getting resource: %w", err) + } + + // We populate the cache with both the previous and current syntheses + err = r.populateCache(ctx, comp, comp.Status.PreviousState) + if err != nil { + return ctrl.Result{}, fmt.Errorf("processing previous state: %w", err) + } + err = r.populateCache(ctx, comp, comp.Status.CurrentState) + if err != nil { + return ctrl.Result{}, fmt.Errorf("processing current state: %w", err) + } + + r.cache.Purge(ctx, req.NamespacedName, comp) + return ctrl.Result{}, nil +} + +func (r *reconstituter) populateCache(ctx context.Context, comp *apiv1.Composition, synthesis *apiv1.Synthesis) error { + logger := logr.FromContextOrDiscard(ctx) + + if synthesis == nil { + return nil + } + compNSN := types.NamespacedName{Namespace: comp.Namespace, Name: comp.Name} + + logger = logger.WithValues("synthesisGen", synthesis.ObservedGeneration) + ctx = logr.NewContext(ctx, logger) + if r.cache.HasSynthesis(ctx, compNSN, synthesis) { + logger.V(1).Info("this synthesis has already been cached") + return nil + } + + slices := &apiv1.ResourceSliceList{} + err := r.client.List(ctx, slices, client.InNamespace(comp.Namespace), client.MatchingFields{ + indexName: fmt.Sprintf("%s/%d", comp.Name, synthesis.ObservedGeneration), + }) + if err != nil { + return fmt.Errorf("listing resource slices: %w", err) + } + + logger.V(1).Info(fmt.Sprintf("found %d slices for this synthesis", len(slices.Items))) + if int64(len(slices.Items)) != synthesis.ResourceSliceCount { + logger.V(1).Info("stale informer - waiting for sync") + return nil + } + + reqs, err := r.cache.Fill(ctx, compNSN, synthesis, slices.Items) + if err != nil { + return err + } + for _, req := range reqs { + for _, queue := range r.queues { + queue.Add(req) + } + } + + return nil +} diff --git a/internal/reconstitution/reconstituter_test.go b/internal/reconstitution/reconstituter_test.go new file mode 100644 index 00000000..f026b982 --- /dev/null +++ b/internal/reconstitution/reconstituter_test.go @@ -0,0 +1,73 @@ +package reconstitution + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + apiv1 "github.com/Azure/eno/api/v1" + "github.com/Azure/eno/internal/testutil" +) + +func TestReconstituterIntegration(t *testing.T) { + ctx := testutil.NewContext(t) + mgr := testutil.NewManager(t) + client := mgr.GetClient() + + r, err := newReconstituter(mgr.Manager) + require.NoError(t, err) + queue := workqueue.New() + r.AddQueue(queue) + mgr.Start(t) + + // Create one composition that has one synthesis of a single resource + comp := &apiv1.Composition{} + comp.Name = "test-composition" + comp.Namespace = "default" + require.NoError(t, client.Create(ctx, comp)) + + comp.Status.CurrentState = &apiv1.Synthesis{ + ObservedGeneration: comp.Generation, + ResourceSliceCount: 1, + } + require.NoError(t, client.Status().Update(ctx, comp)) + + slice := &apiv1.ResourceSlice{} + slice.Name = "test-slice" + slice.Namespace = "default" + slice.Spec.CompositionGeneration = comp.Generation + slice.Spec.Resources = []apiv1.Manifest{{ + Manifest: `{"kind":"baz","apiVersion":"any","metadata":{"name":"foo","namespace":"bar"}}`, + }} + require.NoError(t, controllerutil.SetControllerReference(comp, slice, mgr.GetScheme())) + require.NoError(t, client.Create(ctx, slice)) + + // Prove the resource was cached + ref := &ResourceRef{ + Composition: types.NamespacedName{ + Name: comp.Name, + Namespace: comp.Namespace, + }, + Name: "foo", + Namespace: "bar", + Kind: "baz", + } + testutil.Eventually(t, func() bool { + _, exists := r.Get(ctx, ref, comp.Generation) + return exists + }) + + // Remove the composition and confirm cache is purged + require.NoError(t, client.Delete(ctx, comp)) + testutil.Eventually(t, func() bool { + _, exists := r.Get(ctx, ref, comp.Generation) + return !exists + }) + + // The queue should have been populated + assert.Equal(t, 1, queue.Len()) +} diff --git a/internal/testutil/testutil.go b/internal/testutil/testutil.go index bd2ad248..baada427 100644 --- a/internal/testutil/testutil.go +++ b/internal/testutil/testutil.go @@ -2,16 +2,22 @@ package testutil import ( "context" + "fmt" + "path/filepath" "testing" + "time" "github.com/go-logr/logr" "github.com/go-logr/logr/testr" "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/client/interceptor" + "sigs.k8s.io/controller-runtime/pkg/envtest" + "sigs.k8s.io/controller-runtime/pkg/manager" apiv1 "github.com/Azure/eno/api/v1" ) @@ -43,3 +49,60 @@ func NewContext(t *testing.T) context.Context { }) return logr.NewContext(ctx, testr.NewWithOptions(t, testr.Options{Verbosity: 99})) } + +func NewManager(t *testing.T) *Manager { + env := &envtest.Environment{ + CRDDirectoryPaths: []string{filepath.Join("..", "..", "api", "v1", "config", "crd")}, + } + t.Cleanup(func() { + err := env.Stop() + if err != nil { + panic(err) + } + }) + + cfg, err := env.Start() + require.NoError(t, err) + + mgr, err := ctrl.NewManager(cfg, manager.Options{ + Logger: testr.New(t), + BaseContext: func() context.Context { return NewContext(t) }, + }) + require.NoError(t, err) + + err = apiv1.SchemeBuilder.AddToScheme(mgr.GetScheme()) + require.NoError(t, err) + + return &Manager{ + Manager: mgr, + } +} + +type Manager struct { + ctrl.Manager +} + +func (m *Manager) Start(t *testing.T) { + go func() { + err := m.Manager.Start(NewContext(t)) + if err != nil { + // can't t.Fail here since we're in a different goroutine + panic(fmt.Sprintf("error while starting manager: %s", err)) + } + }() +} + +func Eventually(t testing.TB, fn func() bool) { + t.Helper() + start := time.Now() + for { + if time.Since(start) > time.Second*2 { + t.Fatalf("timeout while waiting for condition") + return + } + if fn() { + return + } + time.Sleep(time.Millisecond * 10) + } +}