From 87ac09e77c2185ab0adfa1f4d634e058a976c5d0 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Fri, 26 Jan 2024 02:22:07 +0000 Subject: [PATCH 1/2] raft: advance commit index safely This change makes the commit index advancement in handleHeartbeat safe. Previously, a follower would attempt to update the commit index to whichever was sent in the MsgHeartbeat message. Out-of-bound indices would crash the node. It is always safe to advance a commit index if the follower's log is "in sync" with the leader, i.e. when its log is guaranteed to be a prefix of the leader's log. This is always true if the term of last entry in the log matches the leader term, otherwise this guarantee is established when the first MsgApp append message from the leader succeeds. At the moment, the leader will never send a commit index that exceeds the follower's log size. However, this may change in future. This change is a defence-in-depth. The newly added raftLog.leaderTerm field will be used for other safety checks in the future, for example to establish that overriding a suffix of entries in raftLog is safe. Signed-off-by: Pavel Kalinnikov --- log.go | 43 +++++++++++++++++++++++++++++++++++++++++++ raft.go | 15 ++++++++++++++- raft_test.go | 18 +++++++++++++----- 3 files changed, 70 insertions(+), 6 deletions(-) diff --git a/log.go b/log.go index 84826c3a..db746979 100644 --- a/log.go +++ b/log.go @@ -29,6 +29,35 @@ type raftLog struct { // they will be saved into storage. unstable unstable + // leaderTerm is a term of the leader with whom our log is "consistent". The + // log is guaranteed to be a prefix of this term's leader log. + // + // The leaderTerm can be safely updated to `t` if: + // 1. the last entry in the log has term `t`, or, more generally, + // 2. the last successful append was sent by the leader `t`. 
+ // + // This is due to the following safety property (see raft paper §5.3): + // + // Log Matching: if two logs contain an entry with the same index and term, + // then the logs are identical in all entries up through the given index. + // + // We use (1) to initialize leaderTerm, and (2) to maintain it on updates. + // + // NB: (2) does not imply (1). If our log is behind the leader's log, the last + // entry term can be below leaderTerm. + // + // NB: leaderTerm does not necessarily match this raft node's term. It only + // does for the leader. For followers and candidates, when we first learn or + // bump to a new term, we don't have a proof that our log is consistent with + // the new term's leader (current or prospective). The new leader may override + // any suffix of the log after the committed index. Only when the first append + // from the new leader succeeds, we can update leaderTerm. + // + // During normal operation, leaderTerm matches the node term though. During a + // leader change, it briefly lags behind, and matches again when the first + // append message succeeds. + leaderTerm uint64 + // committed is the highest log position that is known to be in // stable storage on a quorum of nodes. committed uint64 @@ -88,6 +117,11 @@ func newLogWithSize(storage Storage, logger Logger, maxApplyingEntsSize entryEnc if err != nil { panic(err) // TODO(bdarnell) } + lastTerm, err := storage.Term(lastIndex) + if err != nil { + panic(err) // TODO(pav-kv) + } + log.leaderTerm = lastTerm log.unstable.offset = lastIndex + 1 log.unstable.offsetInProgress = lastIndex + 1 log.unstable.logger = logger @@ -106,6 +140,15 @@ func (l *raftLog) String() string { // maybeAppend returns (0, false) if the entries cannot be appended. Otherwise, // it returns (last index of new entries, true). +// +// TODO(pav-kv): pass in the term of the leader who sent this update. It is only +// safe to handle this append if this term is >= l.leaderTerm. 
It is only safe +// to override an uncommitted suffix of entries if term > l.leaderTerm. +// +// TODO(pav-kv): introduce a struct that consolidates the append metadata. The +// (prevEntryIndex, prevEntryTerm, leaderTerm) tuple must always be carried +// together, and safety of this append must be checked at the lowest layer here, +// rather than up in raft.go. func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) { if !l.matchTerm(index, logTerm) { return 0, false diff --git a/raft.go b/raft.go index 5a150562..f24b6a98 100644 --- a/raft.go +++ b/raft.go @@ -935,6 +935,8 @@ func (r *raft) becomeLeader() { // so the preceding log append does not count against the uncommitted log // quota of the new leader. In other words, after the call to appendEntry, // r.uncommittedSize is still 0. + + r.raftLog.leaderTerm = r.Term // the leader's log is consistent with itself r.logger.Infof("%x became leader at term %d", r.id, r.Term) } @@ -1735,6 +1737,7 @@ func (r *raft) handleAppendEntries(m pb.Message) { return } if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok { + r.raftLog.leaderTerm = m.Term // the log is now consistent with the leader r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) return } @@ -1770,7 +1773,16 @@ func (r *raft) handleAppendEntries(m pb.Message) { } func (r *raft) handleHeartbeat(m pb.Message) { - r.raftLog.commitTo(m.Commit) + // It is only safe to advance the commit index if our log is a prefix of the + // leader's log. Otherwise, entries at this index may mismatch. + // + // TODO(pav-kv): move this logic to r.raftLog, which is more appropriate for + // handling safety. The raftLog can use leaderTerm for other safety checks. + // For example, unstable.truncateAndAppend currently may override a suffix of + // the log unconditionally, but it can only be done if m.Term > leaderTerm. 
+ if m.Term == r.raftLog.leaderTerm { + r.raftLog.commitTo(min(m.Commit, r.raftLog.lastIndex())) + } r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context}) } @@ -1785,6 +1797,7 @@ func (r *raft) handleSnapshot(m pb.Message) { if r.restore(s) { r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]", r.id, r.raftLog.committed, sindex, sterm) + r.raftLog.leaderTerm = m.Term // the log is now consistent with the leader r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()}) } else { r.logger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]", diff --git a/raft_test.go b/raft_test.go index 5bc6d52e..16e40608 100644 --- a/raft_test.go +++ b/raft_test.go @@ -1332,16 +1332,24 @@ func TestHandleMsgApp(t *testing.T) { func TestHandleHeartbeat(t *testing.T) { commit := uint64(2) tests := []struct { - m pb.Message - wCommit uint64 + m pb.Message + lastTerm uint64 + wCommit uint64 }{ - {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, commit + 1}, - {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, commit}, // do not decrease commit + {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, 2, commit + 1}, + {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, 2, commit}, // do not decrease commit + {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, 1, commit}, + + // Do not increase the commit index if the log is not guaranteed to be a + // prefix of the leader's log. + {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, 1, commit}, + // Do not increase the commit index beyond our log size. 
+ {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 10}, 2, commit + 1}, } for i, tt := range tests { storage := newTestMemoryStorage(withPeers(1, 2)) - storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}}) + storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}, {Index: 3, Term: tt.lastTerm}}) sm := newTestRaft(1, 5, 1, storage) sm.becomeFollower(2, 2) sm.raftLog.commitTo(commit) From 4e08f525c0e3afdc3312c575ccaacce12e1c10a1 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Mon, 29 Jan 2024 21:16:02 +0000 Subject: [PATCH 2/2] raft: pass the leader term to append/commit methods This change improves safety of the append operations on raftLog. This helped fixing some tests which made incorrect assumptions about the log. Signed-off-by: Pavel Kalinnikov --- bootstrap.go | 2 +- log.go | 79 +++++++++++++++++++++++++++++++++------------- log_test.go | 70 ++++++++++++++++++++-------------------- raft.go | 27 ++++++---------- raft_paper_test.go | 2 +- raft_snap_test.go | 10 +++--- raft_test.go | 44 ++++++++++++++------------ 7 files changed, 131 insertions(+), 103 deletions(-) diff --git a/bootstrap.go b/bootstrap.go index 2a61aa23..664d7614 100644 --- a/bootstrap.go +++ b/bootstrap.go @@ -58,7 +58,7 @@ func (rn *RawNode) Bootstrap(peers []Peer) error { ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data} } - rn.raft.raftLog.append(ents...) + rn.raft.raftLog.append(rn.raft.Term, ents...) // Now apply them, mainly so that the application can call Campaign // immediately after StartNode in tests. Note that these nodes will diff --git a/log.go b/log.go index db746979..9603e64a 100644 --- a/log.go +++ b/log.go @@ -141,43 +141,72 @@ func (l *raftLog) String() string { // maybeAppend returns (0, false) if the entries cannot be appended. Otherwise, // it returns (last index of new entries, true). 
// -// TODO(pav-kv): pass in the term of the leader who sent this update. It is only -// safe to handle this append if this term is >= l.leaderTerm. It is only safe -// to override an uncommitted suffix of entries if term > l.leaderTerm. -// // TODO(pav-kv): introduce a struct that consolidates the append metadata. The -// (prevEntryIndex, prevEntryTerm, leaderTerm) tuple must always be carried -// together, and safety of this append must be checked at the lowest layer here, +// (leaderTerm, prevIndex, prevTerm) tuple must always be carried together, so +// that safety properties for this append are checked at the lowest layers // rather than up in raft.go. -func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) { - if !l.matchTerm(index, logTerm) { +func (l *raftLog) maybeAppend(leaderTerm, prevIndex, prevTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) { + // Can not accept append requests from an outdated leader. + if leaderTerm < l.leaderTerm { + return 0, false + } + // Can not accept append requests that are not consistent with our log. + // + // NB: it is unnecessary to check matchTerm() if leaderTerm == l.leaderTerm, + // because the leader always sends self-consistent appends. For ensuring raft + // safety, this check is only necessary if leaderTerm > l.leaderTerm. + // + // TODO(pav-kv): however, we should log an error if leaderTerm == l.leaderTerm + // and the entry does not match. This means either the leader is sending + // inconsistent appends, or there is some state corruption in general. 
+ if !l.matchTerm(prevIndex, prevTerm) { return 0, false } - lastnewi = index + uint64(len(ents)) + lastnewi = prevIndex + uint64(len(ents)) ci := l.findConflict(ents) switch { case ci == 0: case ci <= l.committed: l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed) default: - offset := index + 1 + offset := prevIndex + 1 if ci-offset > uint64(len(ents)) { l.logger.Panicf("index, %d, is out of range [%d]", ci-offset, len(ents)) } - l.append(ents[ci-offset:]...) + l.append(leaderTerm, ents[ci-offset:]...) } - l.commitTo(min(committed, lastnewi)) + // TODO(pav-kv): call commitTo from outside of this method, for a smaller API. + // TODO(pav-kv): it is safe to pass committed index as is here instead of min, + // but it breaks some tests that make incorrect assumptions. Fix this. + l.commitTo(leaderTerm, min(committed, lastnewi)) return lastnewi, true } -func (l *raftLog) append(ents ...pb.Entry) uint64 { - if len(ents) == 0 { +func (l *raftLog) append(leaderTerm uint64, ents ...pb.Entry) uint64 { + // Can not accept append requests from an outdated leader. + if leaderTerm < l.leaderTerm { + return l.lastIndex() + } + if len(ents) == 0 { // no-op return l.lastIndex() } if after := ents[0].Index - 1; after < l.committed { l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed) } + + // INVARIANT: l.term(i) <= l.leaderTerm, for any entry in the log. + // + // TODO(pav-kv): we should more generally check that the content of ents slice + // is correct: all entries have consecutive indices, and terms do not regress. + // We should do this validation once, on every incoming message, and pass the + // append in a type-safe "validated append" wrapper. This wrapper can provide + // convenient accessors to the prev/last entry, instead of raw slices access. 
+ if lastTerm := ents[len(ents)-1].Term; lastTerm > leaderTerm { + l.logger.Panicf("leader at term %d tries to append a higher term %d", leaderTerm, lastTerm) + } + l.leaderTerm = leaderTerm // l.leaderTerm never regresses here + l.unstable.truncateAndAppend(ents) return l.lastIndex() } @@ -358,12 +387,16 @@ func (l *raftLog) lastIndex() uint64 { return i } -func (l *raftLog) commitTo(tocommit uint64) { - // never decrease commit - if l.committed < tocommit { - if l.lastIndex() < tocommit { - l.logger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?", tocommit, l.lastIndex()) - } +func (l *raftLog) commitTo(leaderTerm, tocommit uint64) { + // Do not accept the commit index update from a leader if our log is not + // consistent with the leader's log. + if leaderTerm != l.leaderTerm { + return + } + // Otherwise, we have the guarantee that our log is a prefix of the leader's + // log. All entries <= min(tocommit, lastIndex) can thus be committed. + tocommit = min(tocommit, l.lastIndex()) + if tocommit > l.committed { l.committed = tocommit } } @@ -487,12 +520,14 @@ func (l *raftLog) matchTerm(i, term uint64) bool { return t == term } -func (l *raftLog) maybeCommit(maxIndex, term uint64) bool { +// TODO(pav-kv): clarify that (maxIndex, term) is the ID of the entry at the +// committed index. Clean this up. +func (l *raftLog) maybeCommit(leaderTerm, maxIndex, term uint64) bool { // NB: term should never be 0 on a commit because the leader campaigns at // least at term 1. But if it is 0 for some reason, we don't want to consider // this a term match in case zeroTermOnOutOfBounds returns 0. 
if maxIndex > l.committed && term != 0 && l.zeroTermOnOutOfBounds(l.term(maxIndex)) == term { - l.commitTo(maxIndex) + l.commitTo(leaderTerm, maxIndex) return true } return false diff --git a/log_test.go b/log_test.go index 2711ff9c..1d9ceab1 100644 --- a/log_test.go +++ b/log_test.go @@ -49,7 +49,7 @@ func TestFindConflict(t *testing.T) { for i, tt := range tests { t.Run(fmt.Sprint(i), func(t *testing.T) { raftLog := newLog(NewMemoryStorage(), raftLogger) - raftLog.append(previousEnts...) + raftLog.append(10, previousEnts...) require.Equal(t, tt.wconflict, raftLog.findConflict(tt.ents)) }) } @@ -101,7 +101,7 @@ func TestFindConflictByTerm(t *testing.T) { Term: tt.ents[0].Term, }}) l := newLog(st, raftLogger) - l.append(tt.ents[1:]...) + l.append(10, tt.ents[1:]...) index, term := l.findConflictByTerm(tt.index, tt.term) require.Equal(t, tt.want, index) @@ -115,7 +115,7 @@ func TestFindConflictByTerm(t *testing.T) { func TestIsUpToDate(t *testing.T) { previousEnts := []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}} raftLog := newLog(NewMemoryStorage(), raftLogger) - raftLog.append(previousEnts...) + raftLog.append(3, previousEnts...) tests := []struct { lastIndex uint64 term uint64 @@ -183,7 +183,7 @@ func TestAppend(t *testing.T) { storage := NewMemoryStorage() storage.Append(previousEnts) raftLog := newLog(storage, raftLogger) - require.Equal(t, tt.windex, raftLog.append(tt.ents...)) + require.Equal(t, tt.windex, raftLog.append(10, tt.ents...)) g, err := raftLog.entries(1, noLimit) require.NoError(t, err) require.Equal(t, tt.wents, g) @@ -287,7 +287,7 @@ func TestLogMaybeAppend(t *testing.T) { for i, tt := range tests { raftLog := newLog(NewMemoryStorage(), raftLogger) - raftLog.append(previousEnts...) + raftLog.append(10, previousEnts...) 
raftLog.committed = commit t.Run(fmt.Sprint(i), func(t *testing.T) { @@ -296,7 +296,7 @@ func TestLogMaybeAppend(t *testing.T) { require.True(t, tt.wpanic) } }() - glasti, gappend := raftLog.maybeAppend(tt.index, tt.logTerm, tt.committed, tt.ents...) + glasti, gappend := raftLog.maybeAppend(10, tt.index, tt.logTerm, tt.committed, tt.ents...) require.Equal(t, tt.wlasti, glasti) require.Equal(t, tt.wappend, gappend) require.Equal(t, tt.wcommit, raftLog.committed) @@ -323,10 +323,10 @@ func TestCompactionSideEffects(t *testing.T) { } raftLog := newLog(storage, raftLogger) for i = unstableIndex; i < lastIndex; i++ { - raftLog.append(pb.Entry{Term: i + 1, Index: i + 1}) + raftLog.append(lastIndex, pb.Entry{Term: i + 1, Index: i + 1}) } - require.True(t, raftLog.maybeCommit(lastIndex, lastTerm)) + require.True(t, raftLog.maybeCommit(lastIndex, lastIndex, lastTerm)) raftLog.appliedTo(raftLog.committed, 0 /* size */) offset := uint64(500) @@ -346,7 +346,7 @@ func TestCompactionSideEffects(t *testing.T) { require.Equal(t, uint64(751), unstableEnts[0].Index) prev := raftLog.lastIndex() - raftLog.append(pb.Entry{Index: raftLog.lastIndex() + 1, Term: raftLog.lastIndex() + 1}) + raftLog.append(lastIndex, pb.Entry{Index: raftLog.lastIndex() + 1, Term: raftLog.lastIndex()}) require.Equal(t, prev+1, raftLog.lastIndex()) ents, err := raftLog.entries(raftLog.lastIndex(), noLimit) @@ -396,9 +396,9 @@ func TestHasNextCommittedEnts(t *testing.T) { require.NoError(t, storage.Append(ents[:1])) raftLog := newLog(storage, raftLogger) - raftLog.append(ents...) + raftLog.append(raftLog.leaderTerm, ents...) 
raftLog.stableTo(4, 1) - raftLog.maybeCommit(5, 1) + raftLog.maybeCommit(raftLog.leaderTerm, 5, 1) raftLog.appliedTo(tt.applied, 0 /* size */) raftLog.acceptApplying(tt.applying, 0 /* size */, tt.allowUnstable) raftLog.applyingEntsPaused = tt.paused @@ -454,9 +454,9 @@ func TestNextCommittedEnts(t *testing.T) { require.NoError(t, storage.Append(ents[:1])) raftLog := newLog(storage, raftLogger) - raftLog.append(ents...) + raftLog.append(raftLog.leaderTerm, ents...) raftLog.stableTo(4, 1) - raftLog.maybeCommit(5, 1) + raftLog.maybeCommit(raftLog.leaderTerm, 5, 1) raftLog.appliedTo(tt.applied, 0 /* size */) raftLog.acceptApplying(tt.applying, 0 /* size */, tt.allowUnstable) raftLog.applyingEntsPaused = tt.paused @@ -513,9 +513,9 @@ func TestAcceptApplying(t *testing.T) { require.NoError(t, storage.Append(ents[:1])) raftLog := newLogWithSize(storage, raftLogger, maxSize) - raftLog.append(ents...) + raftLog.append(raftLog.leaderTerm, ents...) raftLog.stableTo(4, 1) - raftLog.maybeCommit(5, 1) + raftLog.maybeCommit(raftLog.leaderTerm, 5, 1) raftLog.appliedTo(3, 0 /* size */) raftLog.acceptApplying(tt.index, tt.size, tt.allowUnstable) @@ -562,9 +562,9 @@ func TestAppliedTo(t *testing.T) { require.NoError(t, storage.Append(ents[:1])) raftLog := newLogWithSize(storage, raftLogger, maxSize) - raftLog.append(ents...) + raftLog.append(raftLog.leaderTerm, ents...) raftLog.stableTo(4, 1) - raftLog.maybeCommit(5, 1) + raftLog.maybeCommit(raftLog.leaderTerm, 5, 1) raftLog.appliedTo(3, 0 /* size */) raftLog.acceptApplying(5, maxSize+overshoot, false /* allowUnstable */) @@ -597,7 +597,7 @@ func TestNextUnstableEnts(t *testing.T) { // append unstable entries to raftlog raftLog := newLog(storage, raftLogger) - raftLog.append(previousEnts[tt.unstable-1:]...) + raftLog.append(10, previousEnts[tt.unstable-1:]...) 
ents := raftLog.nextUnstableEnts() if l := len(ents); l > 0 { @@ -612,26 +612,24 @@ func TestNextUnstableEnts(t *testing.T) { func TestCommitTo(t *testing.T) { previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}, {Term: 3, Index: 3}} commit := uint64(2) + const leadTerm = 10 tests := []struct { + term uint64 commit uint64 wcommit uint64 - wpanic bool }{ - {3, 3, false}, - {1, 2, false}, // never decrease - {4, 0, true}, // commit out of range -> panic + {term: leadTerm, commit: 3, wcommit: 3}, + {term: leadTerm, commit: 1, wcommit: 2}, // never decrease + {term: leadTerm, commit: 4, wcommit: 3}, // commit out of range -> cut at the last entry + {term: leadTerm - 1, commit: 3, wcommit: 2}, // outdated leader can't commit + {term: leadTerm + 1, commit: 3, wcommit: 2}, // newer leader can't commit if log is out of sync } for i, tt := range tests { t.Run(fmt.Sprint(i), func(t *testing.T) { - defer func() { - if r := recover(); r != nil { - require.True(t, tt.wpanic) - } - }() raftLog := newLog(NewMemoryStorage(), raftLogger) - raftLog.append(previousEnts...) + raftLog.append(leadTerm, previousEnts...) raftLog.committed = commit - raftLog.commitTo(tt.commit) + raftLog.commitTo(tt.term, tt.commit) require.Equal(t, tt.wcommit, raftLog.committed) }) } @@ -651,7 +649,7 @@ func TestStableTo(t *testing.T) { for i, tt := range tests { t.Run(fmt.Sprint(i), func(t *testing.T) { raftLog := newLog(NewMemoryStorage(), raftLogger) - raftLog.append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}...) + raftLog.append(10, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}...) raftLog.stableTo(tt.stablei, tt.stablet) require.Equal(t, tt.wunstable, raftLog.unstable.offset) }) @@ -688,7 +686,7 @@ func TestStableToWithSnap(t *testing.T) { s := NewMemoryStorage() require.NoError(t, s.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: snapi, Term: snapt}})) raftLog := newLog(s, raftLogger) - raftLog.append(tt.newEnts...) 
+ raftLog.append(raftLog.leaderTerm, tt.newEnts...) raftLog.stableTo(tt.stablei, tt.stablet) require.Equal(t, tt.wunstable, raftLog.unstable.offset) }) @@ -723,7 +721,7 @@ func TestCompaction(t *testing.T) { storage.Append([]pb.Entry{{Index: i}}) } raftLog := newLog(storage, raftLogger) - raftLog.maybeCommit(tt.lastIndex, 0) + raftLog.maybeCommit(raftLog.leaderTerm, tt.lastIndex, 0) raftLog.appliedTo(raftLog.committed, 0 /* size */) for j := 0; j < len(tt.compact); j++ { @@ -761,7 +759,7 @@ func TestIsOutOfBounds(t *testing.T) { storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset}}) l := newLog(storage, raftLogger) for i := uint64(1); i <= num; i++ { - l.append(pb.Entry{Index: i + offset}) + l.append(l.leaderTerm, pb.Entry{Index: i + offset}) } first := offset + 1 @@ -835,7 +833,7 @@ func TestTerm(t *testing.T) { storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset, Term: 1}}) l := newLog(storage, raftLogger) for i := uint64(1); i < num; i++ { - l.append(pb.Entry{Index: offset + i, Term: i}) + l.append(num, pb.Entry{Index: offset + i, Term: i}) } for i, tt := range []struct { @@ -909,7 +907,7 @@ func TestSlice(t *testing.T) { Metadata: pb.SnapshotMetadata{Index: offset}})) require.NoError(t, storage.Append(entries(offset+1, half))) l := newLog(storage, raftLogger) - l.append(entries(half, last)...) + l.append(last, entries(half, last)...) for _, tt := range []struct { lo uint64 @@ -999,7 +997,7 @@ func TestScan(t *testing.T) { Metadata: pb.SnapshotMetadata{Index: offset}})) require.NoError(t, storage.Append(entries(offset+1, half))) l := newLog(storage, raftLogger) - l.append(entries(half, last)...) + l.append(last, entries(half, last)...) // Test that scan() returns the same entries as slice(), on all inputs. 
for _, pageSize := range []entryEncodingSize{0, 1, 10, 100, entrySize, entrySize + 1} { diff --git a/raft.go b/raft.go index f24b6a98..6fe2f681 100644 --- a/raft.go +++ b/raft.go @@ -754,7 +754,7 @@ func (r *raft) appliedSnap(snap *pb.Snapshot) { // r.bcastAppend). func (r *raft) maybeCommit() bool { mci := r.trk.Committed() - return r.raftLog.maybeCommit(mci, r.Term) + return r.raftLog.maybeCommit(r.Term, mci, r.Term) } func (r *raft) reset(term uint64) { @@ -804,7 +804,7 @@ func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) { return false } // use latest "last" index after truncate/append - li = r.raftLog.append(es...) + li = r.raftLog.append(r.Term, es...) // The leader needs to self-ack the entries just appended once they have // been durably persisted (since it doesn't send an MsgApp to itself). This // response message will be added to msgsAfterAppend and delivered back to @@ -1733,11 +1733,13 @@ func stepFollower(r *raft, m pb.Message) error { func (r *raft) handleAppendEntries(m pb.Message) { if m.Index < r.raftLog.committed { + // TODO(pav-kv): we may still append some entries from this message if they + // are consistent with the log. In addition, this message carries a commit + // index update that may bump our commit index. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed}) return } - if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok { - r.raftLog.leaderTerm = m.Term // the log is now consistent with the leader + if mlastIndex, ok := r.raftLog.maybeAppend(m.Term, m.Index, m.LogTerm, m.Commit, m.Entries...); ok { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) return } @@ -1773,16 +1775,7 @@ func (r *raft) handleAppendEntries(m pb.Message) { } func (r *raft) handleHeartbeat(m pb.Message) { - // It is only safe to advance the commit index if our log is a prefix of the - // leader's log. Otherwise, entries at this index may mismatch. 
- // - // TODO(pav-kv): move this logic to r.raftLog, which is more appropriate for - // handling safety. The raftLog can use leaderTerm for other safety checks. - // For example, unstable.truncateAndAppend currently may override a suffix of - // the log unconditionally, but it can only be done if m.Term > leaderTerm. - if m.Term == r.raftLog.leaderTerm { - r.raftLog.commitTo(min(m.Commit, r.raftLog.lastIndex())) - } + r.raftLog.commitTo(m.Term, m.Commit) r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context}) } @@ -1794,7 +1787,7 @@ func (r *raft) handleSnapshot(m pb.Message) { s = *m.Snapshot } sindex, sterm := s.Metadata.Index, s.Metadata.Term - if r.restore(s) { + if r.restore(m.Term, s) { r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]", r.id, r.raftLog.committed, sindex, sterm) r.raftLog.leaderTerm = m.Term // the log is now consistent with the leader @@ -1809,7 +1802,7 @@ func (r *raft) handleSnapshot(m pb.Message) { // restore recovers the state machine from a snapshot. It restores the log and the // configuration of state machine. If this method returns false, the snapshot was // ignored, either because it was obsolete or because of an error. 
-func (r *raft) restore(s pb.Snapshot) bool { +func (r *raft) restore(leaderTerm uint64, s pb.Snapshot) bool { if s.Metadata.Index <= r.raftLog.committed { return false } @@ -1862,7 +1855,7 @@ func (r *raft) restore(s pb.Snapshot) bool { if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) { r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]", r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term) - r.raftLog.commitTo(s.Metadata.Index) + r.raftLog.commitTo(leaderTerm, s.Metadata.Index) return false } diff --git a/raft_paper_test.go b/raft_paper_test.go index fcdd49f2..28cab568 100644 --- a/raft_paper_test.go +++ b/raft_paper_test.go @@ -683,7 +683,7 @@ func TestFollowerAppendEntries(t *testing.T) { r := newTestRaft(1, 10, 1, storage) r.becomeFollower(2, 2) - r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, LogTerm: tt.term, Index: tt.index, Entries: tt.ents}) + r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 10, LogTerm: tt.term, Index: tt.index, Entries: tt.ents}) if g := r.raftLog.allEntries(); !reflect.DeepEqual(g, tt.wents) { t.Errorf("#%d: ents = %+v, want %+v", i, g, tt.wents) diff --git a/raft_snap_test.go b/raft_snap_test.go index e6058c68..11dd1964 100644 --- a/raft_snap_test.go +++ b/raft_snap_test.go @@ -33,7 +33,7 @@ var ( func TestSendingSnapshotSetPendingSnapshot(t *testing.T) { storage := newTestMemoryStorage(withPeers(1)) sm := newTestRaft(1, 10, 1, storage) - sm.restore(testingSnap) + sm.restore(sm.Term, testingSnap) sm.becomeCandidate() sm.becomeLeader() @@ -51,7 +51,7 @@ func TestSendingSnapshotSetPendingSnapshot(t *testing.T) { func TestPendingSnapshotPauseReplication(t *testing.T) { storage := newTestMemoryStorage(withPeers(1, 2)) sm := newTestRaft(1, 10, 1, storage) - sm.restore(testingSnap) + sm.restore(sm.Term, testingSnap) sm.becomeCandidate() sm.becomeLeader() @@ -68,7 +68,7 @@ func 
TestPendingSnapshotPauseReplication(t *testing.T) { func TestSnapshotFailure(t *testing.T) { storage := newTestMemoryStorage(withPeers(1, 2)) sm := newTestRaft(1, 10, 1, storage) - sm.restore(testingSnap) + sm.restore(sm.Term, testingSnap) sm.becomeCandidate() sm.becomeLeader() @@ -91,7 +91,7 @@ func TestSnapshotFailure(t *testing.T) { func TestSnapshotSucceed(t *testing.T) { storage := newTestMemoryStorage(withPeers(1, 2)) sm := newTestRaft(1, 10, 1, storage) - sm.restore(testingSnap) + sm.restore(sm.Term, testingSnap) sm.becomeCandidate() sm.becomeLeader() @@ -114,7 +114,7 @@ func TestSnapshotSucceed(t *testing.T) { func TestSnapshotAbort(t *testing.T) { storage := newTestMemoryStorage(withPeers(1, 2)) sm := newTestRaft(1, 10, 1, storage) - sm.restore(testingSnap) + sm.restore(sm.Term, testingSnap) sm.becomeCandidate() sm.becomeLeader() diff --git a/raft_test.go b/raft_test.go index 16e40608..43a74864 100644 --- a/raft_test.go +++ b/raft_test.go @@ -1209,6 +1209,7 @@ func TestCommit(t *testing.T) { storage.hardState = pb.HardState{Term: tt.smTerm} sm := newTestRaft(1, 10, 2, storage) + sm.raftLog.leaderTerm = tt.smTerm for j := 0; j < len(tt.matches); j++ { id := uint64(j) + 1 if id > 1 { @@ -1299,8 +1300,9 @@ func TestHandleMsgApp(t *testing.T) { {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 4, Entries: []pb.Entry{{Index: 2, Term: 2}}}, 2, 2, false}, // Ensure 3 - {pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3}, 2, 1, false}, // match entry 1, commit up to last new entry 1 - {pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3, Entries: []pb.Entry{{Index: 2, Term: 2}}}, 2, 2, false}, // match entry 1, commit up to last new entry 2 + {pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3}, 2, 0, true}, // can't commit under outdated term + {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 3}, 2, 1, false}, // match entry 1, commit up to entry 1 + {pb.Message{Type: 
pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 3, Entries: []pb.Entry{{Index: 2, Term: 2}}}, 2, 2, false}, // match entry 1, commit up to last new entry 2 {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3}, 2, 2, false}, // match entry 2, commit up to last new entry 2 {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4}, 2, 2, false}, // commit up to log.last() } @@ -1352,7 +1354,7 @@ func TestHandleHeartbeat(t *testing.T) { storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}, {Index: 3, Term: tt.lastTerm}}) sm := newTestRaft(1, 5, 1, storage) sm.becomeFollower(2, 2) - sm.raftLog.commitTo(commit) + sm.raftLog.commitTo(sm.raftLog.leaderTerm, commit) sm.handleHeartbeat(tt.m) if sm.raftLog.committed != tt.wCommit { t.Errorf("#%d: committed = %d, want %d", i, sm.raftLog.committed, tt.wCommit) @@ -1374,7 +1376,7 @@ func TestHandleHeartbeatResp(t *testing.T) { sm := newTestRaft(1, 5, 1, storage) sm.becomeCandidate() sm.becomeLeader() - sm.raftLog.commitTo(sm.raftLog.lastIndex()) + sm.raftLog.commitTo(sm.raftLog.leaderTerm, sm.raftLog.lastIndex()) // A heartbeat response from a node that is behind; re-send MsgApp sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp}) @@ -1419,7 +1421,7 @@ func TestRaftFreesReadOnlyMem(t *testing.T) { sm := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2))) sm.becomeCandidate() sm.becomeLeader() - sm.raftLog.commitTo(sm.raftLog.lastIndex()) + sm.raftLog.commitTo(sm.Term, sm.raftLog.lastIndex()) ctx := []byte("ctx") @@ -2783,7 +2785,7 @@ func TestLeaderIncreaseNext(t *testing.T) { for i, tt := range tests { sm := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2))) - sm.raftLog.append(previousEnts...) + sm.raftLog.append(1, previousEnts...) 
sm.becomeCandidate() sm.becomeLeader() sm.trk.Progress[2].State = tt.state @@ -2932,7 +2934,7 @@ func TestRestore(t *testing.T) { storage := newTestMemoryStorage(withPeers(1, 2)) sm := newTestRaft(1, 10, 1, storage) - if ok := sm.restore(s); !ok { + if ok := sm.restore(sm.Term, s); !ok { t.Fatal("restore fail, want succeed") } @@ -2947,7 +2949,7 @@ func TestRestore(t *testing.T) { t.Errorf("sm.Voters = %+v, want %+v", sg, s.Metadata.ConfState.Voters) } - if ok := sm.restore(s); ok { + if ok := sm.restore(sm.Term, s); ok { t.Fatal("restore succeed, want fail") } // It should not campaign before actually applying data. @@ -2971,7 +2973,7 @@ func TestRestoreWithLearner(t *testing.T) { storage := newTestMemoryStorage(withPeers(1, 2), withLearners(3)) sm := newTestLearnerRaft(3, 8, 2, storage) - if ok := sm.restore(s); !ok { + if ok := sm.restore(sm.Term, s); !ok { t.Error("restore fail, want succeed") } @@ -3000,7 +3002,7 @@ func TestRestoreWithLearner(t *testing.T) { } } - if ok := sm.restore(s); ok { + if ok := sm.restore(sm.Term, s); ok { t.Error("restore succeed, want fail") } } @@ -3017,7 +3019,7 @@ func TestRestoreWithVotersOutgoing(t *testing.T) { storage := newTestMemoryStorage(withPeers(1, 2)) sm := newTestRaft(1, 10, 1, storage) - if ok := sm.restore(s); !ok { + if ok := sm.restore(sm.Term, s); !ok { t.Fatal("restore fail, want succeed") } @@ -3032,7 +3034,7 @@ func TestRestoreWithVotersOutgoing(t *testing.T) { t.Errorf("sm.Voters = %+v, want %+v", sg, s.Metadata.ConfState.Voters) } - if ok := sm.restore(s); ok { + if ok := sm.restore(sm.Term, s); ok { t.Fatal("restore succeed, want fail") } // It should not campaign before actually applying data. 
@@ -3067,7 +3069,7 @@ func TestRestoreVoterToLearner(t *testing.T) { if sm.isLearner { t.Errorf("%x is learner, want not", sm.id) } - if ok := sm.restore(s); !ok { + if ok := sm.restore(sm.Term, s); !ok { t.Error("restore failed unexpectedly") } } @@ -3090,7 +3092,7 @@ func TestRestoreLearnerPromotion(t *testing.T) { t.Errorf("%x is not learner, want yes", sm.id) } - if ok := sm.restore(s); !ok { + if ok := sm.restore(sm.Term, s); !ok { t.Error("restore fail, want succeed") } @@ -3114,7 +3116,7 @@ func TestLearnerReceiveSnapshot(t *testing.T) { n1 := newTestLearnerRaft(1, 10, 1, store) n2 := newTestLearnerRaft(2, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2))) - n1.restore(s) + n1.restore(n1.Term, s) snap := n1.raftLog.nextUnstableSnapshot() store.ApplySnapshot(*snap) n1.appliedSnap(snap) @@ -3138,8 +3140,8 @@ func TestRestoreIgnoreSnapshot(t *testing.T) { commit := uint64(1) storage := newTestMemoryStorage(withPeers(1, 2)) sm := newTestRaft(1, 10, 1, storage) - sm.raftLog.append(previousEnts...) - sm.raftLog.commitTo(commit) + sm.raftLog.append(1, previousEnts...) 
+ sm.raftLog.commitTo(1, commit) s := pb.Snapshot{ Metadata: pb.SnapshotMetadata{ @@ -3150,7 +3152,7 @@ func TestRestoreIgnoreSnapshot(t *testing.T) { } // ignore snapshot - if ok := sm.restore(s); ok { + if ok := sm.restore(1, s); ok { t.Errorf("restore = %t, want %t", ok, false) } if sm.raftLog.committed != commit { @@ -3159,7 +3161,7 @@ func TestRestoreIgnoreSnapshot(t *testing.T) { // ignore snapshot and fast forward commit s.Metadata.Index = commit + 1 - if ok := sm.restore(s); ok { + if ok := sm.restore(1, s); ok { t.Errorf("restore = %t, want %t", ok, false) } if sm.raftLog.committed != commit+1 { @@ -3178,7 +3180,7 @@ func TestProvideSnap(t *testing.T) { } storage := newTestMemoryStorage(withPeers(1)) sm := newTestRaft(1, 10, 1, storage) - sm.restore(s) + sm.restore(sm.Term, s) sm.becomeCandidate() sm.becomeLeader() @@ -3208,7 +3210,7 @@ func TestIgnoreProvidingSnap(t *testing.T) { } storage := newTestMemoryStorage(withPeers(1)) sm := newTestRaft(1, 10, 1, storage) - sm.restore(s) + sm.restore(sm.Term, s) sm.becomeCandidate() sm.becomeLeader()