Skip to content

Commit

Permalink
Populate the resource cache (#10)
Browse files Browse the repository at this point in the history
Adds the reconstituter: a controller that populates the resource cache
with the contents of resource slices.
  • Loading branch information
jveski authored Nov 13, 2023
1 parent e474ec9 commit c4dbbd4
Show file tree
Hide file tree
Showing 4 changed files with 270 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .github/workflows/unit.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ jobs:
with:
go-version: '1.21'
- name: Test
run: go test -v ./...
run: KUBEBUILDER_ASSETS=$(go run sigs.k8s.io/controller-runtime/tools/setup-envtest@latest use -p path 1.28.x) go test -v ./...

133 changes: 133 additions & 0 deletions internal/reconstitution/reconstituter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package reconstitution

import (
"context"
"fmt"
"sync/atomic"

k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

apiv1 "github.com/Azure/eno/api/v1"
"github.com/go-logr/logr"
)

const indexName = ".metadata.owner"

// reconstituter reconstitutes individual resources out of resource slices.
// Similar to an informer but with extra logic to handle expanding the slice resources.
type reconstituter struct {
*cache // embedded because caching is logically part of the reconstituter's functionality
client client.Client
queues []workqueue.Interface
started atomic.Bool
}

func newReconstituter(mgr ctrl.Manager) (*reconstituter, error) {
// Index resource slices by the specific synthesis they originate from
err := mgr.GetFieldIndexer().IndexField(context.Background(), &apiv1.ResourceSlice{}, indexName, func(o client.Object) []string {
slice := o.(*apiv1.ResourceSlice)
owner := metav1.GetControllerOf(slice)
if owner == nil || owner.Kind != "Composition" {
return nil
}
// keys will not collide because k8s doesn't allow slashes in names
return []string{fmt.Sprintf("%s/%d", owner.Name, slice.Spec.CompositionGeneration)}
})
if err != nil {
return nil, err
}

r := &reconstituter{
cache: newCache(mgr.GetClient()),
client: mgr.GetClient(),
}
_, err = ctrl.NewControllerManagedBy(mgr).
Named("reconstituter").
For(&apiv1.Composition{}).
Owns(&apiv1.ResourceSlice{}).
Build(r)
return r, err
}

func (r *reconstituter) AddQueue(queue workqueue.Interface) {
if r.started.Load() {
panic("AddQueue must be called before any resources are reconciled")
}
r.queues = append(r.queues, queue)
}

func (r *reconstituter) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
r.started.Store(true)
logger := logr.FromContextOrDiscard(ctx)
logger.V(1).WithValues("composition", req).Info("caching composition")

comp := &apiv1.Composition{}
err := r.client.Get(ctx, req.NamespacedName, comp)
if k8serrors.IsNotFound(err) {
r.cache.Purge(ctx, req.NamespacedName, nil)
return ctrl.Result{}, nil
}
if err != nil {
return ctrl.Result{}, fmt.Errorf("getting resource: %w", err)
}

// We populate the cache with both the previous and current syntheses
err = r.populateCache(ctx, comp, comp.Status.PreviousState)
if err != nil {
return ctrl.Result{}, fmt.Errorf("processing previous state: %w", err)
}
err = r.populateCache(ctx, comp, comp.Status.CurrentState)
if err != nil {
return ctrl.Result{}, fmt.Errorf("processing current state: %w", err)
}

r.cache.Purge(ctx, req.NamespacedName, comp)
return ctrl.Result{}, nil
}

func (r *reconstituter) populateCache(ctx context.Context, comp *apiv1.Composition, synthesis *apiv1.Synthesis) error {
logger := logr.FromContextOrDiscard(ctx)

if synthesis == nil {
return nil
}
compNSN := types.NamespacedName{Namespace: comp.Namespace, Name: comp.Name}

logger = logger.WithValues("synthesisGen", synthesis.ObservedGeneration)
ctx = logr.NewContext(ctx, logger)
if r.cache.HasSynthesis(ctx, compNSN, synthesis) {
logger.V(1).Info("this synthesis has already been cached")
return nil
}

slices := &apiv1.ResourceSliceList{}
err := r.client.List(ctx, slices, client.InNamespace(comp.Namespace), client.MatchingFields{
indexName: fmt.Sprintf("%s/%d", comp.Name, synthesis.ObservedGeneration),
})
if err != nil {
return fmt.Errorf("listing resource slices: %w", err)
}

logger.V(1).Info(fmt.Sprintf("found %d slices for this synthesis", len(slices.Items)))
if int64(len(slices.Items)) != synthesis.ResourceSliceCount {
logger.V(1).Info("stale informer - waiting for sync")
return nil
}

reqs, err := r.cache.Fill(ctx, compNSN, synthesis, slices.Items)
if err != nil {
return err
}
for _, req := range reqs {
for _, queue := range r.queues {
queue.Add(req)
}
}

return nil
}
73 changes: 73 additions & 0 deletions internal/reconstitution/reconstituter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package reconstitution

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

