Skip to content

Commit

Permalink
Merge pull request #171 from pav-kv/track-inflight-commit
Browse files Browse the repository at this point in the history
tracker: track in-flight commit index
  • Loading branch information
ahrtr authored Mar 7, 2024
2 parents ed26e90 + 0f9fe52 commit bc285dd
Show file tree
Hide file tree
Showing 11 changed files with 63 additions and 159 deletions.
26 changes: 16 additions & 10 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,10 @@ func (r *raft) sendAppend(to uint64) {
// argument controls whether messages with no entries will be sent
// ("empty" messages are useful to convey updated Commit indexes, but
// are undesirable when we're sending multiple messages in a batch).
//
// TODO(pav-kv): make invocation of maybeSendAppend stateless. The Progress
// struct contains all the state necessary for deciding whether to send a
// message.
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
pr := r.trk.Progress[to]
if pr.IsPaused() {
Expand Down Expand Up @@ -640,7 +644,8 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
Entries: ents,
Commit: r.raftLog.committed,
})
pr.UpdateOnEntriesSend(len(ents), uint64(payloadsSize(ents)))
pr.SentEntries(len(ents), uint64(payloadsSize(ents)))
pr.SentCommit(r.raftLog.committed)
return true
}

Expand Down Expand Up @@ -675,21 +680,21 @@ func (r *raft) maybeSendSnapshot(to uint64, pr *tracker.Progress) bool {

// sendHeartbeat sends a heartbeat RPC to the given peer.
func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
pr := r.trk.Progress[to]
// Attach the commit as min(to.matched, r.committed).
// When the leader sends out heartbeat message,
// the receiver(follower) might not be matched with the leader
// or it might not have all the committed entries.
// The leader MUST NOT forward the follower's commit to
// an unmatched index.
commit := min(r.trk.Progress[to].Match, r.raftLog.committed)
m := pb.Message{
commit := min(pr.Match, r.raftLog.committed)
r.send(pb.Message{
To: to,
Type: pb.MsgHeartbeat,
Commit: commit,
Context: ctx,
}

r.send(m)
})
pr.SentCommit(commit)
}

// bcastAppend sends RPC, with entries to all peers that are not up-to-date
Expand Down Expand Up @@ -1480,7 +1485,6 @@ func stepLeader(r *raft, m pb.Message) error {
r.sendAppend(m.From)
}
} else {
oldPaused := pr.IsPaused()
// We want to update our tracking if the response updates our
// matched index or if the response can move a probing peer back
// into StateReplicate (see heartbeat_rep_recovers_from_probing.txt
Expand Down Expand Up @@ -1517,9 +1521,11 @@ func stepLeader(r *raft, m pb.Message) error {
// to respond to pending read index requests
releasePendingReadIndexMessages(r)
r.bcastAppend()
} else if oldPaused {
// If we were paused before, this node may be missing the
// latest commit index, so send it.
} else if r.id != m.From && pr.CanBumpCommit(r.raftLog.committed) {
// This node may be missing the latest commit index, so send it.
// NB: this is not strictly necessary because the periodic heartbeat
// messages deliver commit indices too. However, a message sent now
// may arrive earlier than the next heartbeat fires.
r.sendAppend(m.From)
}
// We've updated flow control information above, which may
Expand Down
14 changes: 1 addition & 13 deletions testdata/confchange_v1_add_single.txt
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ stabilize
DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 2 for index 3
DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1]
DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1]
DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4]
DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=5 paused pendingSnap=4]
> 1 handling Ready
Ready MustSync=false:
Messages:
Expand All @@ -94,15 +94,3 @@ stabilize
> 1 receiving messages
2->1 MsgAppResp Term:1 Log:0/4
DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 2 [StateSnapshot match=4 next=5 paused pendingSnap=4]
> 1 handling Ready
Ready MustSync=false:
Messages:
1->2 MsgApp Term:1 Log:1/4 Commit:4
> 2 receiving messages
1->2 MsgApp Term:1 Log:1/4 Commit:4
> 2 handling Ready
Ready MustSync=false:
Messages:
2->1 MsgAppResp Term:1 Log:0/4
> 1 receiving messages
2->1 MsgAppResp Term:1 Log:0/4
16 changes: 2 additions & 14 deletions testdata/confchange_v2_add_double_auto.txt
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ stabilize 1 2
DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 2 for index 3
DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1]
DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1]
DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4]
DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=5 paused pendingSnap=4]
> 1 handling Ready
Ready MustSync=false:
Messages:
Expand Down Expand Up @@ -171,7 +171,7 @@ stabilize 1 3
DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 3 for index 3
DEBUG 1 decreased progress of 3 to [StateProbe match=0 next=1]
DEBUG 1 [firstindex: 3, commit: 5] sent snapshot[index: 5, term: 1] to 3 [StateProbe match=0 next=1]
DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=1 paused pendingSnap=5]
DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=6 paused pendingSnap=5]
> 1 handling Ready
Ready MustSync=false:
Messages:
Expand All @@ -193,18 +193,6 @@ stabilize 1 3
> 1 receiving messages
3->1 MsgAppResp Term:1 Log:0/5
DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 3 [StateSnapshot match=5 next=6 paused pendingSnap=5]
> 1 handling Ready
Ready MustSync=false:
Messages:
1->3 MsgApp Term:1 Log:1/5 Commit:5
> 3 receiving messages
1->3 MsgApp Term:1 Log:1/5 Commit:5
> 3 handling Ready
Ready MustSync=false:
Messages:
3->1 MsgAppResp Term:1 Log:0/5
> 1 receiving messages
3->1 MsgAppResp Term:1 Log:0/5

