diff --git a/.github/workflows/unit.yaml b/.github/workflows/unit.yaml
new file mode 100644
index 00000000..d13dfcf5
--- /dev/null
+++ b/.github/workflows/unit.yaml
@@ -0,0 +1,15 @@
+name: Go Unit Tests
+on: [push]
+
+jobs:
+  test:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v3
+      - name: Set up Go
+        uses: actions/setup-go@v4
+        with:
+          go-version: '1.21'
+      - name: Test
+        run: go test -v ./...
+
diff --git a/go.mod b/go.mod
index 02e52a60..62b4983e 100644
--- a/go.mod
+++ b/go.mod
@@ -1,6 +1,6 @@
 module github.com/Azure/eno
 
-go 1.20
+go 1.21
 
 require (
 	github.com/beorn7/perks v1.0.1 // indirect
diff --git a/internal/reconstitution/api.go b/internal/reconstitution/api.go
new file mode 100644
index 00000000..ac861f37
--- /dev/null
+++ b/internal/reconstitution/api.go
@@ -0,0 +1,16 @@
+package reconstitution
+
+import (
+	"k8s.io/apimachinery/pkg/types"
+
+	apiv1 "github.com/Azure/eno/api/v1"
+)
+
+// StatusPatchFn mutates a resource's status in place and returns true if it changed anything.
+type StatusPatchFn func(*apiv1.ResourceState) bool
+
+// ManifestRef references a particular resource manifest within a resource slice.
+type ManifestRef struct {
+	Slice types.NamespacedName
+	Index int // position of this manifest within the slice
+}
diff --git a/internal/reconstitution/writebuffer.go b/internal/reconstitution/writebuffer.go
new file mode 100644
index 00000000..1a33f2ca
--- /dev/null
+++ b/internal/reconstitution/writebuffer.go
@@ -0,0 +1,155 @@
+package reconstitution
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"golang.org/x/time/rate"
+	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/util/workqueue"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	apiv1 "github.com/Azure/eno/api/v1"
+	"github.com/go-logr/logr"
+)
+
+type asyncStatusUpdate struct {
+	SlicedResource *ManifestRef
+	PatchFn        StatusPatchFn
+}
+
+// writeBuffer reduces load on etcd/apiserver by collecting resource slice status
+// updates over a short period of time and applying them in a single update request.
+type writeBuffer struct {
+	client client.Client
+	logger logr.Logger
+
+	// Queue items are per-slice. The state map collects multiple updates per slice,
+	// which are dispatched together by the next queue item for that slice.
+	mut   sync.Mutex
+	state map[types.NamespacedName][]*asyncStatusUpdate
+	queue workqueue.RateLimitingInterface
+}
+
+func newWriteBuffer(cli client.Client, logger logr.Logger, batchInterval time.Duration, burst int) *writeBuffer {
+	return &writeBuffer{
+		client: cli,
+		logger: logger.WithValues("controller", "writeBuffer"),
+		state:  make(map[types.NamespacedName][]*asyncStatusUpdate),
+		queue: workqueue.NewRateLimitingQueueWithConfig(
+			&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Every(batchInterval), burst)},
+			workqueue.RateLimitingQueueConfig{
+				Name: "writeBuffer",
+			}),
+	}
+}
+
+func (w *writeBuffer) PatchStatusAsync(ctx context.Context, ref *ManifestRef, patchFn StatusPatchFn) {
+	w.mut.Lock()
+	defer w.mut.Unlock()
+
+	logr.FromContextOrDiscard(ctx).V(1).Info("buffering status update")
+
+	key := ref.Slice
+	// TODO(jordan): Consider de-duping this slice so that a bug causing the calling control loop to churn can't allocate an unbounded amount of memory.
+	w.state[key] = append(w.state[key], &asyncStatusUpdate{
+		SlicedResource: ref,
+		PatchFn:        patchFn,
+	})
+	w.queue.AddRateLimited(key)
+}
+
+func (w *writeBuffer) Start(ctx context.Context) error {
+	go func() {
+		<-ctx.Done()
+		w.queue.ShutDown()
+	}()
+	for w.processQueueItem(ctx) {
+	}
+	return nil
+}
+
+func (w *writeBuffer) processQueueItem(ctx context.Context) bool {
+	item, shutdown := w.queue.Get()
+	if shutdown {
+		return false
+	}
+	defer w.queue.Done(item)
+	sliceNSN := item.(types.NamespacedName)
+
+	w.mut.Lock()
+	updates := w.state[sliceNSN]
+	delete(w.state, sliceNSN)
+	w.mut.Unlock()
+
+	logger := w.logger.WithValues("slice", sliceNSN)
+	ctx = logr.NewContext(ctx, logger)
+
+	if len(updates) == 0 {
+		logger.V(0).Info("dropping queue item because no updates were found for this slice (this is suspicious)")
+	}
+
+	if w.updateSlice(ctx, sliceNSN, updates) {
+		w.queue.Forget(item)
+		return true
+	}
+
+	// Put the updates back in the buffer to retry on the next attempt
+	logger.V(1).Info("update failed - adding updates back to the buffer")
+	w.mut.Lock()
+	w.state[sliceNSN] = append(updates, w.state[sliceNSN]...)
+	w.mut.Unlock()
+	w.queue.AddRateLimited(item)
+
+	return true
+}
+
+func (w *writeBuffer) updateSlice(ctx context.Context, sliceNSN types.NamespacedName, updates []*asyncStatusUpdate) bool {
+	logger := logr.FromContextOrDiscard(ctx)
+	logger.V(1).Info("starting to update slice status")
+
+	slice := &apiv1.ResourceSlice{}
+	err := w.client.Get(ctx, sliceNSN, slice)
+	if errors.IsNotFound(err) {
+		logger.V(0).Info("slice has been deleted, skipping status update")
+		return true
+	}
+	if err != nil {
+		logger.Error(err, "unable to get resource slice")
+		return false
+	}
+
+	if len(slice.Status.Resources) != len(slice.Spec.Resources) {
+		logger.V(1).Info("allocating resource status slice")
+		slice.Status.Resources = make([]apiv1.ResourceState, len(slice.Spec.Resources))
+	}
+
+	var dirty bool
+	for _, update := range updates {
+		logger := logger.WithValues("slicedResource", update.SlicedResource)
+		statusPtr := &slice.Status.Resources[update.SlicedResource.Index]
+
+		if update.PatchFn(statusPtr) {
+			logger.V(1).Info("patch caused status to change")
+			dirty = true
+		} else {
+			logger.V(1).Info("patch did not cause status to change")
+		}
+	}
+	if !dirty {
+		logger.V(1).Info("no status updates were necessary")
+		return true
+	}
+
+	err = w.client.Status().Update(ctx, slice)
+	if err != nil {
+		logger.Error(err, "unable to update resource slice")
+		return false
+	}
+
+	logger.V(0).Info(fmt.Sprintf("updated the status of %d resources in slice", len(updates)))
+	return true
+}
diff --git a/internal/reconstitution/writebuffer_test.go b/internal/reconstitution/writebuffer_test.go
new file mode 100644
index 00000000..37207c2f
--- /dev/null
+++ b/internal/reconstitution/writebuffer_test.go
@@ -0,0 +1,193 @@
+package reconstitution
+
+import (
+	"context"
+	"errors"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/go-logr/logr/testr"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"k8s.io/apimachinery/pkg/types"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/client/interceptor"
+
+	apiv1 "github.com/Azure/eno/api/v1"
+	"github.com/Azure/eno/internal/testutil"
+)
+
+func TestWriteBufferBasics(t *testing.T) {
+	ctx := context.Background()
+	cli := testutil.NewClient(t)
+	w := newWriteBuffer(cli, testr.New(t), 0, 1)
+
+	// One resource slice w/ len of 3
+	slice := &apiv1.ResourceSlice{}
+	slice.Name = "test-slice-1"
+	slice.Spec.Resources = make([]apiv1.Manifest, 3)
+	require.NoError(t, cli.Create(ctx, slice))
+
+	// One of the resources has been reconciled
+	req := &ManifestRef{}
+	req.Slice.Name = "test-slice-1"
+	req.Index = 1
+	w.PatchStatusAsync(ctx, req, setReconciled())
+
+	// Slice resource's status should reflect the patch
+	w.processQueueItem(ctx)
+	require.NoError(t, cli.Get(ctx, client.ObjectKeyFromObject(slice), slice))
+	require.Len(t, slice.Status.Resources, 3)
+	assert.False(t, slice.Status.Resources[0].Reconciled)
+	assert.True(t, slice.Status.Resources[1].Reconciled)
+	assert.False(t, slice.Status.Resources[2].Reconciled)
+
+	// All state has been flushed
+	assert.Len(t, w.state, 0)
+	assert.Equal(t, 0, w.queue.Len())
+}
+
+func TestWriteBufferBatching(t *testing.T) {
+	ctx := context.Background()
+	var updateCalls atomic.Int32
+	cli := testutil.NewClientWithInterceptors(t, &interceptor.Funcs{
+		SubResourceUpdate: func(ctx context.Context, client client.Client, subResourceName string, obj client.Object, opts ...client.SubResourceUpdateOption) error {
+			updateCalls.Add(1)
+			return client.SubResource(subResourceName).Update(ctx, obj, opts...)
+		},
+	})
+	w := newWriteBuffer(cli, testr.New(t), time.Millisecond*2, 1)
+
+	// One resource slice w/ len of 3
+	slice := &apiv1.ResourceSlice{}
+	slice.Name = "test-slice-1"
+	slice.Spec.Resources = make([]apiv1.Manifest, 3)
+	require.NoError(t, cli.Create(ctx, slice))
+
+	// Two of the resources have been reconciled within the batch interval
+	req := &ManifestRef{}
+	req.Slice.Name = "test-slice-1"
+	req.Index = 1
+	w.PatchStatusAsync(ctx, req, setReconciled())
+
+	req = &ManifestRef{}
+	req.Slice.Name = "test-slice-1"
+	req.Index = 2
+	w.PatchStatusAsync(ctx, req, setReconciled())
+
+	// Slice resource's status should be correct after a single update
+	w.processQueueItem(ctx)
+	assert.Equal(t, int32(1), updateCalls.Load())
+	require.NoError(t, cli.Get(ctx, client.ObjectKeyFromObject(slice), slice))
+	require.Len(t, slice.Status.Resources, 3)
+	assert.False(t, slice.Status.Resources[0].Reconciled)
+	assert.True(t, slice.Status.Resources[1].Reconciled)
+	assert.True(t, slice.Status.Resources[2].Reconciled)
+}
+
+func TestWriteBufferNoUpdates(t *testing.T) {
+	ctx := context.Background()
+	cli := testutil.NewClient(t)
+	w := newWriteBuffer(cli, testr.New(t), 0, 1)
+
+	// One resource slice w/ len of 3
+	slice := &apiv1.ResourceSlice{}
+	slice.Name = "test-slice-1"
+	slice.Spec.Resources = make([]apiv1.Manifest, 3)
+	require.NoError(t, cli.Create(ctx, slice))
+
+	// One of the resources has been reconciled
+	req := &ManifestRef{}
+	req.Slice.Name = "test-slice-1"
+	req.Index = 1
+	w.PatchStatusAsync(ctx, req, setReconciled())
+
+	// Remove the update leaving the queue message in place
+	w.state = map[types.NamespacedName][]*asyncStatusUpdate{}
+
+	// Slice's status should not have been initialized
+	w.processQueueItem(ctx)
+	require.NoError(t, cli.Get(ctx, client.ObjectKeyFromObject(slice), slice))
+	require.Len(t, slice.Status.Resources, 0)
+}
+
+func TestWriteBufferMissingSlice(t *testing.T) {
+	ctx := context.Background()
+	cli := testutil.NewClient(t)
+	w := newWriteBuffer(cli, testr.New(t), 0, 1)
+
+	req := &ManifestRef{}
+	req.Slice.Name = "test-slice-1" // this doesn't exist
+	w.PatchStatusAsync(ctx, req, setReconciled())
+
+	// Slice 404 drops the event and does not retry.
+	// Prevents a deadlock of this queue item.
+	w.processQueueItem(ctx)
+	assert.Equal(t, 0, w.queue.Len())
+}
+
+func TestWriteBufferNoChange(t *testing.T) {
+	ctx := context.Background()
+	cli := testutil.NewClientWithInterceptors(t, &interceptor.Funcs{
+		SubResourceUpdate: func(ctx context.Context, client client.Client, subResourceName string, obj client.Object, opts ...client.SubResourceUpdateOption) error {
+			t.Fatal("should not have sent any status updates")
+			return nil
+		},
+	})
+	w := newWriteBuffer(cli, testr.New(t), 0, 1)
+
+	// One resource slice
+	slice := &apiv1.ResourceSlice{}
+	slice.Name = "test-slice-1"
+	slice.Spec.Resources = make([]apiv1.Manifest, 3)
+	slice.Status.Resources = make([]apiv1.ResourceState, 3)
+	slice.Status.Resources[1].Reconciled = true // already accounted for
+	require.NoError(t, cli.Create(ctx, slice))
+
+	// One of the resources has been reconciled
+	req := &ManifestRef{}
+	req.Slice.Name = "test-slice-1"
+	req.Index = 1
+	w.PatchStatusAsync(ctx, req, setReconciled())
+
+	w.processQueueItem(ctx)
+}
+
+func TestWriteBufferUpdateError(t *testing.T) {
+	ctx := context.Background()
+	cli := testutil.NewClientWithInterceptors(t, &interceptor.Funcs{
+		SubResourceUpdate: func(ctx context.Context, client client.Client, subResourceName string, obj client.Object, opts ...client.SubResourceUpdateOption) error {
+			return errors.New("could be any error")
+		},
+	})
+	w := newWriteBuffer(cli, testr.New(t), 0, 1)
+
+	// One resource slice w/ len of 3
+	slice := &apiv1.ResourceSlice{}
+	slice.Name = "test-slice-1"
+	slice.Spec.Resources = make([]apiv1.Manifest, 3)
+	require.NoError(t, cli.Create(ctx, slice))
+
+	// One of the resources has been reconciled
+	req := &ManifestRef{}
+	req.Slice.Name = "test-slice-1"
+	req.Index = 1
+	w.PatchStatusAsync(ctx, req, setReconciled())
+
+	// Both the queue item and state have persisted
+	w.processQueueItem(ctx)
+	key := types.NamespacedName{Name: slice.Name}
+	assert.Len(t, w.state[key], 1)
+	assert.Equal(t, 1, w.queue.Len())
+}
+
+func setReconciled() StatusPatchFn {
+	return func(rs *apiv1.ResourceState) bool {
+		if rs.Reconciled {
+			return false // already set
+		}
+		rs.Reconciled = true
+		return true
+	}
+}
diff --git a/internal/testutil/testutil.go b/internal/testutil/testutil.go
new file mode 100644
index 00000000..ee7f230e
--- /dev/null
+++ b/internal/testutil/testutil.go
@@ -0,0 +1,32 @@
+package testutil
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+	"k8s.io/apimachinery/pkg/runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/client/fake"
+	"sigs.k8s.io/controller-runtime/pkg/client/interceptor"
+
+	apiv1 "github.com/Azure/eno/api/v1"
+)
+
+func NewClient(t testing.TB) client.Client {
+	return NewClientWithInterceptors(t, nil)
+}
+
+func NewClientWithInterceptors(t testing.TB, ict *interceptor.Funcs) client.Client {
+	scheme := runtime.NewScheme()
+	require.NoError(t, apiv1.SchemeBuilder.AddToScheme(scheme))
+
+	builder := fake.NewClientBuilder().
+		WithScheme(scheme).
+		WithStatusSubresource(&apiv1.ResourceSlice{})
+
+	if ict != nil {
+		builder.WithInterceptorFuncs(*ict)
+	}
+
+	return builder.Build()
+}
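
Not part of the diff above, just a minimal sketch for reviewers of how the write buffer is intended to be used from inside the reconstitution package. It assumes a controller-runtime manager owns the buffer; the manager wiring, the 5-second batch interval, the exampleUsage function name, and the ManifestRef values are illustrative assumptions rather than code from this change.

package reconstitution

import (
	"context"
	"time"

	"k8s.io/apimachinery/pkg/types"
	ctrl "sigs.k8s.io/controller-runtime"

	apiv1 "github.com/Azure/eno/api/v1"
)

func exampleUsage(ctx context.Context, mgr ctrl.Manager) {
	// Hypothetical wiring: batch status writes for the same slice over ~5s.
	wb := newWriteBuffer(mgr.GetClient(), mgr.GetLogger(), 5*time.Second, 1)

	// writeBuffer satisfies the manager's Runnable interface via Start, so the
	// manager can drive the queue-processing loop and stop it with its context.
	_ = mgr.Add(wb)

	// A controller records per-resource status by buffering a patch instead of
	// updating the ResourceSlice directly. The StatusPatchFn's return value
	// tells the buffer whether an API write is actually needed.
	ref := &ManifestRef{
		Slice: types.NamespacedName{Namespace: "default", Name: "slice-1"}, // illustrative values
		Index: 0,
	}
	wb.PatchStatusAsync(ctx, ref, func(rs *apiv1.ResourceState) bool {
		if rs.Reconciled {
			return false // already recorded; no update will be sent
		}
		rs.Reconciled = true
		return true
	})
}

Multiple PatchStatusAsync calls for the same slice within the rate limiter's interval collapse into a single status update, which is the behavior TestWriteBufferBatching exercises.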