diff --git a/log.go b/log.go index 5e7cefc3..db22740b 100644 --- a/log.go +++ b/log.go @@ -162,34 +162,31 @@ func (l *raftLog) findConflict(ents []pb.Entry) uint64 { return 0 } -// findConflictByTerm takes an (index, term) pair (indicating a conflicting log -// entry on a leader/follower during an append) and finds the largest index in -// log l with a term <= `term` and an index <= `index`. If no such index exists -// in the log, the log's first index is returned. +// findConflictByTerm returns a best guess on where this log ends matching +// another log, given that the only information known about the other log is the +// (index, term) of its single entry. // -// The index provided MUST be equal to or less than l.lastIndex(). Invalid -// inputs log a warning and the input index is returned. -func (l *raftLog) findConflictByTerm(index uint64, term uint64) uint64 { - if li := l.lastIndex(); index > li { - // NB: such calls should not exist, but since there is a straightfoward - // way to recover, do it. - // - // It is tempting to also check something about the first index, but - // there is odd behavior with peers that have no log, in which case - // lastIndex will return zero and firstIndex will return one, which - // leads to calls with an index of zero into this method. - l.logger.Warningf("index(%d) is out of range [0, lastIndex(%d)] in findConflictByTerm", - index, li) - return index - } - for { - logTerm, err := l.term(index) - if logTerm <= term || err != nil { - break +// Specifically, the first returned value is the max guessIndex <= index, such +// that term(guessIndex) <= term or term(guessIndex) is not known (because this +// index is compacted or not yet stored). +// +// The second returned value is the term(guessIndex), or 0 if it is unknown. +// +// This function is used by a follower and leader to resolve log conflicts after +// an unsuccessful append to a follower, and ultimately restore the steady flow +// of appends. 
+func (l *raftLog) findConflictByTerm(index uint64, term uint64) (uint64, uint64) { + for ; index > 0; index-- { + // If there is an error (likely ErrCompacted or ErrUnavailable), we don't + // know whether it's a match or not, so assume a possible match and return + // the index, with 0 term indicating an unknown term. + if ourTerm, err := l.term(index); err != nil { + return index, 0 + } else if ourTerm <= term { + return index, ourTerm } - index-- } - return index + return 0, 0 } // nextUnstableEnts returns all entries that are available to be written to the diff --git a/log_test.go b/log_test.go index 47ed4a45..14f4e9bb 100644 --- a/log_test.go +++ b/log_test.go @@ -102,7 +102,12 @@ func TestFindConflictByTerm(t *testing.T) { }}) l := newLog(st, raftLogger) l.append(tt.ents[1:]...) - require.Equal(t, tt.want, l.findConflictByTerm(tt.index, tt.term)) + + index, term := l.findConflictByTerm(tt.index, tt.term) + require.Equal(t, tt.want, index) + wantTerm, err := l.term(index) + wantTerm = l.zeroTermOnOutOfBounds(wantTerm, err) + require.Equal(t, wantTerm, term) }) } } diff --git a/raft.go b/raft.go index ccdc0588..d1048294 100644 --- a/raft.go +++ b/raft.go @@ -1394,7 +1394,7 @@ func stepLeader(r *raft, m pb.Message) error { // 7, the rejection points it at the end of the follower's log // which is at a higher log term than the actually committed // log. 
- nextProbeIdx = r.raftLog.findConflictByTerm(m.RejectHint, m.LogTerm) + nextProbeIdx, _ = r.raftLog.findConflictByTerm(m.RejectHint, m.LogTerm) } if pr.MaybeDecrTo(m.Index, nextProbeIdx) { r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr) @@ -1652,24 +1652,27 @@ func (r *raft) handleAppendEntries(m pb.Message) { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) return } - r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x", r.id, r.raftLog.zeroTermOnOutOfBounds(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From) - // Return a hint to the leader about the maximum index and term that the two - // logs could be divergent at. Do this by searching through the follower's log - // for the maximum (index, term) pair with a term <= the MsgApp's LogTerm and - // an index <= the MsgApp's Index. This can help skip all indexes in the - // follower's uncommitted tail with terms greater than the MsgApp's LogTerm. + // Our log does not match the leader's at index m.Index. Return a hint to the + // leader - a guess on the maximal (index, term) at which the logs match. Do + // this by searching through the follower's log for the maximum (index, term) + // pair with a term <= the MsgApp's LogTerm and an index <= the MsgApp's + // Index. This can help skip all indexes in the follower's uncommitted tail + // with terms greater than the MsgApp's LogTerm. // // See the other caller for findConflictByTerm (in stepLeader) for a much more // detailed explanation of this mechanism. + + // NB: m.Index >= raftLog.committed by now (see the early return above), and + // raftLog.lastIndex() >= raftLog.committed by invariant, so min of the two is + // also >= raftLog.committed. Hence, the findConflictByTerm argument is within + // the valid interval, which then will return a valid (index, term) pair with + // a non-zero term (unless the log is empty). 
However, it is safe to send a zero + // LogTerm in this response in any case, so we don't verify it here. hintIndex := min(m.Index, r.raftLog.lastIndex()) - hintIndex = r.raftLog.findConflictByTerm(hintIndex, m.LogTerm) - hintTerm, err := r.raftLog.term(hintIndex) - if err != nil { - panic(fmt.Sprintf("term(%d) must be valid, but got %v", hintIndex, err)) - } + hintIndex, hintTerm := r.raftLog.findConflictByTerm(hintIndex, m.LogTerm) r.send(pb.Message{ To: m.From, Type: pb.MsgAppResp, diff --git a/raft_test.go b/raft_test.go index 1c39c5db..5637c4a4 100644 --- a/raft_test.go +++ b/raft_test.go @@ -4494,6 +4494,7 @@ func TestFastLogRejection(t *testing.T) { tests := []struct { leaderLog []pb.Entry // Logs on the leader followerLog []pb.Entry // Logs on the follower + followerCompact uint64 // Index at which the follower log is compacted. rejectHintTerm uint64 // Expected term included in rejected MsgAppResp. rejectHintIndex uint64 // Expected index included in rejected MsgAppResp. nextAppendTerm uint64 // Expected term when leader appends after rejected. @@ -4698,10 +4699,35 @@ func TestFastLogRejection(t *testing.T) { {Term: 4, Index: 7}, {Term: 4, Index: 8}, }, - nextAppendTerm: 2, - nextAppendIndex: 1, rejectHintTerm: 2, rejectHintIndex: 1, + nextAppendTerm: 2, + nextAppendIndex: 1, + }, + // A case when a stale MsgApp from the leader arrives after the + // corresponding log index got compacted. + // A stale (type=MsgApp,index=3,logTerm=3,entries=[(term=3,index=4)]) is + // delivered to a follower who has already compacted beyond log index 3. The + // MsgAppResp rejection will return the same index=3, with logTerm=0. The + // leader will roll back by one entry, and send MsgApp with index=2,logTerm=1. 
+ { + leaderLog: []pb.Entry{ + {Term: 1, Index: 1}, + {Term: 1, Index: 2}, + {Term: 3, Index: 3}, + }, + followerLog: []pb.Entry{ + {Term: 1, Index: 1}, + {Term: 1, Index: 2}, + {Term: 3, Index: 3}, + {Term: 3, Index: 4}, + {Term: 3, Index: 5}, // <- this entry and below are compacted + }, + followerCompact: 5, + rejectHintTerm: 0, + rejectHintIndex: 3, + nextAppendTerm: 1, + nextAppendIndex: 2, }, } @@ -4728,6 +4754,12 @@ func TestFastLogRejection(t *testing.T) { Commit: 0, }) n2 := newTestRaft(2, 10, 1, s2) + if test.followerCompact != 0 { + s2.Compact(test.followerCompact) + // NB: the state of n2 after this compaction isn't realistic because the + // commit index is still at 0. We do this to exercise a "doesn't happen" + // edge case behaviour, in case it still does happen in some other way. + } require.NoError(t, n2.Step(pb.Message{From: 1, To: 2, Type: pb.MsgHeartbeat})) msgs := n2.readMessages() diff --git a/testdata/probe_after_compaction.txt b/testdata/probe_after_compaction.txt deleted file mode 100644 index e69de29b..00000000