diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 1c303c93d6d0..87c5ad9dc4d3 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -633,6 +633,7 @@ var errRemoved = errors.New("replica removed") func (r *Replica) stepRaftGroupRaftMuLocked(req *kvserverpb.RaftMessageRequest) error { r.raftMu.AssertHeld() var sideChannelInfo replica_rac2.SideChannelInfoUsingRaftMessageRequest + var admittedVector rac2.AdmittedVector err := r.withRaftGroup(func(raftGroup *raft.RawNode) (bool, error) { // We're processing an incoming raft message (from a batch that may // include MsgVotes), so don't campaign if we wake up our raft @@ -698,9 +699,8 @@ func (r *Replica) stepRaftGroupRaftMuLocked(req *kvserverpb.RaftMessageRequest) // If there is an admitted vector annotation, pass it to RACv2 to release // the flow control tokens. if term := req.AdmittedState.Term; term != 0 { - av := rac2.AdmittedVector{Term: term} - copy(av.Admitted[:], req.AdmittedState.Admitted) - r.flowControlV2.AdmitRaftMuLocked(context.TODO(), req.FromReplica.ReplicaID, av) + admittedVector = rac2.AdmittedVector{Term: term} + copy(admittedVector.Admitted[:], req.AdmittedState.Admitted) } } err := raftGroup.Step(req.Message) @@ -717,6 +717,9 @@ func (r *Replica) stepRaftGroupRaftMuLocked(req *kvserverpb.RaftMessageRequest) if sideChannelInfo != (replica_rac2.SideChannelInfoUsingRaftMessageRequest{}) { r.flowControlV2.SideChannelForPriorityOverrideAtFollowerRaftMuLocked(sideChannelInfo) } + if admittedVector != (rac2.AdmittedVector{}) { + r.flowControlV2.AdmitRaftMuLocked(context.TODO(), req.FromReplica.ReplicaID, admittedVector) + } return err }