Skip to content

Commit

Permalink
tracker: rename the paused probes flow field
Browse files Browse the repository at this point in the history
Signed-off-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
pav-kv committed Feb 23, 2024
1 parent 9ceae93 commit 2249aff
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 51 deletions.
6 changes: 3 additions & 3 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1522,7 +1522,7 @@ func stepLeader(r *raft, m pb.Message) error {
}
case pb.MsgHeartbeatResp:
pr.RecentActive = true
pr.MsgAppFlowPaused = false
pr.PauseMsgAppProbes(false)
r.maybeSendAppend(m.From, pr)

if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
Expand Down Expand Up @@ -1556,7 +1556,7 @@ func stepLeader(r *raft, m pb.Message) error {
// If snapshot finish, wait for the MsgAppResp from the remote node before sending
// out the next MsgApp.
// If snapshot failure, wait for a heartbeat interval before next try
pr.MsgAppFlowPaused = true
pr.PauseMsgAppProbes(true)
case pb.MsgUnreachable:
// During optimistic replication, if the remote becomes unreachable,
// there is huge probability that a MsgApp is lost.
Expand Down Expand Up @@ -1593,7 +1593,7 @@ func stepLeader(r *raft, m pb.Message) error {
r.sendTimeoutNow(leadTransferee)
r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
} else {
pr.MsgAppFlowPaused = false
pr.PauseMsgAppProbes(false)
r.maybeSendAppend(leadTransferee, pr)
}
}
Expand Down
8 changes: 4 additions & 4 deletions raft_snap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ func TestSnapshotFailure(t *testing.T) {
if sm.trk.Progress[2].Next != 1 {
t.Fatalf("Next = %d, want 1", sm.trk.Progress[2].Next)
}
if !sm.trk.Progress[2].MsgAppFlowPaused {
t.Errorf("MsgAppFlowPaused = %v, want true", sm.trk.Progress[2].MsgAppFlowPaused)
if !sm.trk.Progress[2].MsgAppProbesPaused {
t.Errorf("msgAppProbesPaused = %v, want true", sm.trk.Progress[2].MsgAppProbesPaused)
}
}

Expand All @@ -106,8 +106,8 @@ func TestSnapshotSucceed(t *testing.T) {
if sm.trk.Progress[2].Next != 12 {
t.Fatalf("Next = %d, want 12", sm.trk.Progress[2].Next)
}
if !sm.trk.Progress[2].MsgAppFlowPaused {
t.Errorf("MsgAppFlowPaused = %v, want true", sm.trk.Progress[2].MsgAppFlowPaused)
if !sm.trk.Progress[2].MsgAppProbesPaused {
t.Errorf("MsgAppProbesPaused = %v, want true", sm.trk.Progress[2].MsgAppProbesPaused)
}
}

Expand Down
28 changes: 14 additions & 14 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,21 +128,21 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) {
r.becomeCandidate()
r.becomeLeader()

r.trk.Progress[2].MsgAppFlowPaused = true
r.trk.Progress[2].PauseMsgAppProbes(true)

r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
if !r.trk.Progress[2].MsgAppFlowPaused {
t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppFlowPaused)
if !r.trk.Progress[2].MsgAppProbesPaused {
t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppProbesPaused)
}

r.trk.Progress[2].BecomeReplicate()
if r.trk.Progress[2].MsgAppFlowPaused {
t.Errorf("paused = %v, want false", r.trk.Progress[2].MsgAppFlowPaused)
if r.trk.Progress[2].MsgAppProbesPaused {
t.Errorf("paused = %v, want false", r.trk.Progress[2].MsgAppProbesPaused)
}
r.trk.Progress[2].MsgAppFlowPaused = true
r.trk.Progress[2].PauseMsgAppProbes(true)
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
if !r.trk.Progress[2].MsgAppFlowPaused {
t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppFlowPaused)
if !r.trk.Progress[2].MsgAppProbesPaused {
t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppProbesPaused)
}
}

Expand Down Expand Up @@ -2784,8 +2784,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
}
}

if !r.trk.Progress[2].MsgAppFlowPaused {
t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppFlowPaused)
if !r.trk.Progress[2].MsgAppProbesPaused {
t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppProbesPaused)
}
for j := 0; j < 10; j++ {
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
Expand All @@ -2799,8 +2799,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
for j := 0; j < r.heartbeatTimeout; j++ {
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
}
if !r.trk.Progress[2].MsgAppFlowPaused {
t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppFlowPaused)
if !r.trk.Progress[2].MsgAppProbesPaused {
t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppProbesPaused)
}

// consume the heartbeat
Expand All @@ -2822,8 +2822,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
if msg[0].Index != 0 {
t.Errorf("index = %d, want %d", msg[0].Index, 0)
}
if !r.trk.Progress[2].MsgAppFlowPaused {
t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppFlowPaused)
if !r.trk.Progress[2].MsgAppProbesPaused {
t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppProbesPaused)
}
}

