Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement writebuffer #8

Merged
merged 7 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .github/workflows/unit.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Runs Go unit tests on every push.
name: Go Unit Tests
on: [push]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Go
        uses: actions/setup-go@v4
        with:
          go-version: '1.21'
      - name: Test
        run: go test -v ./...

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/Azure/eno

go 1.20
go 1.21

require (
github.com/beorn7/perks v1.0.1 // indirect
Expand Down
15 changes: 15 additions & 0 deletions internal/reconstitution/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package reconstitution

import (
"k8s.io/apimachinery/pkg/types"

apiv1 "github.com/Azure/eno/api/v1"
)

// StatusPatchFn mutates the given resource state in place. It returns true
// if the state was changed, false if it was already up to date.
type StatusPatchFn func(*apiv1.ResourceState) bool

// ManifestRef references a particular resource manifest within a resource slice.
type ManifestRef struct {
	Slice types.NamespacedName // namespace/name of the ResourceSlice that contains the manifest
	Index int                  // position of this manifest within the slice
}
158 changes: 158 additions & 0 deletions internal/reconstitution/writebuffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package reconstitution

import (
"context"
"fmt"
"sync"
"time"

"golang.org/x/time/rate"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"

apiv1 "github.com/Azure/eno/api/v1"
"github.com/go-logr/logr"
)

// asyncStatusUpdate pairs a status patch function with the sliced resource
// it applies to, for buffering in the writeBuffer.
type asyncStatusUpdate struct {
	SlicedResource *ManifestRef
	PatchFn        StatusPatchFn
}

// writeBuffer reduces load on etcd/apiserver by collecting resource slice status
// updates over a short period of time and applying them in a single update request.
type writeBuffer struct {
client client.Client
logger logr.Logger
jveski marked this conversation as resolved.
Show resolved Hide resolved

// queue items are per-slice.
// the state map collects multiple updates per slice to be dispatched by next queue item.
mut sync.Mutex
state map[types.NamespacedName][]*asyncStatusUpdate
queue workqueue.RateLimitingInterface
}

// newWriteBuffer constructs a writeBuffer whose queue dispatches at most
// once per batchInterval, with the given burst allowance.
func newWriteBuffer(cli client.Client, logger logr.Logger, batchInterval time.Duration, burst int) *writeBuffer {
	limiter := &workqueue.BucketRateLimiter{
		Limiter: rate.NewLimiter(rate.Every(batchInterval), burst),
	}
	queue := workqueue.NewRateLimitingQueueWithConfig(limiter, workqueue.RateLimitingQueueConfig{
		Name: "writeBuffer",
	})
	return &writeBuffer{
		client: cli,
		logger: logger.WithValues("controller", "writeBuffer"),
		state:  map[types.NamespacedName][]*asyncStatusUpdate{},
		queue:  queue,
	}
}

func (w *writeBuffer) PatchStatusAsync(ctx context.Context, ref *ManifestRef, patchFn StatusPatchFn) {
w.mut.Lock()
defer w.mut.Unlock()

logr.FromContextOrDiscard(ctx).V(1).Info("buffering status update")

key := ref.Slice
// TODO(jordan): Consider de-duping this slice to avoid potentially allocating a lot of memory if some bug causes churning of the control loop that ends up calling this.
w.state[key] = append(w.state[key], &asyncStatusUpdate{
jveski marked this conversation as resolved.
Show resolved Hide resolved
SlicedResource: ref,
PatchFn: patchFn,
})
w.queue.AddRateLimited(key)
}

// Start runs the queue worker until ctx is cancelled, at which point the
// queue is shut down and the worker drains. Always returns nil.
func (w *writeBuffer) Start(ctx context.Context) error {
	go func() {
		// Shut down the queue on cancellation so the blocking Get unblocks.
		<-ctx.Done()
		w.queue.ShutDown()
	}()
	for {
		if !w.processQueueItem(ctx) {
			return nil
		}
	}
}

func (w *writeBuffer) processQueueItem(ctx context.Context) bool {
item, shutdown := w.queue.Get()
if shutdown {
AYM1607 marked this conversation as resolved.
Show resolved Hide resolved
return false
}
defer w.queue.Done(item)
sliceNSN := item.(types.NamespacedName)

w.mut.Lock()
updates := w.state[sliceNSN]
delete(w.state, sliceNSN)
w.mut.Unlock()

logger := w.logger.WithValues("slice", sliceNSN)
ctx = logr.NewContext(ctx, logger)

if len(updates) == 0 {
logger.V(0).Info("dropping queue item because no updates were found for this slice (this is suspicious)")
jveski marked this conversation as resolved.
Show resolved Hide resolved
w.queue.Forget(item)
jveski marked this conversation as resolved.
Show resolved Hide resolved
return true
}

if w.updateSlice(ctx, sliceNSN, updates) {
w.queue.Forget(item)
return true
}

// Put the updates back in the buffer to retry on the next attempt
logger.V(1).Info("update failed - adding updates back to the buffer")
w.mut.Lock()
w.state[sliceNSN] = append(w.state[sliceNSN], updates...)
w.mut.Unlock()
w.queue.Forget(item)
w.queue.AddRateLimited(item)
jveski marked this conversation as resolved.
Show resolved Hide resolved

return true
}

// updateSlice applies the buffered updates to the slice's status and writes
// it back through the status subresource. It returns true when the queue item
// can be forgotten (write succeeded, no write was needed, or the slice no
// longer exists) and false when the caller should retry.
func (w *writeBuffer) updateSlice(ctx context.Context, sliceNSN types.NamespacedName, updates []*asyncStatusUpdate) bool {
	logger := logr.FromContextOrDiscard(ctx)
	logger.V(1).Info("starting to update slice status")

	slice := &apiv1.ResourceSlice{}
	err := w.client.Get(ctx, sliceNSN, slice)
	if errors.IsNotFound(err) {
		// Retrying would deadlock this queue item - drop the updates instead.
		logger.V(0).Info("slice has been deleted, skipping status update")
		return true
	}
	if err != nil {
		logger.Error(err, "unable to get resource slice")
		return false
	}

	// Lazily size the status to match the spec.
	if len(slice.Status.Resources) != len(slice.Spec.Resources) {
		logger.V(1).Info("allocating resource status slice")
		slice.Status.Resources = make([]apiv1.ResourceState, len(slice.Spec.Resources))
	}

	var dirty bool
	for _, update := range updates {
		logger := logger.WithValues("slicedResource", update.SlicedResource)

		// Guard against out-of-range indexes (e.g. a stale update referencing
		// a manifest beyond the slice's current spec) to avoid panicking.
		if update.SlicedResource.Index < 0 || update.SlicedResource.Index >= len(slice.Status.Resources) {
			logger.V(0).Info("dropping status update because its index is out of bounds for the slice")
			continue
		}
		statusPtr := &slice.Status.Resources[update.SlicedResource.Index]

		if update.PatchFn(statusPtr) {
			logger.V(1).Info("patch caused status to change")
			dirty = true
		} else {
			logger.V(1).Info("patch did not cause status to change")
		}
	}
	if !dirty {
		logger.V(1).Info("no status updates were necessary")
		return true
	}

	err = w.client.Status().Update(ctx, slice)
	if err != nil {
		logger.Error(err, "unable to update resource slice")
		return false
	}

	logger.V(0).Info(fmt.Sprintf("updated the status of %d resources in slice", len(updates)))
	return true
}
193 changes: 193 additions & 0 deletions internal/reconstitution/writebuffer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package reconstitution

import (
"context"
"errors"
"sync/atomic"
"testing"
"time"

"github.com/go-logr/logr/testr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/interceptor"

apiv1 "github.com/Azure/eno/api/v1"
"github.com/Azure/eno/internal/testutil"
)

// TestWriteBufferBasics covers the happy path: one buffered patch is flushed
// to the slice's status and all internal state is drained afterwards.
func TestWriteBufferBasics(t *testing.T) {
	ctx := context.Background()
	cli := testutil.NewClient(t)
	wb := newWriteBuffer(cli, testr.New(t), 0, 1)

	// Create a slice containing three resources.
	slice := &apiv1.ResourceSlice{}
	slice.Name = "test-slice-1"
	slice.Spec.Resources = make([]apiv1.Manifest, 3)
	require.NoError(t, cli.Create(ctx, slice))

	// Buffer a patch marking the middle resource as reconciled.
	ref := &ManifestRef{}
	ref.Slice.Name = "test-slice-1"
	ref.Index = 1
	wb.PatchStatusAsync(ctx, ref, setReconciled())

	// Processing the queue item should write the patched status back.
	wb.processQueueItem(ctx)
	require.NoError(t, cli.Get(ctx, client.ObjectKeyFromObject(slice), slice))
	require.Len(t, slice.Status.Resources, 3)
	assert.False(t, slice.Status.Resources[0].Reconciled)
	assert.True(t, slice.Status.Resources[1].Reconciled)
	assert.False(t, slice.Status.Resources[2].Reconciled)

	// Nothing should remain buffered or queued.
	assert.Len(t, wb.state, 0)
	assert.Equal(t, 0, wb.queue.Len())
}

// TestWriteBufferBatching proves that multiple updates buffered for the same
// slice within the batch interval are flushed by a single update request.
func TestWriteBufferBatching(t *testing.T) {
	ctx := context.Background()
	var updateCalls atomic.Int32
	// Count status update requests by intercepting the fake client's
	// subresource update calls, then delegate to the real implementation.
	cli := testutil.NewClientWithInterceptors(t, &interceptor.Funcs{
		SubResourceUpdate: func(ctx context.Context, client client.Client, subResourceName string, obj client.Object, opts ...client.SubResourceUpdateOption) error {
			updateCalls.Add(1)
			return client.SubResource(subResourceName).Update(ctx, obj, opts...)
		},
	})
	w := newWriteBuffer(cli, testr.New(t), time.Millisecond*2, 1)

	// One resource slice w/ len of 3
	slice := &apiv1.ResourceSlice{}
	slice.Name = "test-slice-1"
	slice.Spec.Resources = make([]apiv1.Manifest, 3)
	require.NoError(t, cli.Create(ctx, slice))

	// Two of the resources have been reconciled within the batch interval
	req := &ManifestRef{}
	req.Slice.Name = "test-slice-1"
	req.Index = 1
	w.PatchStatusAsync(ctx, req, setReconciled())

	req = &ManifestRef{}
	req.Slice.Name = "test-slice-1"
	req.Index = 2
	w.PatchStatusAsync(ctx, req, setReconciled())

	// Slice resource's status should be correct after a single update
	w.processQueueItem(ctx)
	assert.Equal(t, int32(1), updateCalls.Load())
	require.NoError(t, cli.Get(ctx, client.ObjectKeyFromObject(slice), slice))
	require.Len(t, slice.Status.Resources, 3)
	assert.False(t, slice.Status.Resources[0].Reconciled)
	assert.True(t, slice.Status.Resources[1].Reconciled)
	assert.True(t, slice.Status.Resources[2].Reconciled)
}

// TestWriteBufferNoUpdates covers the defensive branch where a queue item
// exists but no updates are buffered: the item is dropped without touching
// the slice.
func TestWriteBufferNoUpdates(t *testing.T) {
	ctx := context.Background()
	cli := testutil.NewClient(t)
	wb := newWriteBuffer(cli, testr.New(t), 0, 1)

	// Create a slice containing three resources.
	slice := &apiv1.ResourceSlice{}
	slice.Name = "test-slice-1"
	slice.Spec.Resources = make([]apiv1.Manifest, 3)
	require.NoError(t, cli.Create(ctx, slice))

	// Buffer an update for one of the resources...
	ref := &ManifestRef{}
	ref.Slice.Name = "test-slice-1"
	ref.Index = 1
	wb.PatchStatusAsync(ctx, ref, setReconciled())

	// ...then discard the buffered state while keeping the queue message.
	wb.state = make(map[types.NamespacedName][]*asyncStatusUpdate)

	// Processing the item must not initialize the slice's status.
	wb.processQueueItem(ctx)
	require.NoError(t, cli.Get(ctx, client.ObjectKeyFromObject(slice), slice))
	require.Len(t, slice.Status.Resources, 0)
}

// TestWriteBufferMissingSlice covers patches against a slice that does not
// exist: the 404 must drop the queue item rather than retry forever.
func TestWriteBufferMissingSlice(t *testing.T) {
	ctx := context.Background()
	cli := testutil.NewClient(t)
	wb := newWriteBuffer(cli, testr.New(t), 0, 1)

	ref := &ManifestRef{}
	ref.Slice.Name = "test-slice-1" // this doesn't exist
	wb.PatchStatusAsync(ctx, ref, setReconciled())

	// Slice 404 drops the event and does not retry.
	// Prevents a deadlock of this queue item.
	wb.processQueueItem(ctx)
	assert.Equal(t, 0, wb.queue.Len())
}

// TestWriteBufferNoChange proves that no update request is sent when the
// buffered patches do not actually change the slice's status.
func TestWriteBufferNoChange(t *testing.T) {
	ctx := context.Background()
	// Fail the test if any status update request reaches the client.
	cli := testutil.NewClientWithInterceptors(t, &interceptor.Funcs{
		SubResourceUpdate: func(ctx context.Context, client client.Client, subResourceName string, obj client.Object, opts ...client.SubResourceUpdateOption) error {
			t.Fatal("should not have sent any status updates")
			return nil
		},
	})
	w := newWriteBuffer(cli, testr.New(t), 0, 1)

	// One resource slice
	slice := &apiv1.ResourceSlice{}
	slice.Name = "test-slice-1"
	slice.Spec.Resources = make([]apiv1.Manifest, 3)
	slice.Status.Resources = make([]apiv1.ResourceState, 3)
	slice.Status.Resources[1].Reconciled = true // already accounted for
	require.NoError(t, cli.Create(ctx, slice))

	// One of the resources has been reconciled
	req := &ManifestRef{}
	req.Slice.Name = "test-slice-1"
	req.Index = 1
	w.PatchStatusAsync(ctx, req, setReconciled())

	w.processQueueItem(ctx)
}

// TestWriteBufferUpdateError covers update failures: both the buffered state
// and the queue item must survive so the write can be retried later.
func TestWriteBufferUpdateError(t *testing.T) {
	ctx := context.Background()
	// Every status update fails.
	cli := testutil.NewClientWithInterceptors(t, &interceptor.Funcs{
		SubResourceUpdate: func(ctx context.Context, client client.Client, subResourceName string, obj client.Object, opts ...client.SubResourceUpdateOption) error {
			return errors.New("could be any error")
		},
	})
	wb := newWriteBuffer(cli, testr.New(t), 0, 1)

	// Create a slice containing three resources.
	slice := &apiv1.ResourceSlice{}
	slice.Name = "test-slice-1"
	slice.Spec.Resources = make([]apiv1.Manifest, 3)
	require.NoError(t, cli.Create(ctx, slice))

	// Buffer a patch for one of the resources.
	ref := &ManifestRef{}
	ref.Slice.Name = "test-slice-1"
	ref.Index = 1
	wb.PatchStatusAsync(ctx, ref, setReconciled())

	// Both the queue item and state have persisted
	wb.processQueueItem(ctx)
	key := types.NamespacedName{Name: slice.Name}
	assert.Len(t, wb.state[key], 1)
	assert.Equal(t, 1, wb.queue.Len())
}

// setReconciled returns a patch that marks a resource as reconciled,
// reporting a change only on the first application.
func setReconciled() StatusPatchFn {
	return func(rs *apiv1.ResourceState) bool {
		if !rs.Reconciled {
			rs.Reconciled = true
			return true
		}
		return false // already set
	}
}
Loading