From 6f3ec6b833238e3221510e4ce445a46ddc5a145f Mon Sep 17 00:00:00 2001
From: Jordan Olshevski
Date: Fri, 1 Nov 2024 14:19:04 -0500
Subject: [PATCH] Retry batched resource slice writes more aggressively (#240)

It doesn't make sense to wait the full batch interval before retrying
errors, since the apiserver is unlikely to have paid the full cost of a
request that returned an error. Essentially, the value of the batch
interval is that it reduces load at the cost of latency. But in most
cases where the apiserver returns an error we don't get the benefit of
reduced load, only the cost of high latency.

We can improve tail latency by using reasonable exponential backoff for
the first few retries, then falling back to the batch interval for
safety. Since the write buffer forgets items on success, the backoff
logic only applies to errors, not to requeues caused by changes enqueued
concurrently with the last batch.

Also adds some tooling to help uncover issues like this in the future:
optional error injection at the client level.

---------

Co-authored-by: Jordan Olshevski
---
 internal/flowcontrol/writebuffer.go      | 44 +++++++++++++-
 internal/flowcontrol/writebuffer_test.go | 23 +++++++
 internal/manager/manager.go              | 76 ++++++++++++++++++++++++
 3 files changed, 142 insertions(+), 1 deletion(-)

diff --git a/internal/flowcontrol/writebuffer.go b/internal/flowcontrol/writebuffer.go
index 75661036..229421de 100644
--- a/internal/flowcontrol/writebuffer.go
+++ b/internal/flowcontrol/writebuffer.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"math"
 	"sync"
 	"time"
 
@@ -49,7 +50,7 @@ func NewResourceSliceWriteBuffer(cli client.Client, batchInterval time.Duration,
 		client: cli,
 		state:  make(map[types.NamespacedName][]*resourceSliceStatusUpdate),
 		queue: workqueue.NewRateLimitingQueueWithConfig(
-			&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Every(batchInterval), burst)},
+			newRateLimiter(batchInterval, burst),
 			workqueue.RateLimitingQueueConfig{
 				Name: "writeBuffer",
 			}),
@@ -207,3 +208,44 @@ type jsonPatch struct {
 	Path  string `json:"path"`
 	Value any    `json:"value"`
 }
+
+type rateLimiter struct {
+	failuresLock sync.Mutex
+	failures     map[interface{}]int
+	limiter      *rate.Limiter
+}
+
+func newRateLimiter(batchInterval time.Duration, burst int) workqueue.RateLimiter {
+	return &rateLimiter{
+		failures: map[interface{}]int{},
+		limiter:  rate.NewLimiter(rate.Every(batchInterval), burst),
+	}
+}
+
+func (r *rateLimiter) When(item any) time.Duration {
+	r.failuresLock.Lock()
+	defer r.failuresLock.Unlock()
+
+	failures := r.failures[item]
+	r.failures[item]++
+
+	// Retry quickly a few times using exponential backoff
+	if failures > 0 && failures < 5 {
+		return time.Duration(10*math.Pow(2, float64(failures-1))) * time.Millisecond
+	}
+
+	// Non-error batching interval
+	return r.limiter.Reserve().Delay()
+}
+
+func (r *rateLimiter) NumRequeues(item interface{}) int {
+	r.failuresLock.Lock()
+	defer r.failuresLock.Unlock()
+	return r.failures[item]
+}
+
+func (r *rateLimiter) Forget(item interface{}) {
+	r.failuresLock.Lock()
+	defer r.failuresLock.Unlock()
+	delete(r.failures, item)
+}
diff --git a/internal/flowcontrol/writebuffer_test.go b/internal/flowcontrol/writebuffer_test.go
index 71d0d675..e923e158 100644
--- a/internal/flowcontrol/writebuffer_test.go
+++ b/internal/flowcontrol/writebuffer_test.go
@@ -220,3 +220,26 @@ func setReconciled() StatusPatchFn {
 		return &apiv1.ResourceState{Reconciled: true}
 	}
 }
+
+func TestRateLimiter(t *testing.T) {
+	r := newRateLimiter(time.Second, 1)
+
+	r.When(234) // purge the first token
+
+	// The first attempt of this item should wait for roughly the batching interval
+	wait := r.When(123)
+	assert.Less(t, wait, 2*time.Second)
+	assert.Greater(t, wait, 600*time.Millisecond)
+
+	// A few retries use exponential backoff
+	for i := 0; i < 4; i++ {
+		wait = r.When(123)
+		assert.Less(t, wait, 100*time.Millisecond, "attempt %d", i)
+		assert.Greater(t, wait, 5*time.Millisecond, "attempt %d", i)
+	}
+
+	// Eventually we fall back to the batching interval
+	wait = r.When(123)
+	assert.Less(t, wait, 2*time.Second)
+	assert.Greater(t, wait, 600*time.Millisecond)
+}
diff --git a/internal/manager/manager.go b/internal/manager/manager.go
index 5d5729f9..7a6b07fa 100644
--- a/internal/manager/manager.go
+++ b/internal/manager/manager.go
@@ -2,8 +2,11 @@ package manager
 
 import (
 	"context"
+	"errors"
 	"fmt"
+	"math/rand/v2"
 	"os"
+	"strconv"
 
 	"net/http"
 	_ "net/http/pprof"
@@ -13,6 +16,7 @@ import (
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/rest"
 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/utils/ptr"
 	ctrl "sigs.k8s.io/controller-runtime"
@@ -100,6 +104,22 @@ func newMgr(logger logr.Logger, opts *Options, isController, isReconciler bool)
 		LeaderElectionReleaseOnCancel: true,
 	}
 
+	if ratioStr := os.Getenv("CHAOS_RATIO"); ratioStr != "" {
+		mgrOpts.NewClient = func(config *rest.Config, options client.Options) (client.Client, error) {
+			base, err := client.New(config, options)
+			if err != nil {
+				return nil, err
+			}
+
+			ratio, err := strconv.ParseFloat(ratioStr, 64)
+			if err != nil {
+				return nil, err
+			}
+
+			return &chaosClient{Client: base, ratio: ratio}, nil
+		}
+	}
+
 	if isController {
 		// Only cache pods in the synthesizer pod namespace and owned by this controller
 		mgrOpts.Cache.ByObject[&corev1.Pod{}] = cache.ByObject{
@@ -216,3 +236,59 @@ func SingleEventHandler() handler.EventHandler {
 		return []reconcile.Request{{}}
 	}))
 }
+
+type chaosClient struct {
+	client.Client
+	ratio float64
+}
+
+func (c *chaosClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error {
+	if c.ratio > rand.Float64() {
+		return errors.New("chaos!")
+	}
+	return c.Client.Create(ctx, obj, opts...)
+}
+
+func (c *chaosClient) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error {
+	if c.ratio > rand.Float64() {
+		return errors.New("chaos!")
+	}
+	return c.Client.Delete(ctx, obj, opts...)
+}
+
+func (c *chaosClient) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error {
+	if c.ratio > rand.Float64() {
+		return errors.New("chaos!")
+	}
+	return c.Client.Update(ctx, obj, opts...)
+}
+
+func (c *chaosClient) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.PatchOption) error {
+	if c.ratio > rand.Float64() {
+		return errors.New("chaos!")
+	}
+	return c.Client.Patch(ctx, obj, patch, opts...)
+}
+
+func (c *chaosClient) Status() client.SubResourceWriter {
+	return &chaosStatusClient{SubResourceWriter: c.Client.Status(), parent: c}
+}
+
+type chaosStatusClient struct {
+	client.SubResourceWriter
+	parent *chaosClient
+}
+
+func (c *chaosStatusClient) Update(ctx context.Context, obj client.Object, opts ...client.SubResourceUpdateOption) error {
+	if c.parent.ratio > rand.Float64() {
+		return errors.New("chaos!")
+	}
+	return c.SubResourceWriter.Update(ctx, obj, opts...)
+}
+
+func (c *chaosStatusClient) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.SubResourcePatchOption) error {
+	if c.parent.ratio > rand.Float64() {
+		return errors.New("chaos!")
+	}
+	return c.SubResourceWriter.Patch(ctx, obj, patch, opts...)
+}
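
For illustration only (not part of the patch): a minimal sketch of the delay schedule the new rate limiter should produce for a single item that keeps failing, assuming it sits alongside the new code in the flowcontrol package. The function name and the "resource-slice-1" key are made up for this example.

package flowcontrol

import (
	"fmt"
	"time"
)

// exampleBackoffSchedule prints the approximate waits a single failing item
// sees from newRateLimiter with a 1s batch interval and a burst of 1.
func exampleBackoffSchedule() {
	r := newRateLimiter(time.Second, 1)

	for attempt := 0; attempt < 6; attempt++ {
		// Attempt 0 consumes the ready token (~0s), attempts 1-4 back off at
		// 10ms/20ms/40ms/80ms, and attempt 5 falls back to roughly the 1s
		// batch interval.
		fmt.Printf("attempt %d -> wait %s\n", attempt, r.When("resource-slice-1"))
	}

	// A successful write forgets the item, resetting its failure count.
	r.Forget("resource-slice-1")
}

The error injection added to manager.go is driven by the CHAOS_RATIO environment variable: each write (Create, Delete, Update, Patch, and the status subresource) fails with probability equal to the configured ratio, so setting e.g. CHAOS_RATIO=0.1 should make roughly 10% of writes return an error.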