Skip to content

Commit

Permalink
kvserver/rangefeed: add capacity to node level buffered sender
Browse files Browse the repository at this point in the history
This patch adds capacity to node level buffered sender which will shut down all
registrations if the node level buffer had overflowed.

Part of: #129813
Release note: none

TODO: add a larger test at the kvclient side to make sure error returned here is treated as a restart signal

# Conflicts:
#	pkg/kv/kvserver/rangefeed/buffered_sender.go
#	pkg/kv/kvserver/rangefeed/buffered_sender_test.go

# Conflicts:
#	pkg/kv/kvserver/rangefeed/buffered_sender_test.go
  • Loading branch information
wenyihu6 committed Oct 17, 2024
1 parent b84a21e commit 5bf9eb9
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 6 deletions.
34 changes: 29 additions & 5 deletions pkg/kv/kvserver/rangefeed/buffered_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,22 @@ package rangefeed

import (
"context"
"math"
"sync"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
)

var bufferedSenderCapacity = envutil.EnvOrDefaultInt64(
"COCKROACH_BUFFERED_STREAM_CAPACITY_PER_STREAM", math.MaxInt64)

// ┌─────────────────────────────────────────┐ MuxRangefeedEvent
// │ Node.MuxRangeFeed │◄──────────────────────────────────────────────────┐
// └─────────────────┬───▲───────────────────┘ ▲ │
Expand Down Expand Up @@ -82,8 +87,10 @@ type BufferedSender struct {

queueMu struct {
syncutil.Mutex
stopped bool
buffer *eventQueue
stopped bool
capacity int64
buffer *eventQueue
overflow bool
}

// Unblocking channel to notify the BufferedSender.run goroutine that there
Expand All @@ -98,6 +105,7 @@ func NewBufferedSender(
metrics: metrics,
}
bs.queueMu.buffer = newEventQueue()
bs.queueMu.capacity = bufferedSenderCapacity
bs.notifyDataC = make(chan struct{}, 1)
return bs
}
Expand All @@ -117,6 +125,14 @@ func (bs *BufferedSender) SendBuffered(
log.Errorf(context.Background(), "stream sender is stopped")
return errors.New("stream sender is stopped")
}
if bs.queueMu.overflow {
log.Error(context.Background(), "buffer capacity exceeded")
return newRetryErrBufferCapacityExceeded()
}
if bs.queueMu.buffer.Len() >= bs.queueMu.capacity {
bs.queueMu.overflow = true
return newRetryErrBufferCapacityExceeded()
}
alloc.Use(context.Background())
bs.queueMu.buffer.pushBack(sharedMuxEvent{ev, alloc})
select {
Expand Down Expand Up @@ -224,7 +240,7 @@ func (bs *BufferedSender) run(ctx context.Context, stopper *stop.Stopper) error
return nil
case <-bs.notifyDataC:
for {
e, success := bs.popFront()
e, success, overflowed, remains := bs.popFront()
if success {
err := bs.sender.Send(e.ev)
e.alloc.Release(ctx)
Expand All @@ -241,6 +257,9 @@ func (bs *BufferedSender) run(ctx context.Context, stopper *stop.Stopper) error
if err != nil {
return err
}
if overflowed && remains == int64(0) {
return newRetryErrBufferCapacityExceeded()
}
} else {
break
}
Expand All @@ -249,11 +268,16 @@ func (bs *BufferedSender) run(ctx context.Context, stopper *stop.Stopper) error
}
}

func (bs *BufferedSender) popFront() (e sharedMuxEvent, success bool) {
func (bs *BufferedSender) popFront() (
e sharedMuxEvent,
success bool,
overflowed bool,
remains int64,
) {
bs.queueMu.Lock()
defer bs.queueMu.Unlock()
event, ok := bs.queueMu.buffer.popFront()
return event, ok
return event, ok, bs.queueMu.overflow, bs.queueMu.buffer.Len()
}

func (bs *BufferedSender) Start(ctx context.Context, stopper *stop.Stopper) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -2076,7 +2076,7 @@ func (n *Node) MuxRangeFeed(muxStream kvpb.Internal_MuxRangeFeedServer) error {
var sm streamManager
if kvserver.RangefeedUseBufferedSender.Get(&n.storeCfg.Settings.SV) {
sm = rangefeed.NewBufferedSender(lockedMuxStream, n.metrics)
log.Fatalf(ctx, "unimplemented: buffered sender for rangefeed #126560")
log.Infof(ctx, "using buffered sender for rangefeed")
} else {
sm = rangefeed.NewUnbufferedSender(lockedMuxStream, n.metrics)
}
Expand Down

0 comments on commit 5bf9eb9

Please sign in to comment.