From 0c22de09081964a3d24c171279d3c8d8be721cf0 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Tue, 14 Mar 2023 12:56:36 +0000 Subject: [PATCH] raft: don't panic when looking for term conflicts This commit fixes a hypothetical panic that may occur when a stale MsgApp message arrives at a follower. The conflict searching algorithm in findConflictByTerm may return a log index which is not present in the log, and thus the raftLog.term() method may return an error. It is safe to ignore this error and send MsgAppResp with the found index and a zero LogTerm. This commit also restores the behaviour that existed before commit d0fb0cd6. Back then, the term() function would silently return 0 instead of an error, and a zero LogTerm would be sent with the rejecting MsgAppResp. After that commit, there is a new possible panic. We remove the possibility of this panic here. Signed-off-by: Pavel Kalinnikov --- log.go | 47 ++++++++++++++--------------- log_test.go | 7 ++++- raft.go | 27 +++++++++-------- raft_test.go | 36 ++++++++++++++++++++-- testdata/probe_after_compaction.txt | 0 5 files changed, 77 insertions(+), 40 deletions(-) delete mode 100644 testdata/probe_after_compaction.txt diff --git a/log.go b/log.go index 5e7cefc3..db22740b 100644 --- a/log.go +++ b/log.go @@ -162,34 +162,31 @@ func (l *raftLog) findConflict(ents []pb.Entry) uint64 { return 0 } -// findConflictByTerm takes an (index, term) pair (indicating a conflicting log -// entry on a leader/follower during an append) and finds the largest index in -// log l with a term <= `term` and an index <= `index`. If no such index exists -// in the log, the log's first index is returned. +// findConflictByTerm returns a best guess on where this log ends matching +// another log, given that the only information known about the other log is the +// (index, term) of its single entry. // -// The index provided MUST be equal to or less than l.lastIndex(). Invalid -// inputs log a warning and the input index is returned. 
-func (l *raftLog) findConflictByTerm(index uint64, term uint64) uint64 { - if li := l.lastIndex(); index > li { - // NB: such calls should not exist, but since there is a straightfoward - // way to recover, do it. - // - // It is tempting to also check something about the first index, but - // there is odd behavior with peers that have no log, in which case - // lastIndex will return zero and firstIndex will return one, which - // leads to calls with an index of zero into this method. - l.logger.Warningf("index(%d) is out of range [0, lastIndex(%d)] in findConflictByTerm", - index, li) - return index - } - for { - logTerm, err := l.term(index) - if logTerm <= term || err != nil { - break +// Specifically, the first returned value is the max guessIndex <= index, such +// that term(guessIndex) <= term or term(guessIndex) is not known (because this +// index is compacted or not yet stored). +// +// The second returned value is the term(guessIndex), or 0 if it is unknown. +// +// This function is used by a follower and leader to resolve log conflicts after +// an unsuccessful append to a follower, and ultimately restore the steady flow +// of appends. +func (l *raftLog) findConflictByTerm(index uint64, term uint64) (uint64, uint64) { + for ; index > 0; index-- { + // If there is an error (likely ErrCompacted or ErrUnavailable), we don't + // know whether it's a match or not, so assume a possible match and return + // the index, with 0 term indicating an unknown term. + if ourTerm, err := l.term(index); err != nil { + return index, 0 + } else if ourTerm <= term { + return index, ourTerm } - index-- } - return index + return 0, 0 } // nextUnstableEnts returns all entries that are available to be written to the diff --git a/log_test.go b/log_test.go index 47ed4a45..14f4e9bb 100644 --- a/log_test.go +++ b/log_test.go @@ -102,7 +102,12 @@ func TestFindConflictByTerm(t *testing.T) { }}) l := newLog(st, raftLogger) l.append(tt.ents[1:]...) 
- require.Equal(t, tt.want, l.findConflictByTerm(tt.index, tt.term)) + + index, term := l.findConflictByTerm(tt.index, tt.term) + require.Equal(t, tt.want, index) + wantTerm, err := l.term(index) + wantTerm = l.zeroTermOnOutOfBounds(wantTerm, err) + require.Equal(t, wantTerm, term) }) } } diff --git a/raft.go b/raft.go index ccdc0588..d1048294 100644 --- a/raft.go +++ b/raft.go @@ -1394,7 +1394,7 @@ func stepLeader(r *raft, m pb.Message) error { // 7, the rejection points it at the end of the follower's log // which is at a higher log term than the actually committed // log. - nextProbeIdx = r.raftLog.findConflictByTerm(m.RejectHint, m.LogTerm) + nextProbeIdx, _ = r.raftLog.findConflictByTerm(m.RejectHint, m.LogTerm) } if pr.MaybeDecrTo(m.Index, nextProbeIdx) { r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr) @@ -1652,24 +1652,27 @@ func (r *raft) handleAppendEntries(m pb.Message) { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) return } - r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x", r.id, r.raftLog.zeroTermOnOutOfBounds(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From) - // Return a hint to the leader about the maximum index and term that the two - // logs could be divergent at. Do this by searching through the follower's log - // for the maximum (index, term) pair with a term <= the MsgApp's LogTerm and - // an index <= the MsgApp's Index. This can help skip all indexes in the - // follower's uncommitted tail with terms greater than the MsgApp's LogTerm. + // Our log does not match the leader's at index m.Index. Return a hint to the + // leader - a guess on the maximal (index, term) at which the logs match. Do + // this by searching through the follower's log for the maximum (index, term) + // pair with a term <= the MsgApp's LogTerm and an index <= the MsgApp's + // Index. 
This can help skip all indexes in the follower's uncommitted tail + // with terms greater than the MsgApp's LogTerm. // // See the other caller for findConflictByTerm (in stepLeader) for a much more // detailed explanation of this mechanism. + + // NB: m.Index >= raftLog.committed by now (see the early return above), and + // raftLog.lastIndex() >= raftLog.committed by invariant, so min of the two is + // also >= raftLog.committed. Hence, the findConflictByTerm argument is within + // the valid interval, which then will return a valid (index, term) pair with + // a non-zero term (unless the log is empty). However, it is safe to send a zero + // LogTerm in this response in any case, so we don't verify it here. hintIndex := min(m.Index, r.raftLog.lastIndex()) - hintIndex = r.raftLog.findConflictByTerm(hintIndex, m.LogTerm) - hintTerm, err := r.raftLog.term(hintIndex) - if err != nil { - panic(fmt.Sprintf("term(%d) must be valid, but got %v", hintIndex, err)) - } + hintIndex, hintTerm := r.raftLog.findConflictByTerm(hintIndex, m.LogTerm) r.send(pb.Message{ To: m.From, Type: pb.MsgAppResp, diff --git a/raft_test.go b/raft_test.go index 1c39c5db..5637c4a4 100644 --- a/raft_test.go +++ b/raft_test.go @@ -4494,6 +4494,7 @@ func TestFastLogRejection(t *testing.T) { tests := []struct { leaderLog []pb.Entry // Logs on the leader followerLog []pb.Entry // Logs on the follower + followerCompact uint64 // Index at which the follower log is compacted. rejectHintTerm uint64 // Expected term included in rejected MsgAppResp. rejectHintIndex uint64 // Expected index included in rejected MsgAppResp. nextAppendTerm uint64 // Expected term when leader appends after rejected. 
@@ -4698,10 +4699,35 @@ func TestFastLogRejection(t *testing.T) { {Term: 4, Index: 7}, {Term: 4, Index: 8}, }, - nextAppendTerm: 2, - nextAppendIndex: 1, rejectHintTerm: 2, rejectHintIndex: 1, + nextAppendTerm: 2, + nextAppendIndex: 1, + }, + // A case when a stale MsgApp from leader arrives after the corresponding + // log index got compacted. + // A stale (type=MsgApp,index=3,logTerm=3,entries=[(term=3,index=4)]) is + // delivered to a follower who has already compacted beyond log index 3. The + // MsgAppResp rejection will return same index=3, with logTerm=0. The leader + // will rollback by one entry, and send MsgApp with index=2,logTerm=1. + { + leaderLog: []pb.Entry{ + {Term: 1, Index: 1}, + {Term: 1, Index: 2}, + {Term: 3, Index: 3}, + }, + followerLog: []pb.Entry{ + {Term: 1, Index: 1}, + {Term: 1, Index: 2}, + {Term: 3, Index: 3}, + {Term: 3, Index: 4}, + {Term: 3, Index: 5}, // <- this entry and below are compacted + }, + followerCompact: 5, + rejectHintTerm: 0, + rejectHintIndex: 3, + nextAppendTerm: 1, + nextAppendIndex: 2, }, } @@ -4728,6 +4754,12 @@ func TestFastLogRejection(t *testing.T) { Commit: 0, }) n2 := newTestRaft(2, 10, 1, s2) + if test.followerCompact != 0 { + s2.Compact(test.followerCompact) + // NB: the state of n2 after this compaction isn't realistic because the + // commit index is still at 0. We do this to exercise a "doesn't happen" + // edge case behaviour, in case it still does happen in some other way. + } require.NoError(t, n2.Step(pb.Message{From: 1, To: 2, Type: pb.MsgHeartbeat})) msgs := n2.readMessages() diff --git a/testdata/probe_after_compaction.txt b/testdata/probe_after_compaction.txt deleted file mode 100644 index e69de29b..00000000