diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
index b68124ef5ad9..6582773dfd46 100644
--- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
+++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
@@ -34,8 +34,10 @@ import (
 // range at the leader. It must be created for a particular leader term, and
 // closed if the term changes.
 //
-// None of the methods are called with Replica.mu held. The caller and callee
-// should order their mutexes before Replica.mu.
+// Almost none of the methods are called with Replica.mu held. The caller and
+// callee should order their mutexes before Replica.mu. The one exception is
+// HoldsSendTokensLocked, which holds both raftMu and Replica.mu. The callee
+// must not acquire its own mutex.
 //
 // RangeController dynamically switches between push and pull mode based on
 // RaftEvent handling. In general, the code here is oblivious to the fact that
@@ -80,11 +82,11 @@ type RangeController interface {
 	//
 	// Requires replica.raftMu to be held.
 	MaybeSendPingsRaftMuLocked()
-	// HoldsSendTokensRaftMuLocked returns true if the replica holds any send
-	// tokens. Used to prevent replica quiescence.
+	// HoldsSendTokensLocked returns true if the replica holds any send tokens.
+	// Used to prevent replica quiescence.
 	//
-	// Requires replica.raftMu to be held.
-	HoldsSendTokensRaftMuLocked() bool
+	// Requires replica.raftMu and replica.mu to be held.
+	HoldsSendTokensLocked() bool
 	// SetReplicasRaftMuLocked sets the replicas of the range. The caller will
 	// never mutate replicas, and neither should the callee.
 	//
@@ -477,7 +479,7 @@ type ProbeToCloseTimerScheduler interface {
 	//
 	//   HandleRaftEventRaftMuLocked(ctx, RaftEvent{})
 	//
-	// Which will trigger handleReadyState to close the send stream if it hasn't
+	// Which will trigger handleReadyStateRaftMuLocked to close the send stream if it hasn't
 	// transitioned to StateReplicate.
 	//
 	// Requires replica.raftMu to be held.
@@ -1002,18 +1004,18 @@ func (rc *rangeController) HandleRaftEventRaftMuLocked(ctx context.Context, e Ra
 			// Leader is always in push mode.
 			mode = MsgAppPush
 		}
-		existingSSState := rs.getExistingSendStreamState()
+		existingSSState := rs.getExistingSendStreamStateRaftMuLocked()
 		rs.scratchEvent, rs.scratchSendingEntries = constructRaftEventForReplica(
 			ctx, mode, appendState, info, existingSSState, msgApps, e.LogSnapshot,
 			rs.scratchSendingEntries)
 		info = rs.scratchEvent.replicaStateInfo
 	}
-	shouldWaitChange = rs.handleReadyState(
+	shouldWaitChange = rs.handleReadyStateRaftMuLocked(
 		ctx, mode, info, rs.scratchEvent.nextRaftIndex, rs.scratchEvent.recreateSendStream) ||
 		shouldWaitChange
 	if e.MsgAppMode == MsgAppPull && rs.desc.IsAnyVoter() {
 		// Compute state and first-pass decision on force-flushing and sending
 		// the new entries.
-		rs.scratchVoterStreamState = rs.computeReplicaStreamState(ctx, needsTokens)
+		rs.scratchVoterStreamState = rs.computeReplicaStreamStateRaftMuLocked(ctx, needsTokens)
 		if (rs.scratchVoterStreamState.noSendQ && rs.scratchVoterStreamState.hasSendTokens) ||
 			rs.scratchVoterStreamState.forceFlushing {
 			if rs.desc.IsVoterOldConfig() {
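An aside on the convention driving these renames: a suffix such as RaftMuLocked or Locked names every mutex the caller must already hold, and the callee only asserts that fact rather than acquiring the lock itself. Below is a minimal, self-contained Go sketch of the idea; checkedMutex and replica are hypothetical stand-ins for illustration (the real code uses syncutil's assertable mutexes), not the actual API.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// checkedMutex is a toy stand-in for an assertable mutex; it exists only so
// that AssertHeld can panic in this sketch.
type checkedMutex struct {
	mu   sync.Mutex
	held atomic.Bool
}

func (m *checkedMutex) Lock()   { m.mu.Lock(); m.held.Store(true) }
func (m *checkedMutex) Unlock() { m.held.Store(false); m.mu.Unlock() }

func (m *checkedMutex) AssertHeld() {
	if !m.held.Load() {
		panic("mutex not held")
	}
}

// replica mirrors the ordering rule stated in the comment above: raftMu is
// ordered before mu, so anyone taking both must take raftMu first.
type replica struct {
	raftMu checkedMutex
	mu     checkedMutex
	tokens int
}

// holdsSendTokensLocked names both mutexes in its suffix: the caller must
// already hold them, and the callee asserts instead of acquiring.
func (r *replica) holdsSendTokensLocked() bool {
	r.raftMu.AssertHeld()
	r.mu.AssertHeld()
	return r.tokens > 0
}

func main() {
	r := &replica{tokens: 1}
	r.raftMu.Lock() // acquisition order: raftMu, then mu
	r.mu.Lock()
	fmt.Println(r.holdsSendTokensLocked()) // true
	r.mu.Unlock()
	r.raftMu.Unlock()
}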
@@ -1073,7 +1075,7 @@ func (rc *rangeController) HandleRaftEventRaftMuLocked(ctx context.Context, e Ra
 			} else {
 				// Only need to do first-pass computation for non-voters, since
 				// there is no adjustment needed to ensure quorum.
-				ss = rs.computeReplicaStreamState(ctx, needsTokens)
+				ss = rs.computeReplicaStreamStateRaftMuLocked(ctx, needsTokens)
 			}
 			rd = replicaDirective{
 				forceFlush: ss.forceFlushing,
@@ -1081,7 +1083,7 @@ func (rc *rangeController) HandleRaftEventRaftMuLocked(ctx context.Context, e Ra
 				preventSendQNoForceFlush: ss.preventSendQNoForceFlush,
 			}
 		}
-		shouldWaitChange = rs.handleReadyEntries(ctx, rs.scratchEvent, rd) || shouldWaitChange
+		shouldWaitChange = rs.handleReadyEntriesRaftMuLocked(ctx, rs.scratchEvent, rd) || shouldWaitChange
 	}
 
 	// If there was a quorum change, update the voter sets, triggering the
@@ -1318,7 +1320,7 @@ func (rc *rangeController) AdmitRaftMuLocked(
 	ctx context.Context, replicaID roachpb.ReplicaID, av AdmittedVector,
 ) {
 	if rs, ok := rc.replicaMap[replicaID]; ok {
-		rs.admit(ctx, av)
+		rs.admitRaftMuLocked(ctx, av)
 	}
 }
 
@@ -1328,19 +1330,19 @@ func (rc *rangeController) MaybeSendPingsRaftMuLocked() {
 		if id == rc.opts.LocalReplicaID {
 			continue
 		}
-		if s := state.sendStream; s != nil && s.holdsTokens() {
+		if s := state.sendStream; s != nil && s.holdsTokensRaftMuLocked() {
 			rc.opts.RaftInterface.SendPingRaftMuLocked(id)
 		}
 	}
 }
 
-// HoldsSendTokensRaftMuLocked implements RangeController.
-func (rc *rangeController) HoldsSendTokensRaftMuLocked() bool {
+// HoldsSendTokensLocked implements RangeController.
+func (rc *rangeController) HoldsSendTokensLocked() bool {
 	// TODO(pav-kv): we are doing the same checks in MaybeSendPingsRaftMuLocked
 	// here, and both are called from Replica.tick. We can optimize this, and do
 	// both in one method.
 	for _, state := range rc.replicaMap {
-		if s := state.sendStream; s != nil && s.holdsTokens() {
+		if s := state.sendStream; s != nil && s.holdsTokensRaftMuLocked() {
 			return true
 		}
 	}
@@ -1393,7 +1395,7 @@ func (rc *rangeController) CloseRaftMuLocked(ctx context.Context) {
 	// AdmittedVector updates.
 	for _, rs := range rc.replicaMap {
 		if rs.sendStream != nil {
-			rs.closeSendStream(ctx)
+			rs.closeSendStreamRaftMuLocked(ctx)
 		}
 	}
 	rc.opts.RangeControllerMetrics.Count.Dec(1)
@@ -1415,7 +1417,7 @@ func (rc *rangeController) InspectRaftMuLocked(ctx context.Context) kvflowinspec
 			defer rs.sendStream.mu.Unlock()
 			streams = append(streams, kvflowinspectpb.ConnectedStream{
 				Stream:            rc.opts.SSTokenCounter.InspectStream(rs.stream),
-				TrackedDeductions: rs.sendStream.mu.tracker.Inspect(),
+				TrackedDeductions: rs.sendStream.raftMu.tracker.Inspect(),
 			})
 		}()
 	}
@@ -1498,8 +1500,8 @@ func (rc *rangeController) updateSendQueueStatsRaftMuLocked(now time.Time) {
 		func() {
 			rs.sendStream.mu.Lock()
 			defer rs.sendStream.mu.Unlock()
-			stats.SendQueueBytes = int64(rs.sendStream.approxQueueSizeLocked())
-			stats.SendQueueCount = rs.sendStream.queueLengthLocked()
+			stats.SendQueueBytes = int64(rs.sendStream.approxQueueSizeStreamLocked())
+			stats.SendQueueCount = rs.sendStream.queueLengthRaftMuAndStreamLocked()
 		}()
 	}
 	rc.lastSendQueueStatsScratch.Set(stats)
@@ -1522,7 +1524,7 @@ func (rc *rangeController) updateReplicaSetRaftMuLocked(ctx context.Context, new
 		if rs := rc.replicaMap[r]; rs.sendStream != nil {
 			// The replica is no longer part of the range, so we don't expect any
 			// tracked token deductions to be returned. Return them now.
-			rs.closeSendStream(ctx)
+			rs.closeSendStreamRaftMuLocked(ctx)
 		}
 		delete(rc.replicaMap, r)
 		remove = append(remove, r)
 	}
@@ -1585,7 +1587,7 @@ func (rc *rangeController) updateWaiterSetsRaftMuLocked() {
 		isNew := r.IsVoterNewConfig()
 
 		rs := rc.replicaMap[r.ReplicaID]
-		isStateReplicate, hasSendQ := rs.isStateReplicateAndSendQ()
+		isStateReplicate, hasSendQ := rs.isStateReplicateAndSendQRaftMuLocked()
 		waiterState := stateForWaiters{
 			replicaID:        r.ReplicaID,
 			isStateReplicate: isStateReplicate,
@@ -1676,7 +1678,7 @@ func (rc *rangeController) checkConsistencyRaftMuLocked(ctx context.Context) {
 			if !ok {
 				panic(errors.AssertionFailedf("replica %s not in replicaMap", state.replicaID))
 			}
-			isStateReplicate, hasSendQ := rs.isStateReplicateAndSendQ()
+			isStateReplicate, hasSendQ := rs.isStateReplicateAndSendQRaftMuLocked()
 			if state.isStateReplicate != isStateReplicate {
 				panic(errors.AssertionFailedf("inconsistent isStateReplicate: %t, %t",
 					state.isStateReplicate, isStateReplicate))
@@ -1713,6 +1715,10 @@ func (rc *rangeController) checkConsistencyRaftMuLocked(ctx context.Context) {
 	}
 }
 
+// replicaState holds state for each replica. All methods are called with
+// raftMu held, hence it does not have its own mutex.
+//
+// TODO(sumeer): add mutex held assertions.
 type replicaState struct {
 	parent *rangeController
 	// stream aggregates across the streams for the same (tenant, store). This
@@ -1789,10 +1795,22 @@ func NewReplicaState(
 	return rs
 }
 
+// replicaSendStream maintains state for a replica to which we (typically) are
+// actively replicating.
+//
+// TODO(sumeer): assert that raftMu is held on the various methods that say it
+// must be held.
 type replicaSendStream struct {
 	parent *replicaState
 
-	// Mutex is ordered before Replica.mu.
+	// Mutex is ordered before Replica.mu. IO is done while holding the mutex.
+	//
+	// TODO(sumeer): there are a number of fields inside mu that don't need to
+	// be protected by mu. Consider moving them to the raftMu struct after we
+	// have raftMu assertions in this file. Note that moving them will likely
+	// not reduce the size of a critical section or avoid needing
+	// replicaSendStream.mu where we currently need it. So we could potentially
+	// keep them under replicaSendStream.mu as a defensive mechanism.
 	mu struct {
 		syncutil.Mutex
 		// connectedStateStart is the time when the connectedState was last
@@ -1803,12 +1821,6 @@ type replicaSendStream struct {
 		// nextRaftIndexInitial is the value of nextRaftIndex when this
 		// replicaSendStream was created, or transitioned into replicate.
 		nextRaftIndexInitial uint64
-		// tracker contains entries that have been sent, and have had send-tokens
-		// deducted (and will have had eval-tokens deducted iff index >=
-		// nextRaftIndexInitial).
-		//
-		// Contains no entries in probeRecentlyNoSendQ.
-		tracker Tracker
 		// Eval state.
 		//
 		// Contains no tokens in probeRecentlyNoSendQ.
@@ -1904,22 +1916,38 @@ type replicaSendStream struct {
 			tokenWatcherHandle         SendTokenWatcherHandle
 			deductedForSchedulerTokens kvflowcontrol.Tokens
 		}
+		// TODO(sumeer): remove closed. Whenever a replicaSendStream is closed it
+		// is also no longer referenced by replicaState. The only motivation for
+		// closed is that replicaSendStream.Notify calls directly into
+		// replicaSendStream. But closing a send stream also sets
+		// replicaSendStream.mu.sendQueue.tokenWatcherHandle to empty, and Notify
+		// is already a no-op in that case. So this field serves no useful purpose.
 		closed bool
 	}
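The tracker field removed from mu above reappears just below in a raftMu grouping struct. As a sketch of the convention (with hypothetical fields), grouping fields into a struct named after the mutex that guards them documents the synchronization rule at the declaration site, even when that mutex belongs to another object:

package sketch

import "sync"

// stream illustrates the field-grouping convention used by replicaSendStream.
// The fields are hypothetical; only the shape matters.
type stream struct {
	// mu guards every field declared inside this anonymous struct.
	mu struct {
		sync.Mutex
		queueLen int
	}
	// raftMu groups fields guarded by the *parent's* raftMu, a mutex this
	// object does not own. The name records the invariant; there is no local
	// lock to acquire here.
	raftMu struct {
		trackedIndexes []uint64
	}
}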
+	// Fields that are read/written while holding raftMu.
+	raftMu struct {
+		// tracker contains entries that have been sent, and have had send-tokens
+		// deducted (and will have had eval-tokens deducted iff index >=
+		// nextRaftIndexInitial).
+		//
+		// Contains no entries in probeRecentlyNoSendQ.
+		tracker Tracker
+	}
 }
 
-func (rss *replicaSendStream) changeConnectedStateLocked(state connectedState, now time.Time) {
+func (rss *replicaSendStream) changeConnectedStateRaftMuAndStreamLocked(
+	state connectedState, now time.Time,
+) {
+	rss.mu.AssertHeld()
 	rss.mu.connectedState = state
 	rss.mu.connectedStateStart = now
 }
 
-func (rss *replicaSendStream) holdsTokens() bool {
-	rss.mu.Lock() // TODO(pav-kv): should we make it RWMutex.RLock()?
-	defer rss.mu.Unlock()
-	return !rss.mu.tracker.Empty()
+func (rss *replicaSendStream) holdsTokensRaftMuLocked() bool {
+	return !rss.raftMu.tracker.Empty()
 }
 
-func (rss *replicaSendStream) admit(ctx context.Context, av AdmittedVector) {
+func (rss *replicaSendStream) admitRaftMuLocked(ctx context.Context, av AdmittedVector) {
 	if log.V(2) {
 		log.VInfof(ctx, 2, "r%v:%v stream %v admit %v",
 			rss.parent.parent.opts.RangeID, rss.parent.desc, rss.parent.stream, av)
@@ -1927,13 +1955,13 @@ func (rss *replicaSendStream) admit(ctx context.Context, av AdmittedVector) {
 	rss.mu.Lock()
 	defer rss.mu.Unlock()
 
-	returnedSend, returnedEval := rss.mu.tracker.Untrack(av,
+	returnedSend, returnedEval := rss.raftMu.tracker.Untrack(av,
 		rss.mu.nextRaftIndexInitial)
-	rss.returnSendTokens(ctx, returnedSend, false /* disconnect */)
-	rss.returnEvalTokensLocked(ctx, returnedEval)
+	rss.returnSendTokensRaftMuAndStreamLocked(ctx, returnedSend, false /* disconnect */)
+	rss.returnEvalTokensRaftMuAndStreamLocked(ctx, returnedEval)
 }
 
-func (rs *replicaState) getExistingSendStreamState() existingSendStreamState {
+func (rs *replicaState) getExistingSendStreamStateRaftMuLocked() existingSendStreamState {
 	if rs.sendStream == nil {
 		return existingSendStreamState{
 			existsAndInStateReplicate: false,
@@ -1952,7 +1980,7 @@
 	}
 }
 
-func (rs *replicaState) createReplicaSendStream(
+func (rs *replicaState) createReplicaSendStreamRaftMuLocked(
 	ctx context.Context, mode RaftMsgAppMode, indexToSend uint64, nextRaftIndex uint64,
 ) {
 	// Must be in StateReplicate on creation.
@@ -1963,24 +1991,25 @@
 		parent: rs,
 	}
 	rss := rs.sendStream
-	rss.mu.tracker.Init(rs.parent.term, rs.stream)
+	// NB: need to lock rss.mu due to (a) assertions in some of the methods
+	// called below, and (b) the fact that
+	// startAttemptingToEmptySendQueueViaWatcherStreamLocked can hand a reference to
+	// rss to a different goroutine, which can start running immediately.
+	rss.mu.Lock()
+	defer rss.mu.Unlock()
+	rss.raftMu.tracker.Init(rs.parent.term, rs.stream)
 	rss.mu.closed = false
-	rss.changeConnectedStateLocked(replicate, rs.parent.opts.Clock.PhysicalTime())
+	rss.changeConnectedStateRaftMuAndStreamLocked(replicate, rs.parent.opts.Clock.PhysicalTime())
 	rss.mu.mode = mode
 	rss.mu.nextRaftIndexInitial = nextRaftIndex
 	rss.mu.sendQueue.indexToSend = indexToSend
 	rss.mu.sendQueue.nextRaftIndex = nextRaftIndex
-	if mode == MsgAppPull && !rs.sendStream.isEmptySendQueueLocked() {
-		// NB: need to lock rss.mu since
-		// startAttemptingToEmptySendQueueViaWatcherLocked can hand a reference to
-		// rss to a different goroutine, which can start running immediately.
-		rss.mu.Lock()
-		defer rss.mu.Unlock()
-		rss.startAttemptingToEmptySendQueueViaWatcherLocked(ctx)
+	if mode == MsgAppPull && !rs.sendStream.isEmptySendQueueRaftMuAndStreamLocked() {
+		rss.startAttemptingToEmptySendQueueViaWatcherStreamLocked(ctx)
 	}
 }
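The NB in the hunk above generalizes to a publish-under-lock pattern: if initialization hands the new object to another goroutine partway through, the object's mutex must already be held so that the other goroutine blocks until construction finishes. A minimal sketch, with a hypothetical register function standing in for the send-token watcher:

package main

import (
	"fmt"
	"sync"
	"time"
)

type notifiee interface{ Notify() }

// register hands n to another goroutine that may call Notify immediately,
// mirroring how the token watcher can invoke replicaSendStream.Notify as
// soon as it holds a reference.
func register(n notifiee) { go n.Notify() }

type stream struct {
	mu struct {
		sync.Mutex
		initialized bool
	}
}

func (s *stream) Notify() {
	s.mu.Lock() // blocks until the constructor releases the mutex
	defer s.mu.Unlock()
	fmt.Println("initialized:", s.mu.initialized) // always true
}

func newStream() *stream {
	s := &stream{}
	s.mu.Lock()
	defer s.mu.Unlock()
	register(s)             // another goroutine may run Notify now...
	s.mu.initialized = true // ...but it cannot observe this half-done state
	return s
}

func main() {
	_ = newStream()
	time.Sleep(10 * time.Millisecond) // give Notify a chance to run
}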
-func (rs *replicaState) isStateReplicateAndSendQ() (isStateReplicate, hasSendQ bool) {
+func (rs *replicaState) isStateReplicateAndSendQRaftMuLocked() (isStateReplicate, hasSendQ bool) {
 	if rs.sendStream == nil {
 		return false, true
 	}
@@ -1988,7 +2017,7 @@ func (rs *replicaState) isStateReplicateAndSendQ() (isStateReplicate, hasSendQ b
 	defer rs.sendStream.mu.Unlock()
 	isStateReplicate = rs.sendStream.mu.connectedState == replicate
 	if isStateReplicate {
-		hasSendQ = !rs.sendStream.isEmptySendQueueLocked()
+		hasSendQ = !rs.sendStream.isEmptySendQueueRaftMuAndStreamLocked()
 	} else {
 		// For WaitForEval, we treat probeRecentlyNoSendQ as having a send-queue
 		// and not part of the quorum. We don't want to keep evaluating and pile
@@ -2031,10 +2060,10 @@ func getEntryFCStateOrFatal(ctx context.Context, entry raftpb.Entry) entryFCStat
 	}
 }
 
-// computeReplicaStreamState computes the current state of the stream and a
-// first-pass decision on what the stream should do. Called for all replicas
-// when in pull mode.
-func (rs *replicaState) computeReplicaStreamState(
+// computeReplicaStreamStateRaftMuLocked computes the current state of the
+// stream and a first-pass decision on what the stream should do. Called for
+// all replicas when in pull mode.
+func (rs *replicaState) computeReplicaStreamStateRaftMuLocked(
 	ctx context.Context, needsTokens [admissionpb.NumWorkClasses]bool,
 ) replicaStreamState {
 	if rs.sendStream == nil {
@@ -2071,7 +2100,7 @@
 	}
 	vss := replicaStreamState{
 		isReplicate:              true,
-		noSendQ:                  rss.isEmptySendQueueLocked(),
+		noSendQ:                  rss.isEmptySendQueueRaftMuAndStreamLocked(),
 		forceFlushing:            rss.mu.sendQueue.forceFlushScheduled,
 		preventSendQNoForceFlush: false,
 	}
@@ -2127,7 +2156,7 @@
 	return vss
 }
 
-func (rs *replicaState) handleReadyEntries(
+func (rs *replicaState) handleReadyEntriesRaftMuLocked(
 	ctx context.Context, eventForReplica raftEventForReplica, directive replicaDirective,
 ) (transitionedSendQState bool) {
 	if rs.sendStream == nil {
@@ -2140,24 +2169,24 @@
 		return false
 	}
 	transitionedSendQState, err :=
-		rs.sendStream.handleReadyEntriesLocked(ctx, eventForReplica, directive)
+		rs.sendStream.handleReadyEntriesRaftMuAndStreamLocked(ctx, eventForReplica, directive)
 	if err != nil {
 		// Transitioned to StateSnapshot, or some other error that Raft needs to
 		// deal with.
-		rs.sendStream.closeLocked(ctx)
+		rs.sendStream.closeRaftMuAndStreamLocked(ctx)
 		rs.sendStream = nil
 		transitionedSendQState = true
 	}
 	return transitionedSendQState
 }
 
-// handleReadyState handles state management for the replica based on the
-// provided follower state information. If the state changes in a way that
-// affects requests waiting for evaluation, returns true. mode, nextRaftIndex
-// and recreateSendStream are only relevant when info.State is StateReplicate.
-// mode, info.Next, nextRaftIndex are only used when recreateSendStream is
-// true.
-func (rs *replicaState) handleReadyState(
+// handleReadyStateRaftMuLocked handles state management for the replica based
+// on the provided follower state information. If the state changes in a way
+// that affects requests waiting for evaluation, returns true. mode,
+// nextRaftIndex and recreateSendStream are only relevant when info.State is
+// StateReplicate. mode, info.Next, nextRaftIndex are only used when
+// recreateSendStream is true.
+func (rs *replicaState) handleReadyStateRaftMuLocked(
 	ctx context.Context,
 	mode RaftMsgAppMode,
 	info ReplicaStateInfo,
@@ -2181,11 +2210,11 @@
 				// probeRecentlyNoSendQDuration, so close the stream.
 				shouldClose = true
 			} else if state != probeRecentlyNoSendQ {
-				if rs.sendStream.isEmptySendQueueLocked() {
+				if rs.sendStream.isEmptySendQueueRaftMuAndStreamLocked() {
 					// Empty send-queue. We will transition to probeRecentlyNoSendQ,
 					// which trades off not doing a force-flush with allowing for higher
 					// latency to achieve quorum.
-					rs.sendStream.changeToProbeLocked(ctx, now)
+					rs.sendStream.changeToProbeRaftMuAndStreamLocked(ctx, now)
 				} else {
 					// Had a send-queue.
 					shouldClose = true
@@ -2196,7 +2225,7 @@
 			}
 			return shouldClose
 		}(); shouldClose {
-			rs.closeSendStream(ctx)
+			rs.closeSendStreamRaftMuLocked(ctx)
 		}
 
 	case tracker.StateReplicate:
@@ -2208,17 +2237,17 @@
 		if rs.sendStream != nil && recreateSendStream {
 			// This includes both (a) inconsistencies, and (b) transition from
 			// probeRecentlyNoSendQ => replicate.
-			rs.closeSendStream(ctx)
+			rs.closeSendStreamRaftMuLocked(ctx)
 		}
 		if rs.sendStream == nil {
-			rs.createReplicaSendStream(ctx, mode, info.Next, nextRaftIndex)
+			rs.createReplicaSendStreamRaftMuLocked(ctx, mode, info.Next, nextRaftIndex)
 			// Have stale send-queue state.
 			shouldWaitChange = true
 		}
 
 	case tracker.StateSnapshot:
 		if rs.sendStream != nil {
-			rs.closeSendStream(ctx)
+			rs.closeSendStreamRaftMuLocked(ctx)
 			shouldWaitChange = true
 		}
 	}
@@ -2243,7 +2272,7 @@ func (rs *replicaState) scheduledRaftMuLocked(
 		// RaftEvent.
 		return false, false
 	}
-	if rss.isEmptySendQueueLocked() {
+	if rss.isEmptySendQueueRaftMuAndStreamLocked() {
 		panic(errors.AssertionFailedf("scheduled with empty send-queue"))
 	}
 	if rss.mu.mode != MsgAppPull {
@@ -2251,7 +2280,7 @@
 	}
 	if mode != rss.mu.mode {
 		// Must be switching from MsgAppPull => MsgAppPush.
-		rss.tryHandleModeChangeLocked(ctx, mode, false, false)
+		rss.tryHandleModeChangeRaftMuAndStreamLocked(ctx, mode, false, false)
 		return false, false
 	}
 	// 4MB. Don't want to hog the scheduler thread for too long.
@@ -2285,54 +2314,56 @@
 	if err != nil {
 		// Transitioned to StateSnapshot, or some other error that Raft needs to
 		// deal with.
-		rs.sendStream.closeLocked(ctx)
+		rs.sendStream.closeRaftMuAndStreamLocked(ctx)
 		rs.sendStream = nil
 		return false, true
 	}
-	rss.dequeueFromQueueAndSendLocked(ctx, msg)
-	isEmpty := rss.isEmptySendQueueLocked()
+	rss.dequeueFromQueueAndSendRaftMuAndStreamLocked(ctx, msg)
+	isEmpty := rss.isEmptySendQueueRaftMuAndStreamLocked()
 	if isEmpty {
-		rss.stopAttemptingToEmptySendQueueLocked(ctx, false)
+		rss.stopAttemptingToEmptySendQueueRaftMuAndStreamLocked(ctx, false)
 		return false, true
 	}
 	// Still have a send-queue.
 	watchForTokens := !rss.mu.sendQueue.forceFlushScheduled &&
 		rss.mu.sendQueue.deductedForSchedulerTokens == 0
 	if watchForTokens {
-		rss.startAttemptingToEmptySendQueueViaWatcherLocked(ctx)
+		rss.startAttemptingToEmptySendQueueViaWatcherStreamLocked(ctx)
 	}
 	return !watchForTokens, false
 }
 
-func (rs *replicaState) closeSendStream(ctx context.Context) {
+func (rs *replicaState) closeSendStreamRaftMuLocked(ctx context.Context) {
 	if log.ExpensiveLogEnabled(ctx, 1) {
 		log.VEventf(ctx, 1, "closing send stream %v for replica %v", rs.stream, rs.desc)
 	}
 	rs.sendStream.mu.Lock()
 	defer rs.sendStream.mu.Unlock()
-	rs.sendStream.closeLocked(ctx)
+	rs.sendStream.closeRaftMuAndStreamLocked(ctx)
 	rs.sendStream = nil
 }
 
-func (rs *replicaState) admit(ctx context.Context, av AdmittedVector) {
+func (rs *replicaState) admitRaftMuLocked(ctx context.Context, av AdmittedVector) {
 	if rss := rs.sendStream; rss != nil {
-		rss.admit(ctx, av)
+		rss.admitRaftMuLocked(ctx, av)
 	}
 }
 
-func (rss *replicaSendStream) closeLocked(ctx context.Context) {
-	rss.returnSendTokens(ctx, rss.mu.tracker.UntrackAll(), true /* disconnect */)
-	rss.returnAllEvalTokensLocked(ctx)
-	rss.stopAttemptingToEmptySendQueueLocked(ctx, true)
+func (rss *replicaSendStream) closeRaftMuAndStreamLocked(ctx context.Context) {
+	rss.mu.AssertHeld()
+	rss.returnSendTokensRaftMuAndStreamLocked(ctx, rss.raftMu.tracker.UntrackAll(), true /* disconnect */)
+	rss.returnAllEvalTokensRaftMuAndStreamLocked(ctx)
+	rss.stopAttemptingToEmptySendQueueRaftMuAndStreamLocked(ctx, true)
 	rss.mu.closed = true
 }
 
-func (rss *replicaSendStream) handleReadyEntriesLocked(
+func (rss *replicaSendStream) handleReadyEntriesRaftMuAndStreamLocked(
 	ctx context.Context, event raftEventForReplica, directive replicaDirective,
 ) (transitionedSendQState bool, err error) {
-	wasEmptySendQ := rss.isEmptySendQueueLocked()
-	rss.tryHandleModeChangeLocked(ctx, event.mode, wasEmptySendQ, directive.forceFlush)
+	rss.mu.AssertHeld()
+	wasEmptySendQ := rss.isEmptySendQueueRaftMuAndStreamLocked()
+	rss.tryHandleModeChangeRaftMuAndStreamLocked(ctx, event.mode, wasEmptySendQ, directive.forceFlush)
 	if event.mode == MsgAppPull {
 		// MsgAppPull mode (i.e., followers). Populate sendingEntries.
 		n := len(event.sendingEntries)
@@ -2343,7 +2374,7 @@
 			if !rss.mu.sendQueue.forceFlushScheduled {
 				// Must have a send-queue, so sendingEntries should stay empty
 				// (these will be queued).
-				rss.startForceFlushLocked(ctx)
+				rss.startForceFlushRaftMuAndStreamLocked(ctx)
 			}
 		} else {
 			// INVARIANT: !directive.forceFlush.
@@ -2352,7 +2383,7 @@
 				// will be queued).
 				rss.mu.sendQueue.forceFlushScheduled = false
 				rss.parent.parent.opts.RangeControllerMetrics.SendQueue.ForceFlushedScheduledCount.Dec(1)
-				rss.startAttemptingToEmptySendQueueViaWatcherLocked(ctx)
+				rss.startAttemptingToEmptySendQueueViaWatcherStreamLocked(ctx)
 				if directive.hasSendTokens {
 					panic(errors.AssertionFailedf("hasSendTokens true despite send-queue"))
 				}
@@ -2400,7 +2431,7 @@
 				rss.mu.sendQueue.originalEvalTokens[WorkClassFromRaftPriority(entry.pri)] -= tokens
 				rss.mu.sendQueue.preciseSizeSum -= tokens
 			}
-			rss.mu.tracker.Track(ctx, entry.id, pri, tokens)
+			rss.raftMu.tracker.Track(ctx, entry.id, pri, tokens)
 			sendTokensToDeduct[WorkClassFromRaftPriority(pri)] += tokens
 		}
 		flag := AdjNormal
@@ -2485,9 +2516,9 @@
 		rss.parent.parent.opts.MsgAppSender.SendMsgApp(ctx, msg, false)
 	}
 
-	hasEmptySendQ := rss.isEmptySendQueueLocked()
+	hasEmptySendQ := rss.isEmptySendQueueRaftMuAndStreamLocked()
 	if event.mode == MsgAppPull && wasEmptySendQ && !hasEmptySendQ && !rss.mu.sendQueue.forceFlushScheduled {
-		rss.startAttemptingToEmptySendQueueViaWatcherLocked(ctx)
+		rss.startAttemptingToEmptySendQueueViaWatcherStreamLocked(ctx)
 	}
 	// NB: we don't special case to an empty send-queue in push mode, where Raft
 	// is responsible for causing this send-queue. Raft does not keep track of
@@ -2500,9 +2531,10 @@
 	return transitionedSendQState, nil
 }
 
-func (rss *replicaSendStream) tryHandleModeChangeLocked(
+func (rss *replicaSendStream) tryHandleModeChangeRaftMuAndStreamLocked(
 	ctx context.Context, mode RaftMsgAppMode, isEmptySendQ bool, toldToForceFlush bool,
 ) {
+	rss.mu.AssertHeld()
 	if mode == rss.mu.mode {
 		// Common case
 		return
 	}
@@ -2521,7 +2553,7 @@
 			rss.mu.sendQueue.originalEvalTokens[admissionpb.RegularWorkClass], AdjNormal)
 		rss.mu.eval.tokensDeducted[admissionpb.RegularWorkClass] +=
 			rss.mu.sendQueue.originalEvalTokens[admissionpb.RegularWorkClass]
-		rss.stopAttemptingToEmptySendQueueLocked(ctx, false)
+		rss.stopAttemptingToEmptySendQueueRaftMuAndStreamLocked(ctx, false)
 	} else {
 		// Switching from push to pull. Regular needs to be counted as elastic, so
 		// return to regular and deduct from elastic.
@@ -2534,21 +2566,22 @@
 		rss.mu.eval.tokensDeducted[admissionpb.RegularWorkClass] -=
 			rss.mu.sendQueue.originalEvalTokens[admissionpb.RegularWorkClass]
 		if !isEmptySendQ && !toldToForceFlush {
-			rss.startAttemptingToEmptySendQueueViaWatcherLocked(ctx)
+			rss.startAttemptingToEmptySendQueueViaWatcherStreamLocked(ctx)
 		}
 	}
 }
 
-func (rss *replicaSendStream) startForceFlushLocked(ctx context.Context) {
+func (rss *replicaSendStream) startForceFlushRaftMuAndStreamLocked(ctx context.Context) {
+	rss.mu.AssertHeld()
 	rss.parent.parent.opts.RangeControllerMetrics.SendQueue.ForceFlushedScheduledCount.Inc(1)
 	rss.mu.sendQueue.forceFlushScheduled = true
 	rss.parent.parent.scheduleReplica(rss.parent.desc.ReplicaID)
-	rss.stopAttemptingToEmptySendQueueViaWatcherLocked(ctx, false)
+	rss.stopAttemptingToEmptySendQueueViaWatcherRaftMuAndStreamLocked(ctx, false)
}
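Every call site that moved from rss.mu.tracker to rss.raftMu.tracker follows the same deduct-track-untrack lifecycle. A simplified sketch of that lifecycle — this is not the real Tracker API, just its shape: tokens are recorded per entry when a MsgApp is sent, and returned in index order once the follower's admitted index passes the entry.

package sketch

// deduction records send-tokens deducted for one raft entry.
type deduction struct {
	index  uint64
	tokens int64
}

// tracker is a simplified stand-in for rac2's Tracker: a FIFO of deductions,
// guarded externally by raftMu (it has no mutex of its own).
type tracker struct {
	tracked []deduction
}

// track records tokens deducted for a sent entry (entries arrive in
// increasing index order).
func (t *tracker) track(index uint64, tokens int64) {
	t.tracked = append(t.tracked, deduction{index: index, tokens: tokens})
}

// untrack returns the tokens for all entries up to and including the
// admitted index; the caller gives them back to the token counters.
func (t *tracker) untrack(admitted uint64) (returned int64) {
	i := 0
	for ; i < len(t.tracked) && t.tracked[i].index <= admitted; i++ {
		returned += t.tracked[i].tokens
	}
	t.tracked = t.tracked[i:]
	return returned
}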
 // Only called in MsgAppPull mode. Either when force-flushing or when
 // rss.mu.sendQueue.deductedFromSchedulerTokens > 0.
-func (rss *replicaSendStream) dequeueFromQueueAndSendLocked(
+func (rss *replicaSendStream) dequeueFromQueueAndSendRaftMuAndStreamLocked(
 	ctx context.Context, msg raftpb.Message,
 ) {
 	rss.mu.AssertHeld()
@@ -2580,7 +2613,7 @@
 					entryState.tokens
 			}
 			tokensNeeded += entryState.tokens
-			rss.mu.tracker.Track(ctx, entryState.id, raftpb.LowPri, entryState.tokens)
+			rss.raftMu.tracker.Track(ctx, entryState.id, raftpb.LowPri, entryState.tokens)
 		}
 	}
 	if approximatedNumEntries > 0 {
@@ -2619,12 +2652,16 @@
 	rss.parent.parent.opts.MsgAppSender.SendMsgApp(ctx, msg, true)
 }
 
-func (rss *replicaSendStream) isEmptySendQueueLocked() bool {
+func (rss *replicaSendStream) isEmptySendQueueRaftMuAndStreamLocked() bool {
+	rss.mu.AssertHeld()
 	return rss.mu.sendQueue.indexToSend == rss.mu.sendQueue.nextRaftIndex
 }
 
 // INVARIANT: no send-queue, and therefore not force-flushing.
-func (rss *replicaSendStream) changeToProbeLocked(ctx context.Context, now time.Time) {
+func (rss *replicaSendStream) changeToProbeRaftMuAndStreamLocked(
+	ctx context.Context, now time.Time,
+) {
+	rss.mu.AssertHeld()
 	if log.ExpensiveLogEnabled(ctx, 1) {
 		log.VEventf(ctx, 1, "r%v:%v stream %v changing to probe",
 			rss.parent.parent.opts.RangeID, rss.parent.desc, rss.parent.stream)
@@ -2635,15 +2672,16 @@
 	// stream. Also schedule an event, so that even if there are no
 	// entries, we will still reliably close the stream if still in
 	// StateProbe.
-	rss.changeConnectedStateLocked(probeRecentlyNoSendQ, now)
+	rss.changeConnectedStateRaftMuAndStreamLocked(probeRecentlyNoSendQ, now)
 	rss.parent.parent.opts.CloseTimerScheduler.ScheduleSendStreamCloseRaftMuLocked(
 		ctx, rss.parent.parent.opts.RangeID, probeRecentlyNoSendQDuration())
 	// Return all tokens since other ranges may need them, and it may be some
 	// time before this replica transitions back to StateReplicate.
-	rss.returnSendTokens(ctx, rss.mu.tracker.UntrackAll(), true /* disconnect */)
-	rss.returnAllEvalTokensLocked(ctx)
+	rss.returnSendTokensRaftMuAndStreamLocked(
+		ctx, rss.raftMu.tracker.UntrackAll(), true /* disconnect */)
+	rss.returnAllEvalTokensRaftMuAndStreamLocked(ctx)
 	rss.mu.sendQueue.originalEvalTokens = [admissionpb.NumWorkClasses]kvflowcontrol.Tokens{}
-	if !rss.isEmptySendQueueLocked() {
+	if !rss.isEmptySendQueueRaftMuAndStreamLocked() {
 		panic(errors.AssertionFailedf("transitioning to probeRecentlyNoSendQ when have a send-queue"))
 	}
 	if rss.mu.sendQueue.forceFlushScheduled {
 	}
 }
 
-func (rss *replicaSendStream) stopAttemptingToEmptySendQueueLocked(
+func (rss *replicaSendStream) stopAttemptingToEmptySendQueueRaftMuAndStreamLocked(
 	ctx context.Context, disconnect bool,
 ) {
+	rss.mu.AssertHeld()
 	if rss.mu.sendQueue.forceFlushScheduled {
 		rss.mu.sendQueue.forceFlushScheduled = false
 		rss.parent.parent.opts.RangeControllerMetrics.SendQueue.ForceFlushedScheduledCount.Dec(1)
 	}
-	rss.stopAttemptingToEmptySendQueueViaWatcherLocked(ctx, disconnect)
+	rss.stopAttemptingToEmptySendQueueViaWatcherRaftMuAndStreamLocked(ctx, disconnect)
 }
 
-func (rss *replicaSendStream) stopAttemptingToEmptySendQueueViaWatcherLocked(
+func (rss *replicaSendStream) stopAttemptingToEmptySendQueueViaWatcherRaftMuAndStreamLocked(
 	ctx context.Context, disconnect bool,
 ) {
+	rss.mu.AssertHeld()
 	if rss.mu.sendQueue.deductedForSchedulerTokens != 0 {
 		// Update metrics.
 		flag := AdjNormal
@@ -2688,7 +2728,12 @@
 	}
 }
 
-func (rss *replicaSendStream) startAttemptingToEmptySendQueueViaWatcherLocked(ctx context.Context) {
+// NB: raftMu may or may not be held. Specifically, when called from Notify,
+// raftMu is not held.
+func (rss *replicaSendStream) startAttemptingToEmptySendQueueViaWatcherStreamLocked(
+	ctx context.Context,
+) {
+	rss.mu.AssertHeld()
 	if rss.mu.sendQueue.forceFlushScheduled {
 		panic(errors.AssertionFailedf("already trying to empty send-queue using force-flush"))
 	}
@@ -2704,6 +2749,9 @@
 func (rss *replicaSendStream) Notify(ctx context.Context) {
 	rss.mu.Lock()
 	defer rss.mu.Unlock()
+	if rss.mu.closed {
+		return
+	}
 	if rss.mu.sendQueue.tokenWatcherHandle == (SendTokenWatcherHandle{}) {
 		return
 	}
@@ -2711,7 +2759,7 @@
 	if rss.mu.sendQueue.deductedForSchedulerTokens != 0 {
 		panic(errors.AssertionFailedf("watcher was registered when already had tokens"))
 	}
-	queueSize := rss.approxQueueSizeLocked()
+	queueSize := rss.approxQueueSizeStreamLocked()
 	if queueSize == 0 {
 		panic(errors.AssertionFailedf("watcher was registered with empty send-queue"))
 	}
@@ -2730,7 +2778,7 @@
 	tokens := rss.parent.sendTokenCounter.TryDeduct(ctx, admissionpb.ElasticWorkClass, queueSize, flag)
 	if tokens == 0 {
 		// Rare case: no tokens available despite notification. Register again.
-		rss.startAttemptingToEmptySendQueueViaWatcherLocked(ctx)
+		rss.startAttemptingToEmptySendQueueViaWatcherStreamLocked(ctx)
 		return
 	}
 	rss.mu.sendQueue.deductedForSchedulerTokens = tokens
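The new early return on rss.mu.closed in Notify (previous hunk) is the classic re-check-after-lock pattern for asynchronous callbacks: Notify runs without raftMu, so the stream may have been closed or deregistered between the watcher firing and the callback acquiring rss.mu. A sketch of the shape, with hypothetical names:

package sketch

import "sync"

type handle struct{ id int }

type stream struct {
	mu struct {
		sync.Mutex
		closed bool
		handle handle // non-zero while registered with the watcher
	}
}

// notify is invoked by a watcher goroutine that holds a reference to s but
// none of its locks. Every precondition is re-checked under mu, because the
// world may have changed since the watcher fired.
func (s *stream) notify(run func()) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.mu.closed {
		return // stream closed after the watcher fired
	}
	if s.mu.handle == (handle{}) {
		return // deregistered after the watcher fired
	}
	s.mu.handle = handle{} // consume the registration
	run()
}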
@@ -2738,7 +2786,10 @@
 	rss.parent.parent.scheduleReplica(rss.parent.desc.ReplicaID)
 }
 
-func (rss *replicaSendStream) approxQueueSizeLocked() kvflowcontrol.Tokens {
+// NB: raftMu may or may not be held. Specifically, when called from Notify,
+// raftMu is not held.
+func (rss *replicaSendStream) approxQueueSizeStreamLocked() kvflowcontrol.Tokens {
+	rss.mu.AssertHeld()
 	var size kvflowcontrol.Tokens
 	countWithApproxStats := int64(rss.mu.nextRaftIndexInitial) - int64(rss.mu.sendQueue.indexToSend)
 	if countWithApproxStats > 0 {
@@ -2749,16 +2800,18 @@
 	return size
 }
 
-func (rss *replicaSendStream) queueLengthLocked() int64 {
+func (rss *replicaSendStream) queueLengthRaftMuAndStreamLocked() int64 {
+	rss.mu.AssertHeld()
 	// NB: INVARIANT nextRaftIndex >= indexToSend, no underflow possible.
 	return int64(rss.mu.sendQueue.nextRaftIndex - rss.mu.sendQueue.indexToSend)
 }
 
-// returnSendTokens takes the tokens untracked by the tracker and returns them
-// to the send token counters.
-func (rss *replicaSendStream) returnSendTokens(
+// returnSendTokensRaftMuAndStreamLocked takes the tokens untracked by the
+// tracker and returns them to the send token counters.
+func (rss *replicaSendStream) returnSendTokensRaftMuAndStreamLocked(
 	ctx context.Context, returned [raftpb.NumPriorities]kvflowcontrol.Tokens, disconnect bool,
 ) {
+	rss.mu.AssertHeld()
 	flag := AdjNormal
 	if disconnect {
 		flag = AdjDisconnect
 	}
@@ -2771,10 +2824,12 @@
 	}
 }
 
-// returnEvalTokensLocked returns tokens to the eval token counters.
-func (rss *replicaSendStream) returnEvalTokensLocked(
+// returnEvalTokensRaftMuAndStreamLocked returns tokens to the eval token
+// counters.
+func (rss *replicaSendStream) returnEvalTokensRaftMuAndStreamLocked(
 	ctx context.Context, returnedEval [raftpb.NumPriorities]kvflowcontrol.Tokens,
 ) {
+	rss.mu.AssertHeld()
 	for pri, tokens := range returnedEval {
 		rpri := raftpb.Priority(pri)
 		wc := WorkClassFromRaftPriority(rpri)
@@ -2794,7 +2849,8 @@
 	}
 }
 
-func (rss *replicaSendStream) returnAllEvalTokensLocked(ctx context.Context) {
+func (rss *replicaSendStream) returnAllEvalTokensRaftMuAndStreamLocked(ctx context.Context) {
+	rss.mu.AssertHeld()
 	for wc, tokens := range rss.mu.eval.tokensDeducted {
 		if tokens > 0 {
 			// NB: This is only called for disconnects.
@@ -2808,7 +2864,7 @@ func (rss *replicaSendStream) checkConsistencyRaftMuLocked() {
 	rss.mu.Lock()
 	defer rss.mu.Unlock()
 	if rss.mu.connectedState == probeRecentlyNoSendQ {
-		if !rss.mu.tracker.Empty() {
+		if !rss.raftMu.tracker.Empty() {
 			panic(errors.AssertionFailedf("tracker is not empty in state probe"))
 		}
 		for _, tokens := range rss.mu.eval.tokensDeducted {
@@ -2829,7 +2885,7 @@
 	// deducted. NB: indices < rss.mu.nextRaftIndexInitial were in the
 	// send-queue when the replicaSendStream was created, so did not have eval
 	// tokens deducted.
-	trackerTokens := rss.mu.tracker.tokensGE(rss.mu.nextRaftIndexInitial)
+	trackerTokens := rss.raftMu.tracker.tokensGE(rss.mu.nextRaftIndexInitial)
 	for pri, t := range trackerTokens {
 		tokens[WorkClassFromRaftPriority(raftpb.Priority(pri))] += t
 	}
@@ -2850,7 +2906,7 @@
 				admissionpb.WorkClass(wc), t, tokens[wc]))
 		}
 	}
-	if rss.isEmptySendQueueLocked() && rss.mu.sendQueue.deductedForSchedulerTokens != 0 {
+	if rss.isEmptySendQueueRaftMuAndStreamLocked() && rss.mu.sendQueue.deductedForSchedulerTokens != 0 {
 		panic(errors.AssertionFailedf("empty send-queue and non-zero deductedForSchedulerTokens"))
 	}
 }
diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go
index dc5f9851034e..ea67a67ca63b 100644
--- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go
+++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go
@@ -246,7 +246,7 @@ func (s *testingRCState) sendStreamString(rangeID roachpb.RangeID) string {
 		fmt.Fprintf(&b, "eval original in send-q: reg=%v ela=%v\n",
 			rss.mu.sendQueue.originalEvalTokens[admissionpb.RegularWorkClass],
 			rss.mu.sendQueue.originalEvalTokens[admissionpb.ElasticWorkClass])
-		b.WriteString(formatTrackerState(&rss.mu.tracker))
+		b.WriteString(formatTrackerState(&rss.raftMu.tracker))
 		b.WriteString("++++\n")
 	}
diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/simulation_test.go b/pkg/kv/kvserver/kvflowcontrol/rac2/simulation_test.go
index ca475185de0f..cd3cc6cc92a7 100644
--- a/pkg/kv/kvserver/kvflowcontrol/rac2/simulation_test.go
+++ b/pkg/kv/kvserver/kvflowcontrol/rac2/simulation_test.go
@@ -929,7 +929,7 @@ func (rot *rcOpTicker) tick(ctx context.Context, t time.Time) {
 		rid := rot.handle.testingFindReplStreamOrFatal(ctx, rot.stream)
 		rs := rot.handle.rc.replicaMap[rid]
 		if rs.sendStream != nil {
-			data = rs.sendStream.mu.tracker.testingString()
+			data = rs.sendStream.raftMu.tracker.testingString()
 		}
 		rot.handle.snapshots = append(rot.handle.snapshots, testingTrackerSnapshot{
 			time: t,
@@ -1136,9 +1136,9 @@ func (r *testingRCRange) testingReturnTokens(
 	raftPri := AdmissionToRaftPriority(pri)
 	returnIndex := uint64(0)
 	r.mu.outstandingReturns[rid] += tokens
-	n := rs.sendStream.mu.tracker.tracked[raftPri].Length()
+	n := rs.sendStream.raftMu.tracker.tracked[raftPri].Length()
 	for i := 0; i < n; i++ {
-		deduction := rs.sendStream.mu.tracker.tracked[raftPri].At(i)
+		deduction := rs.sendStream.raftMu.tracker.tracked[raftPri].At(i)
 		if r.mu.outstandingReturns[rid]-deduction.tokens >= 0 {
 			r.mu.outstandingReturns[rid] -= deduction.tokens
 			returnIndex = deduction.id.index
diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
index 76f16d22d168..3c292b81fedb 100644
--- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
+++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
@@ -29,6 +29,9 @@ import (
 // Replica abstracts kvserver.Replica. It exposes internal implementation
 // details of Replica, specifically the locking behavior, since it is
 // essential to reason about correctness.
+//
+// TODO(sumeer): because the mutex assertions are hidden behind an interface,
+// they are not free for production builds. Fix, and then add more assertions.
 type Replica interface {
 	// RaftMuAssertHeld asserts that Replica.raftMu is held.
 	RaftMuAssertHeld()
@@ -204,9 +207,9 @@ type SideChannelInfoUsingRaftMessageRequest struct {
 // We *strongly* prefer methods to be called without holding Replica.mu, since
 // then the callee (implementation of Processor) does not need to worry about
 // (a) deadlocks, since it sometimes needs to lock Replica.mu itself, (b) the
-// amount of work it is doing under this critical section. There are two
+// amount of work it is doing under this critical section. There are three
 // exceptions to this, due to difficulty in changing the calling code:
-// InitRaftLocked, OnDescChangedLocked.
+// InitRaftLocked, OnDescChangedLocked, HoldsSendTokensLocked.
 type Processor interface {
 	// InitRaftLocked is called when raft.RawNode is initialized for the
 	// Replica. NB: can be called twice before the Replica is fully initialized.
@@ -329,11 +332,11 @@ type Processor interface {
 	//
 	// raftMu is held.
 	MaybeSendPingsRaftMuLocked()
-	// HoldsSendTokensRaftMuLocked returns true if the replica is the leader using
+	// HoldsSendTokensLocked returns true if the replica is the leader using
 	// RACv2, and holds any send tokens. Used to prevent replica quiescence.
 	//
-	// raftMu is held.
-	HoldsSendTokensRaftMuLocked() bool
+	// Both Replica.raftMu and Replica.mu are held.
+	HoldsSendTokensLocked() bool
 	// AdmitForEval is called to admit work that wants to evaluate at the
 	// leaseholder.
@@ -1091,11 +1094,12 @@ func (p *processorImpl) MaybeSendPingsRaftMuLocked() {
 	}
 }
 
-// HoldsSendTokensRaftMuLocked implements Processor.
-func (p *processorImpl) HoldsSendTokensRaftMuLocked() bool {
+// HoldsSendTokensLocked implements Processor.
+func (p *processorImpl) HoldsSendTokensLocked() bool {
 	p.opts.Replica.RaftMuAssertHeld()
+	p.opts.Replica.MuAssertHeld()
 	if rc := p.leader.rc; rc != nil {
-		return rc.HoldsSendTokensRaftMuLocked()
+		return rc.HoldsSendTokensLocked()
 	}
 	return false
 }
diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go
index 2d45c90e95e4..ec71fb81b5ab 100644
--- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go
+++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go
@@ -218,8 +218,8 @@ func (c *testRangeController) MaybeSendPingsRaftMuLocked() {
 	fmt.Fprintf(c.b, " RangeController.MaybeSendPingsRaftMuLocked()\n")
 }
 
-func (c *testRangeController) HoldsSendTokensRaftMuLocked() bool {
-	fmt.Fprintf(c.b, " RangeController.HoldsSendTokensRaftMuLocked()\n")
+func (c *testRangeController) HoldsSendTokensLocked() bool {
+	fmt.Fprintf(c.b, " RangeController.HoldsSendTokensLocked()\n")
 	return false
 }
diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go
index 8fd09999376a..7664c454797a 100644
--- a/pkg/kv/kvserver/replica_raft.go
+++ b/pkg/kv/kvserver/replica_raft.go
@@ -598,13 +598,13 @@ func (r *Replica) hasPendingProposalQuotaRLocked() bool {
 	return !r.mu.proposalQuota.Full()
 }
 
-// hasSendTokensRaftMuLocked is part of the quiescer interface. It returns true
-// if RACv2 holds any send tokens for this range.
+// hasSendTokensRaftMuLockedReplicaMuLocked is part of the quiescer interface.
+// It returns true if RACv2 holds any send tokens for this range.
 //
 // We can't quiesce while any send tokens are held because this could lead to
 // never releasing them. Tokens must be released.
-func (r *Replica) hasSendTokensRaftMuLocked() bool {
-	return r.flowControlV2.HoldsSendTokensRaftMuLocked()
+func (r *Replica) hasSendTokensRaftMuLockedReplicaMuLocked() bool {
+	return r.flowControlV2.HoldsSendTokensLocked()
 }
 
 // ticksSinceLastProposalRLocked returns the number of ticks since the last
diff --git a/pkg/kv/kvserver/replica_raft_quiesce.go b/pkg/kv/kvserver/replica_raft_quiesce.go
index fc7364da2393..7f825362ea14 100644
--- a/pkg/kv/kvserver/replica_raft_quiesce.go
+++ b/pkg/kv/kvserver/replica_raft_quiesce.go
@@ -212,9 +212,7 @@ type quiescer interface {
 	hasRaftReadyRLocked() bool
 	hasPendingProposalsRLocked() bool
 	hasPendingProposalQuotaRLocked() bool
-	// TODO(pav-kv): this method is a one-off holding raftMu. It should be able to
-	// do its job with only Replica.mu held.
-	hasSendTokensRaftMuLocked() bool
+	hasSendTokensRaftMuLockedReplicaMuLocked() bool
 	ticksSinceLastProposalRLocked() int
 	mergeInProgressRLocked() bool
 	isDestroyedRLocked() (DestroyReason, error)
@@ -336,7 +334,7 @@ func shouldReplicaQuiesceRaftMuLockedReplicaMuLocked(
 	// Likewise, do not quiesce if any RACv2 send tokens are held. Quiescing would
 	// terminate MsgApp pings which make sure the admitted state converges, and
 	// send tokens are eventually released.
-	if q.hasSendTokensRaftMuLocked() {
+	if q.hasSendTokensRaftMuLockedReplicaMuLocked() {
 		if log.V(4) {
 			log.Infof(ctx, "not quiescing: holds RACv2 send tokens")
 		}
diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go
index 99d2d78f4115..b233127bedd1 100644
--- a/pkg/kv/kvserver/replica_test.go
+++ b/pkg/kv/kvserver/replica_test.go
@@ -9913,7 +9913,7 @@ func (q *testQuiescer) hasPendingProposalQuotaRLocked() bool {
 	return q.pendingQuota
 }
 
-func (q *testQuiescer) hasSendTokensRaftMuLocked() bool {
+func (q *testQuiescer) hasSendTokensRaftMuLockedReplicaMuLocked() bool {
 	return q.sendTokens
 }