From f9568df8019bd975fdba84c23265828aa877d0fb Mon Sep 17 00:00:00 2001 From: sumeerbhola Date: Tue, 15 Oct 2024 12:47:58 -0400 Subject: [PATCH] kvserver: don't call Processor.AdmitRaftMuLocked while holding Replica.mu This method will eventually acquire replicaSendStream.mu, which needs to be ordered before Replica.mu. Fixes #132637 #132638 #132639 #132640 #132641 #132642 #132643 #132644 #132645 #132646 #132647 #132648 #132649 Epic: CRDB-37515 Release note: None --- pkg/kv/kvserver/replica_raft.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 1c303c93d6d0..87c5ad9dc4d3 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -633,6 +633,7 @@ var errRemoved = errors.New("replica removed") func (r *Replica) stepRaftGroupRaftMuLocked(req *kvserverpb.RaftMessageRequest) error { r.raftMu.AssertHeld() var sideChannelInfo replica_rac2.SideChannelInfoUsingRaftMessageRequest + var admittedVector rac2.AdmittedVector err := r.withRaftGroup(func(raftGroup *raft.RawNode) (bool, error) { // We're processing an incoming raft message (from a batch that may // include MsgVotes), so don't campaign if we wake up our raft @@ -698,9 +699,8 @@ func (r *Replica) stepRaftGroupRaftMuLocked(req *kvserverpb.RaftMessageRequest) // If there is an admitted vector annotation, pass it to RACv2 to release // the flow control tokens. if term := req.AdmittedState.Term; term != 0 { - av := rac2.AdmittedVector{Term: term} - copy(av.Admitted[:], req.AdmittedState.Admitted) - r.flowControlV2.AdmitRaftMuLocked(context.TODO(), req.FromReplica.ReplicaID, av) + admittedVector = rac2.AdmittedVector{Term: term} + copy(admittedVector.Admitted[:], req.AdmittedState.Admitted) } } err := raftGroup.Step(req.Message) @@ -717,6 +717,9 @@ func (r *Replica) stepRaftGroupRaftMuLocked(req *kvserverpb.RaftMessageRequest) if sideChannelInfo != (replica_rac2.SideChannelInfoUsingRaftMessageRequest{}) { r.flowControlV2.SideChannelForPriorityOverrideAtFollowerRaftMuLocked(sideChannelInfo) } + if admittedVector != (rac2.AdmittedVector{}) { + r.flowControlV2.AdmitRaftMuLocked(context.TODO(), req.FromReplica.ReplicaID, admittedVector) + } return err }