# Nothing else happens.
stabilize
Expand Down
2 changes: 1 addition & 1 deletion testdata/confchange_v2_add_double_implicit.txt
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ stabilize 1 2
DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 2 for index 3
DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1]
DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1]
DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4]
DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=5 paused pendingSnap=4]
> 1 handling Ready
Ready MustSync=false:
Messages:
Expand Down
14 changes: 1 addition & 13 deletions testdata/confchange_v2_add_single_auto.txt
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ stabilize
DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 2 for index 3
DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1]
DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1]
DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4]
DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=5 paused pendingSnap=4]
> 1 handling Ready
Ready MustSync=false:
Messages:
Expand All @@ -95,15 +95,3 @@ stabilize
> 1 receiving messages
2->1 MsgAppResp Term:1 Log:0/4
DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 2 [StateSnapshot match=4 next=5 paused pendingSnap=4]
> 1 handling Ready
Ready MustSync=false:
Messages:
1->2 MsgApp Term:1 Log:1/4 Commit:4
> 2 receiving messages
1->2 MsgApp Term:1 Log:1/4 Commit:4
> 2 handling Ready
Ready MustSync=false:
Messages:
2->1 MsgAppResp Term:1 Log:0/4
> 1 receiving messages
2->1 MsgAppResp Term:1 Log:0/4
14 changes: 1 addition & 13 deletions testdata/confchange_v2_add_single_explicit.txt
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ stabilize 1 2
DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 2 for index 3
DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1]
DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1]
DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4]
DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=5 paused pendingSnap=4]
> 1 handling Ready
Ready MustSync=false:
Messages:
Expand All @@ -95,18 +95,6 @@ stabilize 1 2
> 1 receiving messages
2->1 MsgAppResp Term:1 Log:0/4
DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 2 [StateSnapshot match=4 next=5 paused pendingSnap=4]
> 1 handling Ready
Ready MustSync=false:
Messages:
1->2 MsgApp Term:1 Log:1/4 Commit:4
> 2 receiving messages
1->2 MsgApp Term:1 Log:1/4 Commit:4
> 2 handling Ready
Ready MustSync=false:
Messages:
2->1 MsgAppResp Term:1 Log:0/4
> 1 receiving messages
2->1 MsgAppResp Term:1 Log:0/4

# Check that we're not allowed to change membership again while in the joint state.
# This leads to an empty entry being proposed instead (index 5 in the stabilize block
Expand Down
16 changes: 0 additions & 16 deletions testdata/confchange_v2_replace_leader.txt
Original file line number Diff line number Diff line change
Expand Up @@ -143,18 +143,6 @@ stabilize
4->1 MsgAppResp Term:1 Log:0/4
> 1 receiving messages
4->1 MsgAppResp Term:1 Log:0/4
> 1 handling Ready
Ready MustSync=false:
Messages:
1->4 MsgApp Term:1 Log:1/4 Commit:4
> 4 receiving messages
1->4 MsgApp Term:1 Log:1/4 Commit:4
> 4 handling Ready
Ready MustSync=false:
Messages:
4->1 MsgAppResp Term:1 Log:0/4
> 1 receiving messages
4->1 MsgAppResp Term:1 Log:0/4


