From 9cc9344a1171e61d796444f2783696e1b1ae1b24 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Thu, 25 Jan 2024 01:07:37 +0000 Subject: [PATCH] tmp: refactor maybeSendAppend Signed-off-by: Pavel Kalinnikov --- confchange/confchange.go | 3 +- raft.go | 209 ++++++++++-------- raft_test.go | 19 +- .../heartbeat_resp_recovers_from_probing.txt | 6 +- testdata/replicate_pause.txt | 7 +- testdata/slow_follower_after_compaction.txt | 10 +- testdata/snapshot_succeed_via_app_resp.txt | 8 +- .../snapshot_succeed_via_app_resp_behind.txt | 8 +- tracker/progress.go | 74 ++----- tracker/watermark.go | 32 +++ 10 files changed, 204 insertions(+), 172 deletions(-) create mode 100644 tracker/watermark.go diff --git a/confchange/confchange.go b/confchange/confchange.go index 55db16a8..c653ea94 100644 --- a/confchange/confchange.go +++ b/confchange/confchange.go @@ -259,8 +259,7 @@ func (c Changer) initProgress(cfg *tracker.Config, trk tracker.ProgressMap, id u // at all (and will thus likely need a snapshot), though the app may // have applied a snapshot out of band before adding the replica (thus // making the first index the better choice). - Next: c.LastIndex, - Match: 0, + Watermark: tracker.Watermark{Next: c.LastIndex}, Inflights: tracker.NewInflights(c.Tracker.MaxInflight, c.Tracker.MaxInflightBytes), IsLearner: isLearner, // When a node is first added, we should mark it as recently active. diff --git a/raft.go b/raft.go index 5e5ad93a..289eba1d 100644 --- a/raft.go +++ b/raft.go @@ -586,85 +586,120 @@ func (r *raft) send(m pb.Message) { } } -// sendAppend sends an append RPC with new entries (if any) and the -// current commit index to the given peer. -func (r *raft) sendAppend(to uint64) { - r.maybeSendAppend(to, true) -} - -// maybeSendAppend sends an append RPC with new entries to the given peer, -// if necessary. Returns true if a message was sent. The sendIfEmpty -// argument controls whether messages with no entries will be sent -// ("empty" messages are useful to convey updated Commit indexes, but -// are undesirable when we're sending multiple messages in a batch). -func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { - pr := r.trk.Progress[to] - if pr.IsPaused() { +// maybeSendAppend sends an append RPC with new entries and commit index to the +// given peer, if necessary. Returns true if a message was sent. +// +// May send an empty message, to convey an update Commit index. +func (r *raft) maybeSendAppend(to uint64, pr *tracker.Progress) bool { + if to == r.id { + return false + } + if st := pr.State; st != tracker.StateProbe && st != tracker.StateReplicate { + return false + } else if st == tracker.StateProbe && pr.MsgAppFlowPaused { return false } - lastIndex, nextIndex := pr.Next-1, pr.Next - lastTerm, errt := r.raftLog.term(lastIndex) - - var ents []pb.Entry - var erre error - // In a throttled StateReplicate only send empty MsgApp, to ensure progress. - // Otherwise, if we had a full Inflights and all inflight messages were in - // fact dropped, replication to that follower would stall. Instead, an empty - // MsgApp will eventually reach the follower (heartbeats responses prompt the - // leader to send an append), allowing it to be acked or rejected, both of - // which will clear out Inflights. - if pr.State != tracker.StateReplicate || !pr.Inflights.Full() { - ents, erre = r.raftLog.entries(nextIndex, r.maxMsgSize) + prevTerm, err := r.raftLog.term(pr.Next - 1) + if err != nil { + // The log probably got truncated at >= pr.Next, so we can't catch up the + // follower log anymore. Send a snapshot instead. + return r.sendSnapshot(to, pr) + } + if pr.State == tracker.StateProbe { // !pr.MsgAppFlowPaused + pr.MsgAppFlowPaused = true + return r.sendAppend(to, pr, prevTerm) } + // StateReplicate + repl := (*tracker.ProgressReplicate)(pr) - if len(ents) == 0 && !sendIfEmpty { + lastIndex, commit := r.raftLog.lastIndex(), r.raftLog.committed + if repl.UpToDate(lastIndex, commit) { return false } - - if errt != nil || erre != nil { // send snapshot if we failed to get term or entries - if !pr.RecentActive { - r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to) - return false - } - - snapshot, err := r.raftLog.snapshot() - if err != nil { - if err == ErrSnapshotTemporarilyUnavailable { - r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to) - return false - } - panic(err) // TODO(bdarnell) - } - if IsEmptySnap(snapshot) { - panic("need non-empty snapshot") - } - sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term - r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]", - r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr) - pr.BecomeSnapshot(sindex) - r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr) - - r.send(pb.Message{To: to, Type: pb.MsgSnap, Snapshot: &snapshot}) - return true + if !repl.IsThrottled(lastIndex) { + return r.sendAppend(to, pr, prevTerm) + } + if repl.ShouldSendCommit(commit) { + return r.sendMsgAppPing(to, pr, prevTerm) + } + // In a throttled StateReplicate only send empty MsgApp, to ensure progress. + // Otherwise, if all the inflight messages are dropped, replication to that + // follower stalls. We send an empty MsgApp periodically, so that eventually + // it reaches the follower, and the latter acks or rejects it. The MsgAppFlowPaused flag + // is reset by a HeartbeatResp message, which is guaranteed if the follower is + // connected. + // + // Also, ensure sending an empty MsgApp if the follower's commit index can be + // moved forward. + if !repl.MsgAppFlowPaused { + return r.sendMsgAppPing(to, pr, prevTerm) } + return false +} + +func (r *raft) sendMsgAppPing(to uint64, pr *tracker.Progress, prevTerm uint64) bool { + commit := r.raftLog.committed + r.send(pb.Message{ + To: to, + Type: pb.MsgApp, + Index: pr.Next - 1, + LogTerm: prevTerm, + Commit: commit, + }) + pr.MsgAppFlowPaused = true + pr.Commit.Sent(min(commit, pr.Next-1)) + return true +} - // Send the actual MsgApp otherwise, and update the progress accordingly. - if err := pr.UpdateOnEntriesSend(len(ents), uint64(payloadsSize(ents)), nextIndex); err != nil { - r.logger.Panicf("%x: %v", r.id, err) +func (r *raft) sendAppend(to uint64, pr *tracker.Progress, prevTerm uint64) bool { + entries, err := r.raftLog.entries(pr.Next, r.maxMsgSize) + if err != nil { // send snapshot if we failed to get entries + return r.sendSnapshot(to, pr) } - // NB: pr has been updated, but we make sure to only use its old values below. + prevIndex := pr.Next - 1 + pr.UpdateOnEntriesSend(len(entries), uint64(payloadsSize(entries))) + commit := r.raftLog.committed + pr.MsgAppFlowPaused = true + pr.Commit.Sent(min(commit, pr.Next-1)) r.send(pb.Message{ To: to, Type: pb.MsgApp, - Index: lastIndex, - LogTerm: lastTerm, - Entries: ents, - Commit: r.raftLog.committed, + Index: prevIndex, + LogTerm: prevTerm, + Entries: entries, + Commit: commit, }) return true } +func (r *raft) sendSnapshot(to uint64, pr *tracker.Progress) bool { + if !pr.RecentActive { + r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to) + return false + } + + snapshot, err := r.raftLog.snapshot() + if err != nil { + if err == ErrSnapshotTemporarilyUnavailable { + r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to) + return false + } + panic(err) // TODO(bdarnell) + } + if IsEmptySnap(snapshot) { + panic("need non-empty snapshot") + } + sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term + r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]", + r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr) + pr.BecomeSnapshot(sindex) + r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr) + + r.send(pb.Message{To: to, Type: pb.MsgSnap, Snapshot: &snapshot}) + return true +} + // sendHeartbeat sends a heartbeat RPC to the given peer. func (r *raft) sendHeartbeat(to uint64, ctx []byte) { // Attach the commit as min(to.matched, r.committed). @@ -687,11 +722,8 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) { // bcastAppend sends RPC, with entries to all peers that are not up-to-date // according to the progress recorded in r.trk. func (r *raft) bcastAppend() { - r.trk.Visit(func(id uint64, _ *tracker.Progress) { - if id == r.id { - return - } - r.sendAppend(id) + r.trk.Visit(func(id uint64, pr *tracker.Progress) { + r.maybeSendAppend(id, pr) }) } @@ -773,8 +805,7 @@ func (r *raft) reset(term uint64) { r.trk.ResetVotes() r.trk.Visit(func(id uint64, pr *tracker.Progress) { *pr = tracker.Progress{ - Match: 0, - Next: r.raftLog.lastIndex() + 1, + Watermark: tracker.Watermark{Next: r.raftLog.lastIndex() + 1}, Inflights: tracker.NewInflights(r.trk.MaxInflight, r.trk.MaxInflightBytes), IsLearner: pr.IsLearner, } @@ -1462,11 +1493,10 @@ func stepLeader(r *raft, m pb.Message) error { if pr.State == tracker.StateReplicate { pr.BecomeProbe() } - r.sendAppend(m.From) + r.maybeSendAppend(m.From, pr) } } else { - oldPaused := pr.IsPaused() - pr.UpdateCommit(m.Commit) + pr.Commit.Update(m.Commit) // We want to update our tracking if the response updates our // matched index or if the response can move a probing peer back // into StateReplicate (see heartbeat_rep_recovers_from_probing.txt @@ -1503,9 +1533,6 @@ func stepLeader(r *raft, m pb.Message) error { // to respond to pending read index requests releasePendingReadIndexMessages(r) r.bcastAppend() - } else if oldPaused && r.id != m.From && pr.Commit < r.raftLog.committed { - // The node is potentially missing the latest commit index. Send it. - r.sendAppend(m.From) } // We've updated flow control information above, which may // allow us to send multiple (size-limited) in-flight messages @@ -1514,7 +1541,7 @@ func stepLeader(r *raft, m pb.Message) error { // we have more entries to send, send as many messages as we // can (without sending empty messages for the commit index) if r.id != m.From { - for r.maybeSendAppend(m.From, false /* sendIfEmpty */) { + for r.maybeSendAppend(m.From, pr) { } } // Transfer leadership is in progress. @@ -1541,9 +1568,7 @@ func stepLeader(r *raft, m pb.Message) error { // Note that StateSnapshot typically satisfies pr.Match < lastIndex, but // `pr.Paused()` is always true for StateSnapshot, so sendAppend is a // no-op. - if pr.Match < r.raftLog.lastIndex() || pr.State == tracker.StateProbe { - r.sendAppend(m.From) - } + r.maybeSendAppend(m.From, pr) if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 { return nil @@ -1613,7 +1638,7 @@ func stepLeader(r *raft, m pb.Message) error { r.sendTimeoutNow(leadTransferee) r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee) } else { - r.sendAppend(leadTransferee) + r.maybeSendAppend(leadTransferee, pr) } } return nil @@ -1947,21 +1972,15 @@ func (r *raft) switchToConfig(cfg tracker.Config, trk tracker.ProgressMap) pb.Co return cs } - if r.maybeCommit() { - // If the configuration change means that more entries are committed now, - // broadcast/append to everyone in the updated config. - r.bcastAppend() - } else { - // Otherwise, still probe the newly added replicas; there's no reason to - // let them wait out a heartbeat interval (or the next incoming - // proposal). - r.trk.Visit(func(id uint64, pr *tracker.Progress) { - if id == r.id { - return - } - r.maybeSendAppend(id, false /* sendIfEmpty */) - }) - } + r.maybeCommit() + // If the configuration change means that more entries are committed now, + // broadcast/append to everyone in the updated config. + // + // Otherwise, still probe the newly added replicas; there's no reason to + // let them wait out a heartbeat interval (or the next incoming + // proposal). + r.bcastAppend() + // If the leadTransferee was removed or demoted, abort the leadership transfer. if _, tOK := r.trk.Config.Voters.IDs()[r.leadTransferee]; !tOK && r.leadTransferee != 0 { r.abortLeaderTransfer() diff --git a/raft_test.go b/raft_test.go index 5bc6d52e..a8f5b6f0 100644 --- a/raft_test.go +++ b/raft_test.go @@ -2794,7 +2794,8 @@ func TestSendAppendForProgressProbe(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.trk.Progress[2].BecomeProbe() + pr2 := r.trk.Progress[2] + pr2.BecomeProbe() // each round is a heartbeat for i := 0; i < 3; i++ { @@ -2803,7 +2804,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { // loop. After that, the follower is paused until a heartbeat response is // received. mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) - r.sendAppend(2) + r.maybeSendAppend(2, pr2) msg := r.readMessages() if len(msg) != 1 { t.Errorf("len(msg) = %d, want %d", len(msg), 1) @@ -2818,7 +2819,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { } for j := 0; j < 10; j++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) - r.sendAppend(2) + r.maybeSendAppend(2, pr2) if l := len(r.readMessages()); l != 0 { t.Errorf("len(msg) = %d, want %d", l, 0) } @@ -2861,11 +2862,12 @@ func TestSendAppendForProgressReplicate(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.trk.Progress[2].BecomeReplicate() + pr2 := r.trk.Progress[2] + pr2.BecomeReplicate() for i := 0; i < 10; i++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) - r.sendAppend(2) + r.maybeSendAppend(2, pr2) msgs := r.readMessages() if len(msgs) != 1 { t.Errorf("len(msg) = %d, want %d", len(msgs), 1) @@ -2878,11 +2880,12 @@ func TestSendAppendForProgressSnapshot(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.trk.Progress[2].BecomeSnapshot(10) + pr2 := r.trk.Progress[2] + pr2.BecomeSnapshot(10) for i := 0; i < 10; i++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) - r.sendAppend(2) + r.maybeSendAppend(2, pr2) msgs := r.readMessages() if len(msgs) != 0 { t.Errorf("len(msg) = %d, want %d", len(msgs), 0) @@ -2901,7 +2904,7 @@ func TestRecvMsgUnreachable(t *testing.T) { // set node 2 to state replicate r.trk.Progress[2].Match = 3 r.trk.Progress[2].BecomeReplicate() - r.trk.Progress[2].OptimisticUpdate(5) + r.trk.Progress[2].Update(5) r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgUnreachable}) diff --git a/testdata/heartbeat_resp_recovers_from_probing.txt b/testdata/heartbeat_resp_recovers_from_probing.txt index e94a67a1..f8f38945 100644 --- a/testdata/heartbeat_resp_recovers_from_probing.txt +++ b/testdata/heartbeat_resp_recovers_from_probing.txt @@ -29,8 +29,8 @@ ok status 1 ---- 1: StateReplicate match=11 commit=10 next=12 -2: StateReplicate match=11 commit=11 next=12 -3: StateReplicate match=11 commit=11 next=12 +2: StateReplicate match=11 commit=11 next=12 paused +3: StateReplicate match=11 commit=11 next=12 paused # On the first replica, report the second one as not reachable. report-unreachable 1 2 @@ -41,7 +41,7 @@ status 1 ---- 1: StateReplicate match=11 commit=10 next=12 2: StateProbe match=11 commit=11 next=12 -3: StateReplicate match=11 commit=11 next=12 +3: StateReplicate match=11 commit=11 next=12 paused tick-heartbeat 1 ---- diff --git a/testdata/replicate_pause.txt b/testdata/replicate_pause.txt index b9a359bf..3e64bd0a 100644 --- a/testdata/replicate_pause.txt +++ b/testdata/replicate_pause.txt @@ -67,7 +67,7 @@ ok status 1 ---- 1: StateReplicate match=14 commit=11 next=15 -2: StateReplicate match=14 commit=14 next=15 +2: StateReplicate match=14 commit=14 next=15 paused 3: StateReplicate match=11 commit=11 next=15 paused inflight=3[full] # Drop append messages to node 3. @@ -76,6 +76,9 @@ deliver-msgs drop=3 dropped: 1->3 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "prop_1_12"] dropped: 1->3 MsgApp Term:1 Log:1/12 Commit:11 Entries:[1/13 EntryNormal "prop_1_13"] dropped: 1->3 MsgApp Term:1 Log:1/13 Commit:11 Entries:[1/14 EntryNormal "prop_1_14"] +dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:12 +dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:13 +dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:14 # Repeat committing 3 entries. @@ -115,7 +118,7 @@ ok status 1 ---- 1: StateReplicate match=17 commit=14 next=18 -2: StateReplicate match=17 commit=17 next=18 +2: StateReplicate match=17 commit=17 next=18 paused 3: StateReplicate match=11 commit=11 next=15 paused inflight=3[full] # Make a heartbeat roundtrip. diff --git a/testdata/slow_follower_after_compaction.txt b/testdata/slow_follower_after_compaction.txt index d23820ae..a195df4a 100644 --- a/testdata/slow_follower_after_compaction.txt +++ b/testdata/slow_follower_after_compaction.txt @@ -44,8 +44,8 @@ ok status 1 ---- 1: StateReplicate match=14 commit=11 next=15 -2: StateReplicate match=14 commit=14 next=15 -3: StateReplicate match=14 commit=14 next=15 +2: StateReplicate match=14 commit=14 next=15 paused +3: StateReplicate match=14 commit=14 next=15 paused log-level none ---- @@ -80,7 +80,7 @@ ok status 1 ---- 1: StateReplicate match=18 commit=14 next=19 -2: StateReplicate match=18 commit=18 next=19 +2: StateReplicate match=18 commit=18 next=19 paused 3: StateReplicate match=14 commit=14 next=17 paused inflight=2[full] # Break the MsgApp flow from the leader to node 3. @@ -88,6 +88,8 @@ deliver-msgs drop=3 ---- dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:14 Entries:[1/15 EntryNormal "prop_1_15"] dropped: 1->3 MsgApp Term:1 Log:1/15 Commit:14 Entries:[1/16 EntryNormal "prop_1_16"] +dropped: 1->3 MsgApp Term:1 Log:1/16 Commit:15 +dropped: 1->3 MsgApp Term:1 Log:1/16 Commit:16 # Truncate the leader's log beyond node 3 log size. compact 1 17 @@ -118,4 +120,4 @@ status 1 ---- 1: StateReplicate match=18 commit=14 next=19 2: StateReplicate match=18 commit=18 next=19 -3: StateReplicate match=18 commit=18 next=19 +3: StateReplicate match=18 commit=18 next=19 paused diff --git a/testdata/snapshot_succeed_via_app_resp.txt b/testdata/snapshot_succeed_via_app_resp.txt index 59ddd2e0..e938eecf 100644 --- a/testdata/snapshot_succeed_via_app_resp.txt +++ b/testdata/snapshot_succeed_via_app_resp.txt @@ -42,7 +42,7 @@ ok status 1 ---- 1: StateReplicate match=11 commit=10 next=12 -2: StateReplicate match=11 commit=11 next=12 +2: StateReplicate match=11 commit=11 next=12 paused 3: StateProbe match=0 commit=0 next=11 paused inactive # Add the node that will receive a snapshot (it has no state at all, does not @@ -96,7 +96,7 @@ stabilize 1 status 1 ---- 1: StateReplicate match=11 commit=10 next=12 -2: StateReplicate match=11 commit=11 next=12 +2: StateReplicate match=11 commit=11 next=12 paused 3: StateSnapshot match=0 commit=0 next=11 paused pendingSnap=11 # Follower applies the snapshot. Note how it reacts with a MsgAppResp upon completion. @@ -133,8 +133,8 @@ stabilize 1 status 1 ---- 1: StateReplicate match=11 commit=10 next=12 -2: StateReplicate match=11 commit=11 next=12 -3: StateReplicate match=11 commit=0 next=12 +2: StateReplicate match=11 commit=11 next=12 paused +3: StateReplicate match=11 commit=0 next=12 paused # Let things settle. stabilize diff --git a/testdata/snapshot_succeed_via_app_resp_behind.txt b/testdata/snapshot_succeed_via_app_resp_behind.txt index ccf5ee5a..2052746d 100644 --- a/testdata/snapshot_succeed_via_app_resp_behind.txt +++ b/testdata/snapshot_succeed_via_app_resp_behind.txt @@ -49,7 +49,7 @@ raft-state status 1 ---- 1: StateReplicate match=11 commit=10 next=12 -2: StateReplicate match=11 commit=11 next=12 +2: StateReplicate match=11 commit=11 next=12 paused 3: StateProbe match=0 commit=0 next=11 paused inactive raft-log 3 @@ -83,7 +83,7 @@ ok status 1 ---- 1: StateReplicate match=12 commit=11 next=13 -2: StateReplicate match=12 commit=12 next=13 +2: StateReplicate match=12 commit=12 next=13 paused 3: StateProbe match=0 commit=0 next=11 paused inactive # 3 now gets the first MsgApp the leader originally sent, trying to append entry @@ -159,5 +159,5 @@ stabilize 1 status 1 ---- 1: StateReplicate match=12 commit=11 next=13 -2: StateReplicate match=12 commit=12 next=13 -3: StateReplicate match=11 commit=0 next=13 inflight=1 +2: StateReplicate match=12 commit=12 next=13 paused +3: StateReplicate match=11 commit=0 next=13 paused inflight=1 diff --git a/tracker/progress.go b/tracker/progress.go index 38985c80..e77886d8 100644 --- a/tracker/progress.go +++ b/tracker/progress.go @@ -28,15 +28,9 @@ import ( // strewn around `*raft.raft`. Additionally, some fields are only used when in a // certain State. All of this isn't ideal. type Progress struct { - // Match is the log index up to which the follower's log matches the leader's. - Match uint64 - // Commit is the commit index of the follower's log. - // INVARIANT: Commit <= Match. - Commit uint64 - // Next is the index of the next log entry to be sent to the follower. Entries - // in the [Match+1, Next-1] range, if any, are on the fly to the follower and - // have not been rejected yet. - Next uint64 + Watermark // the follower's last log index + + Commit Watermark // the follower's commit index // State defines how the leader should interact with the follower. // @@ -161,55 +155,21 @@ func (pr *Progress) BecomeSnapshot(snapshoti uint64) { // UpdateOnEntriesSend updates the progress on the given number of consecutive // entries being sent in a MsgApp, with the given total bytes size, appended at // and after the given log index. -func (pr *Progress) UpdateOnEntriesSend(entries int, bytes, nextIndex uint64) error { - switch pr.State { - case StateReplicate: - if entries > 0 { - last := nextIndex + uint64(entries) - 1 - pr.OptimisticUpdate(last) - pr.Inflights.Add(last, bytes) - } - // If this message overflows the in-flights tracker, or it was already full, - // consider this message being a probe, so that the flow is paused. - pr.MsgAppFlowPaused = pr.Inflights.Full() - case StateProbe: - // TODO(pavelkalinnikov): this condition captures the previous behaviour, - // but we should set MsgAppFlowPaused unconditionally for simplicity, because any - // MsgApp in StateProbe is a probe, not only non-empty ones. - if entries > 0 { - pr.MsgAppFlowPaused = true - } - default: - return fmt.Errorf("sending append in unhandled state %s", pr.State) - } - return nil -} - -// UpdateCommit moves the known commit index for this follower forward. -func (pr *Progress) UpdateCommit(index uint64) { - if index > pr.Commit { - pr.Commit = index +func (pr *Progress) UpdateOnEntriesSend(entries int, bytes uint64) { + if pr.State != StateReplicate { + return } + pr.Next += uint64(entries) + pr.Inflights.Add(pr.Next-1, bytes) } // MaybeUpdate is called when an MsgAppResp arrives from the follower, with the // index acked by it. The method returns false if the given n index comes from // an outdated message. Otherwise it updates the progress and returns true. func (pr *Progress) MaybeUpdate(n uint64) bool { - var updated bool - if pr.Match < n { - pr.Match = n - updated = true - pr.MsgAppFlowPaused = false - } - pr.Next = max(pr.Next, n+1) - return updated + return pr.Watermark.Update(n) } -// OptimisticUpdate signals that appends all the way up to and including index n -// are in-flight. As a result, Next is increased to n+1. -func (pr *Progress) OptimisticUpdate(n uint64) { pr.Next = n + 1 } - // MaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The // arguments are the index of the append message rejected by the follower, and // the hint that we want to decrease to. @@ -265,9 +225,23 @@ func (pr *Progress) IsPaused() bool { } } +type ProgressReplicate Progress + +func (pr *ProgressReplicate) IsThrottled(lastIndex uint64) bool { + return pr.Next > lastIndex || pr.Inflights.Full() +} + +func (pr *ProgressReplicate) ShouldSendCommit(committed uint64) bool { + return min(committed, pr.Next-1) >= pr.Commit.Next +} + +func (pr *ProgressReplicate) UpToDate(lastIndex, committed uint64) bool { + return pr.Match >= lastIndex && pr.Commit.Match >= committed +} + func (pr *Progress) String() string { var buf strings.Builder - fmt.Fprintf(&buf, "%s match=%d commit=%d next=%d", pr.State, pr.Match, pr.Commit, pr.Next) + fmt.Fprintf(&buf, "%s match=%d commit=%d next=%d", pr.State, pr.Match, pr.Commit.Match, pr.Next) if pr.IsLearner { fmt.Fprint(&buf, " learner") } diff --git a/tracker/watermark.go b/tracker/watermark.go new file mode 100644 index 00000000..8dc8a359 --- /dev/null +++ b/tracker/watermark.go @@ -0,0 +1,32 @@ +package tracker + +// Watermark is blah. Algorithm: +// +// - Send a bunch of stuff, Next will be updated. +// - Receive a bunch of stuff, Match will be updated. +// - Send < Next, MsgAppFlowPaused will be enabled. +// - Receive an update that bumps Match, MsgAppFlowPaused is disabled. +// +// Keep writing. +type Watermark struct { + // Match is the watermark up to which the follower matches the leader. + Match uint64 + // Next is the next leader watermark to send. All marks < Next are already + // in-flight to the follower. + Next uint64 +} + +func (w *Watermark) Update(match uint64) bool { + if match < w.Match { + return false + } + w.Match = match + w.Next = max(w.Next, match+1) + return true +} + +func (w *Watermark) Sent(watermark uint64) { + if watermark >= w.Next { + w.Next = watermark + 1 + } +}