Skip to content

Commit

Permalink
rac2,replica_rac2: fix deadlock due to HoldsSendTokensRaftMuLocked be…
Browse files Browse the repository at this point in the history
…ing called with Replica.mu held

It is inconvenient for the caller to avoid holding Replica.mu, so instead
we avoid needing replicaSendStream.mu (which is, and must be, ordered
before Replica.mu). This is done by lifting replicaSendStream.mu.tracker
out of the mu struct; it is now replicaSendStream.raftMu.tracker.

Additional changes:
- All methods in replicaState and replicaSendStream are named to include
  what locks are held. This makes them verbose, but it is important for
  correctness.
- Assertions are added for replicaSendStream.mu being held.
- TODOs are added to make the Replica.raftMu and Replica.mu assertions
  free (i.e., zero-cost in production builds) in replica_rac2 and rac2
  code, and once that is done, to add more assertions in rac2.
- A TODO is added to lift some more fields in replicaSendStream out of
  mu (the main reason we need mu is for replicaSendStream.Notify).
  This TODO is ordered after the previous one (more assertions).

Fixes cockroachdb#132646, cockroachdb#132642

Epic: CRDB-37515

Release note: None
  • Loading branch information
sumeerbhola committed Oct 16, 2024
1 parent 35db162 commit 7c131f1
Show file tree
Hide file tree
Showing 8 changed files with 208 additions and 150 deletions.
310 changes: 183 additions & 127 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (s *testingRCState) sendStreamString(rangeID roachpb.RangeID) string {
fmt.Fprintf(&b, "eval original in send-q: reg=%v ela=%v\n",
rss.mu.sendQueue.originalEvalTokens[admissionpb.RegularWorkClass],
rss.mu.sendQueue.originalEvalTokens[admissionpb.ElasticWorkClass])
b.WriteString(formatTrackerState(&rss.mu.tracker))
b.WriteString(formatTrackerState(&rss.raftMu.tracker))
b.WriteString("++++\n")
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/kvflowcontrol/rac2/simulation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -929,7 +929,7 @@ func (rot *rcOpTicker) tick(ctx context.Context, t time.Time) {
rid := rot.handle.testingFindReplStreamOrFatal(ctx, rot.stream)
rs := rot.handle.rc.replicaMap[rid]
if rs.sendStream != nil {
data = rs.sendStream.mu.tracker.testingString()
data = rs.sendStream.raftMu.tracker.testingString()
}
rot.handle.snapshots = append(rot.handle.snapshots, testingTrackerSnapshot{
time: t,
Expand Down Expand Up @@ -1136,9 +1136,9 @@ func (r *testingRCRange) testingReturnTokens(
raftPri := AdmissionToRaftPriority(pri)
returnIndex := uint64(0)
r.mu.outstandingReturns[rid] += tokens
n := rs.sendStream.mu.tracker.tracked[raftPri].Length()
n := rs.sendStream.raftMu.tracker.tracked[raftPri].Length()
for i := 0; i < n; i++ {
deduction := rs.sendStream.mu.tracker.tracked[raftPri].At(i)
deduction := rs.sendStream.raftMu.tracker.tracked[raftPri].At(i)
if r.mu.outstandingReturns[rid]-deduction.tokens >= 0 {
r.mu.outstandingReturns[rid] -= deduction.tokens
returnIndex = deduction.id.index
Expand Down
20 changes: 12 additions & 8 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ import (
// Replica abstracts kvserver.Replica. It exposes internal implementation
// details of Replica, specifically the locking behavior, since it is
// essential to reason about correctness.
//
// TODO(sumeer): because the mutex assertions are hidden behind an interface,
// they are not free for production builds. Fix, and then add more assertions.
type Replica interface {
// RaftMuAssertHeld asserts that Replica.raftMu is held.
RaftMuAssertHeld()
Expand Down Expand Up @@ -204,9 +207,9 @@ type SideChannelInfoUsingRaftMessageRequest struct {
// We *strongly* prefer methods to be called without holding Replica.mu, since
// then the callee (implementation of Processor) does not need to worry about
// (a) deadlocks, since it sometimes needs to lock Replica.mu itself, (b) the
// amount of work it is doing under this critical section. There are two
// amount of work it is doing under this critical section. There are three
// exceptions to this, due to difficulty in changing the calling code:
// InitRaftLocked, OnDescChangedLocked.
// InitRaftLocked, OnDescChangedLocked, HoldsSendTokensLocked.
type Processor interface {
// InitRaftLocked is called when raft.RawNode is initialized for the
// Replica. NB: can be called twice before the Replica is fully initialized.
Expand Down Expand Up @@ -329,11 +332,11 @@ type Processor interface {
//
// raftMu is held.
MaybeSendPingsRaftMuLocked()
// HoldsSendTokensRaftMuLocked returns true if the replica is the leader using
// HoldsSendTokensLocked returns true if the replica is the leader using
// RACv2, and holds any send tokens. Used to prevent replica quiescence.
//
// raftMu is held.
HoldsSendTokensRaftMuLocked() bool
// Both Replica.raftMu and Replica.mu are held.
HoldsSendTokensLocked() bool

// AdmitForEval is called to admit work that wants to evaluate at the
// leaseholder.
Expand Down Expand Up @@ -1091,11 +1094,12 @@ func (p *processorImpl) MaybeSendPingsRaftMuLocked() {
}
}

// HoldsSendTokensRaftMuLocked implements Processor.
func (p *processorImpl) HoldsSendTokensRaftMuLocked() bool {
// HoldsSendTokensLocked implements Processor.
func (p *processorImpl) HoldsSendTokensLocked() bool {
p.opts.Replica.RaftMuAssertHeld()
p.opts.Replica.MuAssertHeld()
if rc := p.leader.rc; rc != nil {
return rc.HoldsSendTokensRaftMuLocked()
return rc.HoldsSendTokensLocked()
}
return false
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,8 @@ func (c *testRangeController) MaybeSendPingsRaftMuLocked() {
fmt.Fprintf(c.b, " RangeController.MaybeSendPingsRaftMuLocked()\n")
}

func (c *testRangeController) HoldsSendTokensRaftMuLocked() bool {
fmt.Fprintf(c.b, " RangeController.HoldsSendTokensRaftMuLocked()\n")
func (c *testRangeController) HoldsSendTokensLocked() bool {
fmt.Fprintf(c.b, " RangeController.HoldsSendTokensLocked()\n")
return false
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,13 +598,13 @@ func (r *Replica) hasPendingProposalQuotaRLocked() bool {
return !r.mu.proposalQuota.Full()
}

// hasSendTokensRaftMuLocked is part of the quiescer interface. It returns true
// if RACv2 holds any send tokens for this range.
// hasSendTokensRaftMuLockedReplicaMuLocked is part of the quiescer interface.
// It returns true if RACv2 holds any send tokens for this range.
//
// We can't quiesce while any send tokens are held because this could lead to
// never releasing them. Tokens must be released.
func (r *Replica) hasSendTokensRaftMuLocked() bool {
return r.flowControlV2.HoldsSendTokensRaftMuLocked()
func (r *Replica) hasSendTokensRaftMuLockedReplicaMuLocked() bool {
return r.flowControlV2.HoldsSendTokensLocked()
}

// ticksSinceLastProposalRLocked returns the number of ticks since the last
Expand Down
6 changes: 2 additions & 4 deletions pkg/kv/kvserver/replica_raft_quiesce.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,7 @@ type quiescer interface {
hasRaftReadyRLocked() bool
hasPendingProposalsRLocked() bool
hasPendingProposalQuotaRLocked() bool
// TODO(pav-kv): this method is a one-off holding raftMu. It should be able to
// do its job with only Replica.mu held.
hasSendTokensRaftMuLocked() bool
hasSendTokensRaftMuLockedReplicaMuLocked() bool
ticksSinceLastProposalRLocked() int
mergeInProgressRLocked() bool
isDestroyedRLocked() (DestroyReason, error)
Expand Down Expand Up @@ -336,7 +334,7 @@ func shouldReplicaQuiesceRaftMuLockedReplicaMuLocked(
// Likewise, do not quiesce if any RACv2 send tokens are held. Quiescing would
// terminate MsgApp pings which make sure the admitted state converges, and
// send tokens are eventually released.
if q.hasSendTokensRaftMuLocked() {
if q.hasSendTokensRaftMuLockedReplicaMuLocked() {
if log.V(4) {
log.Infof(ctx, "not quiescing: holds RACv2 send tokens")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9913,7 +9913,7 @@ func (q *testQuiescer) hasPendingProposalQuotaRLocked() bool {
return q.pendingQuota
}

func (q *testQuiescer) hasSendTokensRaftMuLocked() bool {
func (q *testQuiescer) hasSendTokensRaftMuLockedReplicaMuLocked() bool {
return q.sendTokens
}

Expand Down

0 comments on commit 7c131f1

Please sign in to comment.