# Transfer leadership while in the joint config.
Expand Down Expand Up @@ -284,12 +272,10 @@ stabilize
CommittedEntries:
2/5 EntryNormal ""
Messages:
4->1 MsgApp Term:2 Log:2/5 Commit:4
4->1 MsgApp Term:2 Log:2/5 Commit:5
4->2 MsgApp Term:2 Log:2/5 Commit:5
4->3 MsgApp Term:2 Log:2/5 Commit:5
> 1 receiving messages
4->1 MsgApp Term:2 Log:2/5 Commit:4
4->1 MsgApp Term:2 Log:2/5 Commit:5
> 2 receiving messages
4->2 MsgApp Term:2 Log:2/5 Commit:5
Expand All @@ -302,7 +288,6 @@ stabilize
2/5 EntryNormal ""
Messages:
1->4 MsgAppResp Term:2 Log:0/5
1->4 MsgAppResp Term:2 Log:0/5
> 2 handling Ready
Ready MustSync=false:
HardState Term:2 Vote:4 Commit:5
Expand All @@ -318,7 +303,6 @@ stabilize
Messages:
3->4 MsgAppResp Term:2 Log:0/5
> 4 receiving messages
1->4 MsgAppResp Term:2 Log:0/5
1->4 MsgAppResp Term:2 Log:0/5
2->4 MsgAppResp Term:2 Log:0/5
3->4 MsgAppResp Term:2 Log:0/5
Expand Down
60 changes: 0 additions & 60 deletions testdata/probe_and_replicate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -513,18 +513,6 @@ stabilize 1 2
2->1 MsgAppResp Term:8 Log:0/21
> 1 receiving messages
2->1 MsgAppResp Term:8 Log:0/21
> 1 handling Ready
Ready MustSync=false:
Messages:
1->2 MsgApp Term:8 Log:8/21 Commit:18
> 2 receiving messages
1->2 MsgApp Term:8 Log:8/21 Commit:18
> 2 handling Ready
Ready MustSync=false:
Messages:
2->1 MsgAppResp Term:8 Log:0/21
> 1 receiving messages
2->1 MsgAppResp Term:8 Log:0/21

stabilize 1 3
----
Expand Down Expand Up @@ -579,18 +567,6 @@ stabilize 1 3
3->1 MsgAppResp Term:8 Log:0/21
> 1 receiving messages
3->1 MsgAppResp Term:8 Log:0/21
> 1 handling Ready
Ready MustSync=false:
Messages:
1->3 MsgApp Term:8 Log:8/21 Commit:18
> 3 receiving messages
1->3 MsgApp Term:8 Log:8/21 Commit:18
> 3 handling Ready
Ready MustSync=false:
Messages:
3->1 MsgAppResp Term:8 Log:0/21
> 1 receiving messages
3->1 MsgAppResp Term:8 Log:0/21

stabilize 1 4
----
Expand Down Expand Up @@ -674,18 +650,6 @@ stabilize 1 5
5->1 MsgAppResp Term:8 Log:0/21
> 1 receiving messages
5->1 MsgAppResp Term:8 Log:0/21
> 1 handling Ready
Ready MustSync=false:
Messages:
1->5 MsgApp Term:8 Log:8/21 Commit:21
> 5 receiving messages
1->5 MsgApp Term:8 Log:8/21 Commit:21
> 5 handling Ready
Ready MustSync=false:
Messages:
5->1 MsgAppResp Term:8 Log:0/21
> 1 receiving messages
5->1 MsgAppResp Term:8 Log:0/21

