Skip to content

Commit

Permalink
Merge #124006
Browse files Browse the repository at this point in the history
124006: raft: consolidate all append message sending r=nvanbenschoten a=pav-kv

This PR consolidates all decision-making about sending append messages into a single `maybeSendAppend` method. Previously, the behaviour depended on the `sendIfEmpty` flag which was set/unset depending on the context in which the method is called. This is unnecessary because the `Progress` struct contains enough information about the leader->follower flow state, so `maybeSendAppend` can be made stand-alone.

In follow-up PRs, the consolidated `maybeSendAppend` method will be used to implement a more flexible message flow control.

Ported from etcd-io/raft#134

Epic: CRDB-37515
Release note: none

Co-authored-by: Pavel Kalinnikov <[email protected]>
craig[bot] and pav-kv committed May 29, 2024
2 parents 604bb00 + 400b4b1 commit 415d06c
Showing 12 changed files with 196 additions and 163 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/flow_control_replica.go
Original file line number Diff line number Diff line change
@@ -85,7 +85,7 @@ func (rf *replicaFlowControl) getBehindFollowers() map[roachpb.ReplicaID]struct{
// time for it to catch up and then later return those tokens to us.
// This is I3a again; do it as part of #95563.
_ = progress.RecentActive
_ = progress.MsgAppFlowPaused
_ = progress.MsgAppProbesPaused
_ = progress.Match
})
return behindFollowers
14 changes: 7 additions & 7 deletions pkg/kv/kvserver/flow_control_replica_integration_test.go
Original file line number Diff line number Diff line change
@@ -52,7 +52,7 @@ import (
// follows: progress=(replid@match:<state>:<active>:<paused>,...).
// <state> is one of {probe,replicate,snapshot}, <active> is
// {active,!inactive}, and <paused> is {paused,!paused}. The latter controls
// MsgAppFlowPaused in the raft library, not the CRDB-level follower
// MsgAppProbesPaused in the raft library, not the CRDB-level follower
// pausing.
//
// B. For the raft transport, we can specify the set of replica IDs we're
@@ -169,12 +169,12 @@ func TestFlowControlReplicaIntegration(t *testing.T) {
paused := parts[3] == "paused"

progress[replID] = tracker.Progress{
Match: uint64(index),
State: state,
RecentActive: active,
MsgAppFlowPaused: paused,
Inflights: tracker.NewInflights(1, 0), // avoid NPE
IsLearner: false,
Match: uint64(index),
State: state,
RecentActive: active,
MsgAppProbesPaused: paused,
Inflights: tracker.NewInflights(1, 0), // avoid NPE
IsLearner: false,
}

case "descriptor", "paused", "inactive":
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/split_delay_helper_test.go
Original file line number Diff line number Diff line change
@@ -139,10 +139,10 @@ func TestSplitDelayToAvoidSnapshot(t *testing.T) {
st := statusWithState(raft.StateLeader)
st.Progress = map[uint64]tracker.Progress{
2: {
State: state,
RecentActive: true,
MsgAppFlowPaused: true, // Unifies string output below.
Inflights: &tracker.Inflights{},
State: state,
RecentActive: true,
MsgAppProbesPaused: true, // Unifies string output below.
Inflights: &tracker.Inflights{},
},
// Healthy follower just for kicks.
3: {State: tracker.StateReplicate},
8 changes: 4 additions & 4 deletions pkg/raft/doc.go
Original file line number Diff line number Diff line change
@@ -314,7 +314,7 @@ stale log entries:
rafthttp package.
'MsgApp' contains log entries to replicate. A leader calls bcastAppend,
which calls sendAppend, which sends soon-to-be-replicated logs in 'MsgApp'
which calls maybeSendAppend, which sends soon-to-be-replicated logs in 'MsgApp'
type. When 'MsgApp' is passed to candidate's Step method, candidate reverts
back to follower, because it indicates that there is a valid leader sending
'MsgApp' messages. Candidate and follower respond to this message in
@@ -352,8 +352,8 @@ stale log entries:
'MsgSnap' requests to install a snapshot message. When a node has just
become a leader or the leader receives 'MsgProp' message, it calls
'bcastAppend' method, which then calls 'sendAppend' method to each
follower. In 'sendAppend', if a leader fails to get term or entries,
'bcastAppend' method, which then calls 'maybeSendAppend' method to each
follower. In 'maybeSendAppend', if a leader fails to get term or entries,
the leader requests snapshot by sending 'MsgSnap' type message.
'MsgSnapStatus' tells the result of snapshot install message. When a
@@ -375,7 +375,7 @@ stale log entries:
'MsgHeartbeatResp' is a response to 'MsgHeartbeat'. When 'MsgHeartbeatResp'
is passed to leader's Step method, the leader knows which follower
responded. And only when the leader's last committed index is greater than
follower's Match index, the leader runs 'sendAppend` method.
follower's Match index, the leader runs 'maybeSendAppend' method.
'MsgUnreachable' tells that request(message) wasn't delivered. When
'MsgUnreachable' is passed to leader's Step method, the leader discovers
134 changes: 47 additions & 87 deletions pkg/raft/raft.go
Original file line number Diff line number Diff line change
@@ -579,24 +579,24 @@ func (r *raft) send(m pb.Message) {
}
}

// sendAppend sends an append RPC with new entries (if any) and the
// current commit index to the given peer.
func (r *raft) sendAppend(to uint64) {
r.maybeSendAppend(to, true)
}

// maybeSendAppend sends an append RPC with new entries to the given peer,
// if necessary. Returns true if a message was sent. The sendIfEmpty
// argument controls whether messages with no entries will be sent
// ("empty" messages are useful to convey updated Commit indexes, but
// are undesirable when we're sending multiple messages in a batch).
// maybeSendAppend sends an append RPC with log entries (if any) that are not
// yet known to be replicated in the given peer's log, as well as the current
// commit index. Usually it sends a MsgApp message, but in some cases (e.g. the
// log has been compacted) it can send a MsgSnap.
//
// In some cases, the MsgApp message can have zero entries, and yet be sent.
// When the follower log is not fully up-to-date, we must send a MsgApp
// periodically so that eventually the flow is either accepted or rejected. Not
// doing so can result in replication stall, in cases when a MsgApp is dropped.
//
// TODO(pav-kv): make invocation of maybeSendAppend stateless. The Progress
// struct contains all the state necessary for deciding whether to send a
// message.
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
// Returns true if a message was sent, or false otherwise. A message is not sent
// if the follower log and commit index are up-to-date, the flow is paused (for
// reasons like in-flight limits), or the message could not be constructed.
func (r *raft) maybeSendAppend(to uint64) bool {
pr := r.trk.Progress[to]
if pr.IsPaused() {

last, commit := r.raftLog.lastIndex(), r.raftLog.committed
if !pr.ShouldSendMsgApp(last, commit) {
return false
}

@@ -608,36 +608,26 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
return r.maybeSendSnapshot(to, pr)
}

var ents []pb.Entry
// In a throttled StateReplicate only send empty MsgApp, to ensure progress.
// Otherwise, if we had a full Inflights and all inflight messages were in
// fact dropped, replication to that follower would stall. Instead, an empty
// MsgApp will eventually reach the follower (heartbeats responses prompt the
// leader to send an append), allowing it to be acked or rejected, both of
// which will clear out Inflights.
if pr.State != tracker.StateReplicate || !pr.Inflights.Full() {
ents, err = r.raftLog.entries(pr.Next, r.maxMsgSize)
}
if len(ents) == 0 && !sendIfEmpty {
return false
}
// TODO(pav-kv): move this check up to where err is returned.
if err != nil { // send a snapshot if we failed to get the entries
return r.maybeSendSnapshot(to, pr)
var entries []pb.Entry
if pr.CanSendEntries(last) {
if entries, err = r.raftLog.entries(pr.Next, r.maxMsgSize); err != nil {
// Send a snapshot if we failed to get the entries.
return r.maybeSendSnapshot(to, pr)
}
}

// Send the actual MsgApp otherwise, and update the progress accordingly.
// Send the MsgApp, and update the progress accordingly.
r.send(pb.Message{
To: to,
Type: pb.MsgApp,
Index: prevIndex,
LogTerm: prevTerm,
Entries: ents,
Commit: r.raftLog.committed,
Entries: entries,
Commit: commit,
Match: pr.Match,
})
pr.SentEntries(len(ents), uint64(payloadsSize(ents)))
pr.SentCommit(r.raftLog.committed)
pr.SentEntries(len(entries), uint64(payloadsSize(entries)))
pr.SentCommit(commit)
return true
}

@@ -696,7 +686,7 @@ func (r *raft) bcastAppend() {
if id == r.id {
return
}
r.sendAppend(id)
r.maybeSendAppend(id)
})
}

@@ -1450,7 +1440,7 @@ func stepLeader(r *raft, m pb.Message) error {
if pr.State == tracker.StateReplicate {
pr.BecomeProbe()
}
r.sendAppend(m.From)
r.maybeSendAppend(m.From)
}
} else {
// We want to update our tracking if the response updates our
@@ -1486,21 +1476,13 @@ func stepLeader(r *raft, m pb.Message) error {

if r.maybeCommit() {
r.bcastAppend()
} else if r.id != m.From && pr.CanBumpCommit(r.raftLog.committed) {
// This node may be missing the latest commit index, so send it.
// NB: this is not strictly necessary because the periodic heartbeat
// messages deliver commit indices too. However, a message sent now
// may arrive earlier than the next heartbeat fires.
r.sendAppend(m.From)
}
// We've updated flow control information above, which may
// allow us to send multiple (size-limited) in-flight messages
// at once (such as when transitioning from probe to
// replicate, or when freeTo() covers multiple messages). If
// we have more entries to send, send as many messages as we
// can (without sending empty messages for the commit index)
// We've updated flow control information above, which may allow us to
// send multiple (size-limited) in-flight messages at once (such as when
// transitioning from probe to replicate, or when freeTo() covers
// multiple messages). Send as many messages as we can.
if r.id != m.From {
for r.maybeSendAppend(m.From, false /* sendIfEmpty */) {
for r.maybeSendAppend(m.From) {
}
}
// Transfer leadership is in progress.
@@ -1512,24 +1494,8 @@ func stepLeader(r *raft, m pb.Message) error {
}
case pb.MsgHeartbeatResp:
pr.RecentActive = true
pr.MsgAppFlowPaused = false

// NB: if the follower is paused (full Inflights), this will still send an
// empty append, allowing it to recover from situations in which all the
// messages that filled up Inflights in the first place were dropped. Note
// also that the outgoing heartbeat already communicated the commit index.
//
// If the follower is fully caught up but also in StateProbe (as can happen
// if ReportUnreachable was called), we also want to send an append (it will
// be empty) to allow the follower to transition back to StateReplicate once
// it responds.
//
// Note that StateSnapshot typically satisfies pr.Match < lastIndex, but
// `pr.Paused()` is always true for StateSnapshot, so sendAppend is a
// no-op.
if pr.Match < r.raftLog.lastIndex() || pr.State == tracker.StateProbe {
r.sendAppend(m.From)
}
pr.MsgAppProbesPaused = false
r.maybeSendAppend(m.From)

case pb.MsgSnapStatus:
if pr.State != tracker.StateSnapshot {
@@ -1548,7 +1514,7 @@ func stepLeader(r *raft, m pb.Message) error {
// If the snapshot finished, wait for the MsgAppResp from the remote node before
// sending out the next MsgApp.
// If the snapshot failed, wait for a heartbeat interval before the next try.
pr.MsgAppFlowPaused = true
pr.MsgAppProbesPaused = true
case pb.MsgUnreachable:
// During optimistic replication, if the remote becomes unreachable,
// there is a high probability that a MsgApp is lost.
@@ -1585,7 +1551,8 @@ func stepLeader(r *raft, m pb.Message) error {
r.sendTimeoutNow(leadTransferee)
r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
} else {
r.sendAppend(leadTransferee)
pr.MsgAppProbesPaused = false
r.maybeSendAppend(leadTransferee)
}
}
return nil
@@ -1957,21 +1924,14 @@ func (r *raft) switchToConfig(cfg tracker.Config, trk tracker.ProgressMap) pb.Co
return cs
}

if r.maybeCommit() {
// If the configuration change means that more entries are committed now,
// broadcast/append to everyone in the updated config.
r.bcastAppend()
} else {
// Otherwise, still probe the newly added replicas; there's no reason to
// let them wait out a heartbeat interval (or the next incoming
// proposal).
r.trk.Visit(func(id uint64, pr *tracker.Progress) {
if id == r.id {
return
}
r.maybeSendAppend(id, false /* sendIfEmpty */)
})
}
r.maybeCommit()
// If the configuration change means that more entries are committed now,
// broadcast/append to everyone in the updated config.
//
// Otherwise, still probe the newly added replicas; there's no reason to let
// them wait out a heartbeat interval (or the next incoming proposal).
r.bcastAppend()

// If the leadTransferee was removed or demoted, abort the leadership transfer.
if _, tOK := r.trk.Config.Voters.IDs()[r.leadTransferee]; !tOK && r.leadTransferee != 0 {
r.abortLeaderTransfer()
8 changes: 4 additions & 4 deletions pkg/raft/raft_snap_test.go
Original file line number Diff line number Diff line change
@@ -86,8 +86,8 @@ func TestSnapshotFailure(t *testing.T) {
if sm.trk.Progress[2].Next != 1 {
t.Fatalf("Next = %d, want 1", sm.trk.Progress[2].Next)
}
if !sm.trk.Progress[2].MsgAppFlowPaused {
t.Errorf("MsgAppFlowPaused = %v, want true", sm.trk.Progress[2].MsgAppFlowPaused)
if !sm.trk.Progress[2].MsgAppProbesPaused {
t.Errorf("MsgAppProbesPaused = %v, want true", sm.trk.Progress[2].MsgAppProbesPaused)
}
}

@@ -109,8 +109,8 @@ func TestSnapshotSucceed(t *testing.T) {
if sm.trk.Progress[2].Next != 12 {
t.Fatalf("Next = %d, want 12", sm.trk.Progress[2].Next)
}
if !sm.trk.Progress[2].MsgAppFlowPaused {
t.Errorf("MsgAppFlowPaused = %v, want true", sm.trk.Progress[2].MsgAppFlowPaused)
if !sm.trk.Progress[2].MsgAppProbesPaused {
t.Errorf("MsgAppProbesPaused = %v, want true", sm.trk.Progress[2].MsgAppProbesPaused)
}
}

Loading

0 comments on commit 415d06c

Please sign in to comment.