apiv1 "github.com/Azure/eno/api/v1"
"github.com/Azure/eno/internal/testutil"
)

func TestReconstituterIntegration(t *testing.T) {
ctx := testutil.NewContext(t)
mgr := testutil.NewManager(t)
client := mgr.GetClient()

r, err := newReconstituter(mgr.Manager)
require.NoError(t, err)
queue := workqueue.New()
r.AddQueue(queue)
mgr.Start(t)

// Create one composition that has one synthesis of a single resource
comp := &apiv1.Composition{}
comp.Name = "test-composition"
comp.Namespace = "default"
require.NoError(t, client.Create(ctx, comp))

comp.Status.CurrentState = &apiv1.Synthesis{
ObservedGeneration: comp.Generation,
ResourceSliceCount: 1,
}
require.NoError(t, client.Status().Update(ctx, comp))

slice := &apiv1.ResourceSlice{}
slice.Name = "test-slice"
slice.Namespace = "default"
slice.Spec.CompositionGeneration = comp.Generation
slice.Spec.Resources = []apiv1.Manifest{{
Manifest: `{"kind":"baz","apiVersion":"any","metadata":{"name":"foo","namespace":"bar"}}`,
}}
require.NoError(t, controllerutil.SetControllerReference(comp, slice, mgr.GetScheme()))
require.NoError(t, client.Create(ctx, slice))

// Prove the resource was cached
ref := &ResourceRef{
Composition: types.NamespacedName{
Name: comp.Name,
Namespace: comp.Namespace,
},
Name: "foo",
Namespace: "bar",
Kind: "baz",
}
testutil.Eventually(t, func() bool {
_, exists := r.Get(ctx, ref, comp.Generation)
return exists
})

// Remove the composition and confirm cache is purged
require.NoError(t, client.Delete(ctx, comp))
testutil.Eventually(t, func() bool {
_, exists := r.Get(ctx, ref, comp.Generation)
return !exists
})

// The queue should have been populated
assert.Equal(t, 1, queue.Len())
}
63 changes: 63 additions & 0 deletions internal/testutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,22 @@ package testutil

import (
"context"
"fmt"
"path/filepath"
"testing"
"time"

"github.com/go-logr/logr"
"github.com/go-logr/logr/testr"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/client/interceptor"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"sigs.k8s.io/controller-runtime/pkg/manager"

apiv1 "github.com/Azure/eno/api/v1"
)
Expand Down Expand Up @@ -43,3 +49,60 @@ func NewContext(t *testing.T) context.Context {
})
return logr.NewContext(ctx, testr.NewWithOptions(t, testr.Options{Verbosity: 99}))
}

func NewManager(t *testing.T) *Manager {
env := &envtest.Environment{
CRDDirectoryPaths: []string{filepath.Join("..", "..", "api", "v1", "config", "crd")},
}
t.Cleanup(func() {
err := env.Stop()
if err != nil {
panic(err)
}
})

cfg, err := env.Start()
require.NoError(t, err)

mgr, err := ctrl.NewManager(cfg, manager.Options{
Logger: testr.New(t),
BaseContext: func() context.Context { return NewContext(t) },
})
require.NoError(t, err)

err = apiv1.SchemeBuilder.AddToScheme(mgr.GetScheme())
require.NoError(t, err)

return &Manager{
Manager: mgr,
}
}

type Manager struct {
ctrl.Manager
}

func (m *Manager) Start(t *testing.T) {
go func() {
err := m.Manager.Start(NewContext(t))
if err != nil {
// can't t.Fail here since we're in a different goroutine
panic(fmt.Sprintf("error while starting manager: %s", err))
}
}()
}

func Eventually(t testing.TB, fn func() bool) {
t.Helper()
start := time.Now()
for {
if time.Since(start) > time.Second*2 {
t.Fatalf("timeout while waiting for condition")
return
}
if fn() {
return
}
time.Sleep(time.Millisecond * 10)
}
}

0 comments on commit c4dbbd4

Please sign in to comment.