Expand Down
31 changes: 18 additions & 13 deletions tracker/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,13 @@ type Progress struct {
// This is always true on the leader.
RecentActive bool

// MsgAppFlowPaused is used when the MsgApp flow to a node is throttled. This
// happens in StateProbe, or StateReplicate with saturated Inflights. In both
// cases, we need to continue sending MsgApp once in a while to guarantee
// progress, but we only do so when MsgAppFlowPaused is false (it is reset on
// receiving a heartbeat response), to not overflow the receiver. See
// IsPaused().
MsgAppFlowPaused bool
// MsgAppProbesPaused set to true prevents sending "probe" MsgApp messages to
// this follower. Used in StateProbe, or StateReplicate when all entries are
// in-flight or the in-flight volume exceeds limits. See ShouldSendMsgApp().
//
// TODO(pav-kv): unexport this field. It is used by a few tests, but should be
// replaced by PauseMsgAppProbes() and ShouldSendMsgApp().
MsgAppProbesPaused bool

// Inflights is a sliding window for the inflight messages.
// Each inflight message contains one or more log entries.
Expand All @@ -113,7 +113,7 @@ type Progress struct {
IsLearner bool
}

// ResetState moves the Progress into the specified State, resetting MsgAppFlowPaused,
// ResetState moves the Progress into the specified State, resetting MsgAppProbesPaused,
// PendingSnapshot, and Inflights.
func (pr *Progress) ResetState(state StateType) {
pr.PauseMsgAppProbes(false)
Expand Down Expand Up @@ -169,7 +169,7 @@ func (pr *Progress) SentEntries(entries int, bytes uint64) {
// PauseMsgAppProbes pauses or unpauses empty MsgApp messages flow, depending on
// the passed-in bool.
func (pr *Progress) PauseMsgAppProbes(pause bool) {
pr.MsgAppFlowPaused = pause
pr.MsgAppProbesPaused = pause
}

// CanSendEntries returns true if the flow control state allows sending at least
Expand Down Expand Up @@ -250,12 +250,17 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool {
// operation, this is false. A throttled node will be contacted less frequently
// until it has reached a state in which it's able to accept a steady stream of
// log entries again.
//
// TODO(pav-kv): this method is deprecated, remove it. It is still used in tests
// and String(), find a way to avoid this. The problem is that the actual flow
// control state depends on the log size and commit index, which are not part of
// this Progress struct - they are passed-in to methods like ShouldSendMsgApp().
func (pr *Progress) IsPaused() bool {
switch pr.State {
case StateProbe:
return pr.MsgAppFlowPaused
return pr.MsgAppProbesPaused
case StateReplicate:
return pr.MsgAppFlowPaused && pr.Inflights.Full()
return pr.MsgAppProbesPaused && pr.Inflights.Full()
case StateSnapshot:
return true
default:
Expand Down Expand Up @@ -285,10 +290,10 @@ func (pr *Progress) IsPaused() bool {
func (pr *Progress) ShouldSendMsgApp(last, commit uint64) bool {
switch pr.State {
case StateProbe:
return !pr.MsgAppFlowPaused
return !pr.MsgAppProbesPaused
case StateReplicate:
return pr.CanBumpCommit(commit) ||
pr.Match < last && (!pr.MsgAppFlowPaused || pr.CanSendEntries(last))
pr.Match < last && (!pr.MsgAppProbesPaused || pr.CanSendEntries(last))
case StateSnapshot:
return false
default:
Expand Down
34 changes: 17 additions & 17 deletions tracker/progress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ func TestProgressString(t *testing.T) {
ins := NewInflights(1, 0)
ins.Add(123, 1)
pr := &Progress{
Match: 1,
Next: 2,
State: StateSnapshot,
PendingSnapshot: 123,
RecentActive: false,
MsgAppFlowPaused: true,
IsLearner: true,
Inflights: ins,
Match: 1,
Next: 2,
State: StateSnapshot,
PendingSnapshot: 123,
RecentActive: false,
MsgAppProbesPaused: true,
IsLearner: true,
Inflights: ins,
}
const exp = `StateSnapshot match=1 next=2 learner paused pendingSnap=123 inactive inflight=1[full]`
assert.Equal(t, exp, pr.String())
Expand All @@ -53,26 +53,26 @@ func TestProgressIsPaused(t *testing.T) {
}
for i, tt := range tests {
p := &Progress{
State: tt.state,
MsgAppFlowPaused: tt.paused,
Inflights: NewInflights(256, 0),
State: tt.state,
MsgAppProbesPaused: tt.paused,
Inflights: NewInflights(256, 0),
}
assert.Equal(t, tt.w, p.IsPaused(), i)
}
}

// TestProgressResume ensures that MaybeUpdate and MaybeDecrTo will reset
// MsgAppFlowPaused.
// MsgAppProbesPaused.
func TestProgressResume(t *testing.T) {
p := &Progress{
Next: 2,
MsgAppFlowPaused: true,
Next: 2,
MsgAppProbesPaused: true,
}
p.MaybeDecrTo(1, 1)
assert.False(t, p.MsgAppFlowPaused)
p.MsgAppFlowPaused = true
assert.False(t, p.MsgAppProbesPaused)
p.MsgAppProbesPaused = true
p.MaybeUpdate(2)
assert.False(t, p.MsgAppFlowPaused)
assert.False(t, p.MsgAppProbesPaused)
}

func TestProgressBecomeProbe(t *testing.T) {
Expand Down

0 comments on commit 2249aff

Please sign in to comment.