diff --git a/raft.go b/raft.go index 043fdc92..f96a4abf 100644 --- a/raft.go +++ b/raft.go @@ -1522,7 +1522,7 @@ func stepLeader(r *raft, m pb.Message) error { } case pb.MsgHeartbeatResp: pr.RecentActive = true - pr.MsgAppFlowPaused = false + pr.PauseMsgAppProbes(false) r.maybeSendAppend(m.From, pr) if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 { @@ -1556,7 +1556,7 @@ func stepLeader(r *raft, m pb.Message) error { // If snapshot finish, wait for the MsgAppResp from the remote node before sending // out the next MsgApp. // If snapshot failure, wait for a heartbeat interval before next try - pr.MsgAppFlowPaused = true + pr.PauseMsgAppProbes(true) case pb.MsgUnreachable: // During optimistic replication, if the remote becomes unreachable, // there is huge probability that a MsgApp is lost. @@ -1593,7 +1593,7 @@ func stepLeader(r *raft, m pb.Message) error { r.sendTimeoutNow(leadTransferee) r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee) } else { - pr.MsgAppFlowPaused = false + pr.PauseMsgAppProbes(false) r.maybeSendAppend(leadTransferee, pr) } } diff --git a/raft_snap_test.go b/raft_snap_test.go index e6058c68..a69d1993 100644 --- a/raft_snap_test.go +++ b/raft_snap_test.go @@ -83,8 +83,8 @@ func TestSnapshotFailure(t *testing.T) { if sm.trk.Progress[2].Next != 1 { t.Fatalf("Next = %d, want 1", sm.trk.Progress[2].Next) } - if !sm.trk.Progress[2].MsgAppFlowPaused { - t.Errorf("MsgAppFlowPaused = %v, want true", sm.trk.Progress[2].MsgAppFlowPaused) + if !sm.trk.Progress[2].MsgAppProbesPaused { + t.Errorf("msgAppProbesPaused = %v, want true", sm.trk.Progress[2].MsgAppProbesPaused) } } @@ -106,8 +106,8 @@ func TestSnapshotSucceed(t *testing.T) { if sm.trk.Progress[2].Next != 12 { t.Fatalf("Next = %d, want 12", sm.trk.Progress[2].Next) } - if !sm.trk.Progress[2].MsgAppFlowPaused { - t.Errorf("MsgAppFlowPaused = %v, want true", sm.trk.Progress[2].MsgAppFlowPaused) + if !sm.trk.Progress[2].MsgAppProbesPaused { + t.Errorf("MsgAppProbesPaused = %v, want true", sm.trk.Progress[2].MsgAppProbesPaused) } } diff --git a/raft_test.go b/raft_test.go index c25bd14d..c41c484e 100644 --- a/raft_test.go +++ b/raft_test.go @@ -128,21 +128,21 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) { r.becomeCandidate() r.becomeLeader() - r.trk.Progress[2].MsgAppFlowPaused = true + r.trk.Progress[2].PauseMsgAppProbes(true) r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) - if !r.trk.Progress[2].MsgAppFlowPaused { - t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppFlowPaused) + if !r.trk.Progress[2].MsgAppProbesPaused { + t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppProbesPaused) } r.trk.Progress[2].BecomeReplicate() - if r.trk.Progress[2].MsgAppFlowPaused { - t.Errorf("paused = %v, want false", r.trk.Progress[2].MsgAppFlowPaused) + if r.trk.Progress[2].MsgAppProbesPaused { + t.Errorf("paused = %v, want false", r.trk.Progress[2].MsgAppProbesPaused) } - r.trk.Progress[2].MsgAppFlowPaused = true + r.trk.Progress[2].PauseMsgAppProbes(true) r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp}) - if !r.trk.Progress[2].MsgAppFlowPaused { - t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppFlowPaused) + if !r.trk.Progress[2].MsgAppProbesPaused { + t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppProbesPaused) } } @@ -2784,8 +2784,8 @@ func TestSendAppendForProgressProbe(t *testing.T) { } } - if !r.trk.Progress[2].MsgAppFlowPaused { - t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppFlowPaused) + if !r.trk.Progress[2].MsgAppProbesPaused { + t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppProbesPaused) } for j := 0; j < 10; j++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) @@ -2799,8 +2799,8 @@ func TestSendAppendForProgressProbe(t *testing.T) { for j := 0; j < r.heartbeatTimeout; j++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) } - if !r.trk.Progress[2].MsgAppFlowPaused { - t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppFlowPaused) + if !r.trk.Progress[2].MsgAppProbesPaused { + t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppProbesPaused) } // consume the heartbeat @@ -2822,8 +2822,8 @@ func TestSendAppendForProgressProbe(t *testing.T) { if msg[0].Index != 0 { t.Errorf("index = %d, want %d", msg[0].Index, 0) } - if !r.trk.Progress[2].MsgAppFlowPaused { - t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppFlowPaused) + if !r.trk.Progress[2].MsgAppProbesPaused { + t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppProbesPaused) } } diff --git a/tracker/progress.go b/tracker/progress.go index 0190c352..b06f5b38 100644 --- a/tracker/progress.go +++ b/tracker/progress.go @@ -87,13 +87,13 @@ type Progress struct { // This is always true on the leader. RecentActive bool - // MsgAppFlowPaused is used when the MsgApp flow to a node is throttled. This - // happens in StateProbe, or StateReplicate with saturated Inflights. In both - // cases, we need to continue sending MsgApp once in a while to guarantee - // progress, but we only do so when MsgAppFlowPaused is false (it is reset on - // receiving a heartbeat response), to not overflow the receiver. See - // IsPaused(). - MsgAppFlowPaused bool + // MsgAppProbesPaused set to true prevents sending "probe" MsgApp messages to + // this follower. Used in StateProbe, or StateReplicate when all entries are + // in-flight or the in-flight volume exceeds limits. See ShouldSendMsgApp(). + // + // TODO(pav-kv): unexport this field. It is used by a few tests, but should be + // replaced by PauseMsgAppProbes() and ShouldSendMsgApp(). + MsgAppProbesPaused bool // Inflights is a sliding window for the inflight messages. // Each inflight message contains one or more log entries. @@ -113,7 +113,7 @@ type Progress struct { IsLearner bool } -// ResetState moves the Progress into the specified State, resetting MsgAppFlowPaused, +// ResetState moves the Progress into the specified State, resetting MsgAppProbesPaused, // PendingSnapshot, and Inflights. func (pr *Progress) ResetState(state StateType) { pr.PauseMsgAppProbes(false) @@ -169,7 +169,7 @@ func (pr *Progress) SentEntries(entries int, bytes uint64) { // PauseMsgAppProbes pauses or unpauses empty MsgApp messages flow, depending on // the passed-in bool. func (pr *Progress) PauseMsgAppProbes(pause bool) { - pr.MsgAppFlowPaused = pause + pr.MsgAppProbesPaused = pause } // CanSendEntries returns true if the flow control state allows sending at least @@ -250,12 +250,17 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool { // operation, this is false. A throttled node will be contacted less frequently // until it has reached a state in which it's able to accept a steady stream of // log entries again. +// +// TODO(pav-kv): this method is deprecated, remove it. It is still used in tests +// and String(), find a way to avoid this. The problem is that the actual flow +// control state depends on the log size and commit index, which are not part of +// this Progress struct - they are passed-in to methods like ShouldSendMsgApp(). func (pr *Progress) IsPaused() bool { switch pr.State { case StateProbe: - return pr.MsgAppFlowPaused + return pr.MsgAppProbesPaused case StateReplicate: - return pr.MsgAppFlowPaused && pr.Inflights.Full() + return pr.MsgAppProbesPaused && pr.Inflights.Full() case StateSnapshot: return true default: @@ -285,10 +290,10 @@ func (pr *Progress) IsPaused() bool { func (pr *Progress) ShouldSendMsgApp(last, commit uint64) bool { switch pr.State { case StateProbe: - return !pr.MsgAppFlowPaused + return !pr.MsgAppProbesPaused case StateReplicate: return pr.CanBumpCommit(commit) || - pr.Match < last && (!pr.MsgAppFlowPaused || pr.CanSendEntries(last)) + pr.Match < last && (!pr.MsgAppProbesPaused || pr.CanSendEntries(last)) case StateSnapshot: return false default: diff --git a/tracker/progress_test.go b/tracker/progress_test.go index 5ceaa59f..c9485c21 100644 --- a/tracker/progress_test.go +++ b/tracker/progress_test.go @@ -24,14 +24,14 @@ func TestProgressString(t *testing.T) { ins := NewInflights(1, 0) ins.Add(123, 1) pr := &Progress{ - Match: 1, - Next: 2, - State: StateSnapshot, - PendingSnapshot: 123, - RecentActive: false, - MsgAppFlowPaused: true, - IsLearner: true, - Inflights: ins, + Match: 1, + Next: 2, + State: StateSnapshot, + PendingSnapshot: 123, + RecentActive: false, + MsgAppProbesPaused: true, + IsLearner: true, + Inflights: ins, } const exp = `StateSnapshot match=1 next=2 learner paused pendingSnap=123 inactive inflight=1[full]` assert.Equal(t, exp, pr.String()) @@ -53,29 +53,29 @@ func TestProgressIsPaused(t *testing.T) { } for i, tt := range tests { p := &Progress{ - State: tt.state, - MsgAppFlowPaused: tt.paused, - Inflights: NewInflights(256, 0), + State: tt.state, + MsgAppProbesPaused: tt.paused, + Inflights: NewInflights(256, 0), } assert.Equal(t, tt.w, p.IsPaused(), i) } } -// TestProgressResume ensures that MaybeDecrTo resets MsgAppFlowPaused, and +// TestProgressResume ensures that MaybeDecrTo resets MsgAppProbesPaused, and // MaybeUpdate does not. // // TODO(pav-kv): there is little sense in testing these micro-behaviours in the // struct. We should test the visible behaviour instead. func TestProgressResume(t *testing.T) { p := &Progress{ - Next: 2, - MsgAppFlowPaused: true, + Next: 2, + MsgAppProbesPaused: true, } p.MaybeDecrTo(1, 1) - assert.False(t, p.MsgAppFlowPaused) - p.MsgAppFlowPaused = true + assert.False(t, p.MsgAppProbesPaused) + p.MsgAppProbesPaused = true p.MaybeUpdate(2) - assert.True(t, p.MsgAppFlowPaused) + assert.True(t, p.MsgAppProbesPaused) } func TestProgressBecomeProbe(t *testing.T) {