stabilize 1 6
----
Expand Down Expand Up @@ -741,18 +705,6 @@ stabilize 1 6
6->1 MsgAppResp Term:8 Log:0/21
> 1 receiving messages
6->1 MsgAppResp Term:8 Log:0/21
> 1 handling Ready
Ready MustSync=false:
Messages:
1->6 MsgApp Term:8 Log:8/21 Commit:21
> 6 receiving messages
1->6 MsgApp Term:8 Log:8/21 Commit:21
> 6 handling Ready
Ready MustSync=false:
Messages:
6->1 MsgAppResp Term:8 Log:0/21
> 1 receiving messages
6->1 MsgAppResp Term:8 Log:0/21

stabilize 1 7
----
Expand Down Expand Up @@ -816,15 +768,3 @@ stabilize 1 7
7->1 MsgAppResp Term:8 Log:0/21
> 1 receiving messages
7->1 MsgAppResp Term:8 Log:0/21
> 1 handling Ready
Ready MustSync=false:
Messages:
1->7 MsgApp Term:8 Log:8/21 Commit:21
> 7 receiving messages
1->7 MsgApp Term:8 Log:8/21 Commit:21
> 7 handling Ready
Ready MustSync=false:
Messages:
7->1 MsgAppResp Term:8 Log:0/21
> 1 receiving messages
7->1 MsgAppResp Term:8 Log:0/21
15 changes: 2 additions & 13 deletions testdata/snapshot_succeed_via_app_resp.txt
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ stabilize 1
> 1 receiving messages
3->1 MsgHeartbeatResp Term:1 Log:0/0
DEBUG 1 [firstindex: 12, commit: 11] sent snapshot[index: 11, term: 1] to 3 [StateProbe match=0 next=11]
DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=11 paused pendingSnap=11]
DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=12 paused pendingSnap=11]
> 1 handling Ready
Ready MustSync=false:
Messages:
Expand All @@ -98,7 +98,7 @@ status 1
----
1: StateReplicate match=11 next=12
2: StateReplicate match=11 next=12
3: StateSnapshot match=0 next=11 paused pendingSnap=11
3: StateSnapshot match=0 next=12 paused pendingSnap=11

# Follower applies the snapshot. Note how it reacts with a MsgAppResp upon completion.
# The snapshot fully catches the follower up (i.e. there are no more log entries it
Expand Down Expand Up @@ -127,10 +127,6 @@ stabilize 1
> 1 receiving messages
3->1 MsgAppResp Term:1 Log:0/11
DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 3 [StateSnapshot match=11 next=12 paused pendingSnap=11]
> 1 handling Ready
Ready MustSync=false:
Messages:
1->3 MsgApp Term:1 Log:1/11 Commit:11

status 1
----
Expand All @@ -143,16 +139,9 @@ stabilize
----
> 2 receiving messages
1->2 MsgHeartbeat Term:1 Log:0/0 Commit:11
> 3 receiving messages
1->3 MsgApp Term:1 Log:1/11 Commit:11
> 2 handling Ready
Ready MustSync=false:
Messages:
2->1 MsgHeartbeatResp Term:1 Log:0/0
> 3 handling Ready
Ready MustSync=false:
Messages:
3->1 MsgAppResp Term:1 Log:0/11
> 1 receiving messages
2->1 MsgHeartbeatResp Term:1 Log:0/0
3->1 MsgAppResp Term:1 Log:0/11
4 changes: 2 additions & 2 deletions testdata/snapshot_succeed_via_app_resp_behind.txt
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ stabilize 1
DEBUG 1 received MsgAppResp(rejected, hint: (index 5, term 1)) from 3 for index 10
DEBUG 1 decreased progress of 3 to [StateProbe match=0 next=6]
DEBUG 1 [firstindex: 11, commit: 12] sent snapshot[index: 12, term: 1] to 3 [StateProbe match=0 next=6]
DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=6 paused pendingSnap=12]
DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=13 paused pendingSnap=12]
> 1 handling Ready
Ready MustSync=false:
Messages:
Expand Down Expand Up @@ -152,7 +152,7 @@ stabilize 1
----
> 1 receiving messages
3->1 MsgAppResp Term:1 Log:0/11
DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 3 [StateSnapshot match=11 next=12 paused pendingSnap=12]
DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 3 [StateSnapshot match=11 next=13 paused pendingSnap=12]
> 1 handling Ready
Ready MustSync=false:
Messages:
Expand Down
Loading

0 comments on commit bc285dd

Please sign in to comment.