Retry batched resource slice writes more aggressively (#240)
It doesn't make sense to wait the full batch interval before retrying
errors, since the apiserver is unlikely to have paid the full cost of a
request it rejected with an error. Essentially, the value of the batch
interval is reducing load at the cost of latency. But in most cases
where the apiserver returns an error we don't get the benefit of reduced
load, only the cost of higher latency.

We can improve tail latency by using reasonable exponential backoff for
the first few retries, then falling back to the batch interval after
that for safety. Since the write buffer forgets items on success, the
backoff logic only applies to errors, not to requeues caused by changes
enqueued concurrently with the last batch.
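
A minimal sketch (not part of the commit) of the schedule this produces, assuming the 10ms base and five-failure cutoff used in the implementation below, and an assumed 1s batch interval:

package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	batchInterval := time.Second // assumed value; configurable in practice
	for failures := 0; failures < 6; failures++ {
		if failures > 0 && failures < 5 {
			// Same formula as the rate limiter below: 10ms, 20ms, 40ms, 80ms
			delay := time.Duration(10*math.Pow(2, float64(failures-1))) * time.Millisecond
			fmt.Printf("after %d failures: wait %s\n", failures, delay)
			continue
		}
		// The first attempt and anything past the cutoff pay the batch interval
		fmt.Printf("after %d failures: wait %s\n", failures, batchInterval)
	}
}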

Also adds some tooling to help uncover issues like this in the future:
optional error injection at the client level.
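
For example (illustrative invocation; the CHAOS_RATIO variable name comes from manager.go below, the value is arbitrary): running the controller with CHAOS_RATIO=0.1 makes roughly 10% of client writes fail with a synthetic error, which exercises the retry path above.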

---------

Co-authored-by: Jordan Olshevski <[email protected]>
jveski and Jordan Olshevski authored Nov 1, 2024
1 parent 0f9ca70 commit 6f3ec6b
Showing 3 changed files with 142 additions and 1 deletion.
44 changes: 43 additions & 1 deletion internal/flowcontrol/writebuffer.go
@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"math"
"sync"
"time"

@@ -49,7 +50,7 @@ func NewResourceSliceWriteBuffer(cli client.Client, batchInterval time.Duration,
client: cli,
state: make(map[types.NamespacedName][]*resourceSliceStatusUpdate),
queue: workqueue.NewRateLimitingQueueWithConfig(
-		&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Every(batchInterval), burst)},
+		newRateLimiter(batchInterval, burst),
workqueue.RateLimitingQueueConfig{
Name: "writeBuffer",
}),
@@ -207,3 +208,44 @@ type jsonPatch struct {
Path string `json:"path"`
Value any `json:"value"`
}

type rateLimiter struct {
failuresLock sync.Mutex
failures map[interface{}]int
limiter *rate.Limiter
}

func newRateLimiter(batchInterval time.Duration, burst int) workqueue.RateLimiter {
return &rateLimiter{
failures: map[interface{}]int{},
limiter: rate.NewLimiter(rate.Every(batchInterval), burst),
}
}

func (r *rateLimiter) When(item any) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()

failures := r.failures[item]
r.failures[item]++

// Retry quickly a few times using exponential backoff
if failures > 0 && failures < 5 {
return time.Duration(10*math.Pow(2, float64(failures-1))) * time.Millisecond
}

// Non-error batching interval
return r.limiter.Reserve().Delay()
}

func (r *rateLimiter) NumRequeues(item interface{}) int {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
return r.failures[item]
}

func (r *rateLimiter) Forget(item interface{}) {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
delete(r.failures, item)
}
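
For context (not part of the diff): the workqueue calls When via AddRateLimited to decide each item's delay, and the write buffer's Forget on a successful flush resets the failure count, which is what confines the exponential backoff to genuine errors rather than routine requeues.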
23 changes: 23 additions & 0 deletions internal/flowcontrol/writebuffer_test.go
@@ -220,3 +220,26 @@ func setReconciled() StatusPatchFn {
return &apiv1.ResourceState{Reconciled: true}
}
}

func TestRateLimiter(t *testing.T) {
r := newRateLimiter(time.Second, 1)

r.When(234) // purge the first token

// The first attempt of this item should wait for roughly the batching interval
wait := r.When(123)
assert.Less(t, wait, 2*time.Second)
assert.Greater(t, wait, 600*time.Millisecond)

// A few retries use exponential backoff
for i := 0; i < 4; i++ {
wait = r.When(123)
assert.Less(t, wait, 100*time.Millisecond, "attempt %d", i)
assert.Greater(t, wait, 5*time.Millisecond, "attempt %d", i)
}

// Eventually we fall back to the batching interval
wait = r.When(123)
assert.Less(t, wait, 2*time.Second)
assert.Greater(t, wait, 600*time.Millisecond)
}
76 changes: 76 additions & 0 deletions internal/manager/manager.go
@@ -2,8 +2,11 @@ package manager

import (
"context"
"errors"
"fmt"
"math/rand/v2"
"os"
"strconv"

"net/http"
_ "net/http/pprof"
@@ -13,6 +16,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
@@ -100,6 +104,22 @@ func newMgr(logger logr.Logger, opts *Options, isController, isReconciler bool)
LeaderElectionReleaseOnCancel: true,
}

if ratioStr := os.Getenv("CHAOS_RATIO"); ratioStr != "" {
mgrOpts.NewClient = func(config *rest.Config, options client.Options) (client.Client, error) {
base, err := client.New(config, options)
if err != nil {
return nil, err
}

ratio, err := strconv.ParseFloat(ratioStr, 64)
if err != nil {
return nil, err
}

return &chaosClient{Client: base, ratio: ratio}, nil
}
}

if isController {
// Only cache pods in the synthesizer pod namespace and owned by this controller
mgrOpts.Cache.ByObject[&corev1.Pod{}] = cache.ByObject{
@@ -216,3 +236,59 @@ func SingleEventHandler() handler.EventHandler {
return []reconcile.Request{{}}
}))
}

type chaosClient struct {
client.Client
ratio float64
}

func (c *chaosClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error {
if c.ratio > rand.Float64() {
return errors.New("chaos!")
}
return c.Client.Create(ctx, obj, opts...)
}

func (c *chaosClient) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error {
if c.ratio > rand.Float64() {
return errors.New("chaos!")
}
return c.Client.Delete(ctx, obj, opts...)
}

func (c *chaosClient) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error {
if c.ratio > rand.Float64() {
return errors.New("chaos!")
}
return c.Client.Update(ctx, obj, opts...)
}

func (c *chaosClient) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.PatchOption) error {
if c.ratio > rand.Float64() {
return errors.New("chaos!")
}
return c.Client.Patch(ctx, obj, patch, opts...)
}

func (c *chaosClient) Status() client.SubResourceWriter {
return &chaosStatusClient{SubResourceWriter: c.Client.Status(), parent: c}
}

type chaosStatusClient struct {
client.SubResourceWriter
parent *chaosClient
}

func (c *chaosStatusClient) Update(ctx context.Context, obj client.Object, opts ...client.SubResourceUpdateOption) error {
if c.parent.ratio > rand.Float64() {
return errors.New("chaos!")
}
return c.SubResourceWriter.Update(ctx, obj, opts...)
}

func (c *chaosStatusClient) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.SubResourcePatchOption) error {
if c.parent.ratio > rand.Float64() {
return errors.New("chaos!")
}
return c.SubResourceWriter.Patch(ctx, obj, patch, opts...)
}
