diff --git a/pkg/kv/kvserver/rangefeed/buffered_sender.go b/pkg/kv/kvserver/rangefeed/buffered_sender.go index edf3494da24..7965ff7b945 100644 --- a/pkg/kv/kvserver/rangefeed/buffered_sender.go +++ b/pkg/kv/kvserver/rangefeed/buffered_sender.go @@ -7,10 +7,12 @@ package rangefeed import ( "context" + "math" "sync" "time" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -18,6 +20,9 @@ import ( "github.com/cockroachdb/errors" ) +var bufferedSenderCapacity = envutil.EnvOrDefaultInt64( + "COCKROACH_BUFFERED_STREAM_CAPACITY_PER_STREAM", math.MaxInt64) + // ┌─────────────────────────────────────────┐ MuxRangefeedEvent // │ Node.MuxRangeFeed │◄──────────────────────────────────────────────────┐ // └─────────────────┬───▲───────────────────┘ ▲ │ @@ -82,8 +87,10 @@ type BufferedSender struct { queueMu struct { syncutil.Mutex - stopped bool - buffer *eventQueue + stopped bool + capacity int64 + buffer *eventQueue + overflow bool } // Unblocking channel to notify the BufferedSender.run goroutine that there @@ -98,6 +105,7 @@ func NewBufferedSender( metrics: metrics, } bs.queueMu.buffer = newEventQueue() + bs.queueMu.capacity = bufferedSenderCapacity bs.notifyDataC = make(chan struct{}, 1) return bs } @@ -117,6 +125,14 @@ func (bs *BufferedSender) SendBuffered( log.Errorf(context.Background(), "stream sender is stopped") return errors.New("stream sender is stopped") } + if bs.queueMu.overflow { + log.Error(context.Background(), "buffer capacity exceeded") + return newRetryErrBufferCapacityExceeded() + } + if bs.queueMu.buffer.Len() >= bs.queueMu.capacity { + bs.queueMu.overflow = true + return newRetryErrBufferCapacityExceeded() + } alloc.Use(context.Background()) bs.queueMu.buffer.pushBack(sharedMuxEvent{ev, alloc}) select { @@ -224,7 +240,7 @@ func (bs *BufferedSender) run(ctx context.Context, stopper *stop.Stopper) error return nil case <-bs.notifyDataC: for { - e, success := bs.popFront() + e, success, overflowed, remains := bs.popFront() if success { err := bs.sender.Send(e.ev) e.alloc.Release(ctx) @@ -241,6 +257,9 @@ func (bs *BufferedSender) run(ctx context.Context, stopper *stop.Stopper) error if err != nil { return err } + if overflowed && remains == int64(0) { + return newRetryErrBufferCapacityExceeded() + } } else { break } @@ -249,11 +268,16 @@ func (bs *BufferedSender) run(ctx context.Context, stopper *stop.Stopper) error } } -func (bs *BufferedSender) popFront() (e sharedMuxEvent, success bool) { +func (bs *BufferedSender) popFront() ( + e sharedMuxEvent, + success bool, + overflowed bool, + remains int64, +) { bs.queueMu.Lock() defer bs.queueMu.Unlock() event, ok := bs.queueMu.buffer.popFront() - return event, ok + return event, ok, bs.queueMu.overflow, bs.queueMu.buffer.Len() } func (bs *BufferedSender) Start(ctx context.Context, stopper *stop.Stopper) error { diff --git a/pkg/server/node.go b/pkg/server/node.go index 634699c835d..efd5f347e72 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -2076,7 +2076,7 @@ func (n *Node) MuxRangeFeed(muxStream kvpb.Internal_MuxRangeFeedServer) error { var sm streamManager if kvserver.RangefeedUseBufferedSender.Get(&n.storeCfg.Settings.SV) { sm = rangefeed.NewBufferedSender(lockedMuxStream, n.metrics) - log.Fatalf(ctx, "unimplemented: buffered sender for rangefeed #126560") + log.Infof(ctx, "using buffered sender for rangefeed") } else { sm = rangefeed.NewUnbufferedSender(lockedMuxStream, n.metrics) }