diff --git a/bootstrap.go b/bootstrap.go index 2a61aa23..664d7614 100644 --- a/bootstrap.go +++ b/bootstrap.go @@ -58,7 +58,7 @@ func (rn *RawNode) Bootstrap(peers []Peer) error { ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data} } - rn.raft.raftLog.append(ents...) + rn.raft.raftLog.append(rn.raft.Term, ents...) // Now apply them, mainly so that the application can call Campaign // immediately after StartNode in tests. Note that these nodes will diff --git a/log.go b/log.go index 84826c3a..9603e64a 100644 --- a/log.go +++ b/log.go @@ -29,6 +29,35 @@ type raftLog struct { // they will be saved into storage. unstable unstable + // leaderTerm is a term of the leader with whom our log is "consistent". The + // log is guaranteed to be a prefix of this term's leader log. + // + // The leaderTerm can be safely updated to `t` if: + // 1. the last entry in the log has term `t`, or, more generally, + // 2. the last successful append was sent by the leader `t`. + // + // This is due to the following safety property (see raft paper §5.3): + // + // Log Matching: if two logs contain an entry with the same index and term, + // then the logs are identical in all entries up through the given index. + // + // We use (1) to initialize leaderTerm, and (2) to maintain it on updates. + // + // NB: (2) does not imply (1). If our log is behind the leader's log, the last + // entry term can be below leaderTerm. + // + // NB: leaderTerm does not necessarily match this raft node's term. It only + // does for the leader. For followers and candidates, when we first learn or + // bump to a new term, we don't have a proof that our log is consistent with + // the new term's leader (current or prospective). The new leader may override + // any suffix of the log after the committed index. Only when the first append + // from the new leader succeeds, we can update leaderTerm. + // + // During normal operation, leaderTerm matches the node term though.
During a + // leader change, it briefly lags behind, and matches again when the first + // append message succeeds. + leaderTerm uint64 + // committed is the highest log position that is known to be in // stable storage on a quorum of nodes. committed uint64 @@ -88,6 +117,11 @@ func newLogWithSize(storage Storage, logger Logger, maxApplyingEntsSize entryEnc if err != nil { panic(err) // TODO(bdarnell) } + lastTerm, err := storage.Term(lastIndex) + if err != nil { + panic(err) // TODO(pav-kv) + } + log.leaderTerm = lastTerm log.unstable.offset = lastIndex + 1 log.unstable.offsetInProgress = lastIndex + 1 log.unstable.logger = logger @@ -106,35 +140,73 @@ func (l *raftLog) String() string { // maybeAppend returns (0, false) if the entries cannot be appended. Otherwise, // it returns (last index of new entries, true). -func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) { - if !l.matchTerm(index, logTerm) { +// +// TODO(pav-kv): introduce a struct that consolidates the append metadata. The +// (leaderTerm, prevIndex, prevTerm) tuple must always be carried together, so +// that safety properties for this append are checked at the lowest layers +// rather than up in raft.go. +func (l *raftLog) maybeAppend(leaderTerm, prevIndex, prevTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) { + // Can not accept append requests from an outdated leader. + if leaderTerm < l.leaderTerm { + return 0, false + } + // Can not accept append requests that are not consistent with our log. + // + // NB: it is unnecessary to check matchTerm() if leaderTerm == l.leaderTerm, + // because the leader always sends self-consistent appends. For ensuring raft + // safety, this check is only necessary if leaderTerm > l.leaderTerm. + // + // TODO(pav-kv): however, we should log an error if leaderTerm == l.leaderTerm + // and the entry does not match. 
This means either the leader is sending + // inconsistent appends, or there is some state corruption in general. + if !l.matchTerm(prevIndex, prevTerm) { return 0, false } - lastnewi = index + uint64(len(ents)) + lastnewi = prevIndex + uint64(len(ents)) ci := l.findConflict(ents) switch { case ci == 0: case ci <= l.committed: l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed) default: - offset := index + 1 + offset := prevIndex + 1 if ci-offset > uint64(len(ents)) { l.logger.Panicf("index, %d, is out of range [%d]", ci-offset, len(ents)) } - l.append(ents[ci-offset:]...) + l.append(leaderTerm, ents[ci-offset:]...) } - l.commitTo(min(committed, lastnewi)) + // TODO(pav-kv): call commitTo from outside of this method, for a smaller API. + // TODO(pav-kv): it is safe to pass committed index as is here instead of min, + // but it breaks some tests that make incorrect assumptions. Fix this. + l.commitTo(leaderTerm, min(committed, lastnewi)) return lastnewi, true } -func (l *raftLog) append(ents ...pb.Entry) uint64 { - if len(ents) == 0 { +func (l *raftLog) append(leaderTerm uint64, ents ...pb.Entry) uint64 { + // Can not accept append requests from an outdated leader. + if leaderTerm < l.leaderTerm { + return l.lastIndex() + } + if len(ents) == 0 { // no-op return l.lastIndex() } if after := ents[0].Index - 1; after < l.committed { l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed) } + + // INVARIANT: l.term(i) <= l.leaderTerm, for any entry in the log. + // + // TODO(pav-kv): we should more generally check that the content of ents slice + // is correct: all entries have consecutive indices, and terms do not regress. + // We should do this validation once, on every incoming message, and pass the + // append in a type-safe "validated append" wrapper. This wrapper can provide + // convenient accessors to the prev/last entry, instead of raw slices access. 
+ if lastTerm := ents[len(ents)-1].Term; lastTerm > leaderTerm { + l.logger.Panicf("leader at term %d tries to append a higher term %d", leaderTerm, lastTerm) + } + l.leaderTerm = leaderTerm // l.leaderTerm never regresses here + l.unstable.truncateAndAppend(ents) return l.lastIndex() } @@ -315,12 +387,16 @@ func (l *raftLog) lastIndex() uint64 { return i } -func (l *raftLog) commitTo(tocommit uint64) { - // never decrease commit - if l.committed < tocommit { - if l.lastIndex() < tocommit { - l.logger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?", tocommit, l.lastIndex()) - } +func (l *raftLog) commitTo(leaderTerm, tocommit uint64) { + // Do not accept the commit index update from a leader if our log is not + // consistent with the leader's log. + if leaderTerm != l.leaderTerm { + return + } + // Otherwise, we have the guarantee that our log is a prefix of the leader's + // log. All entries <= min(tocommit, lastIndex) can thus be committed. + tocommit = min(tocommit, l.lastIndex()) + if tocommit > l.committed { l.committed = tocommit } } @@ -444,12 +520,14 @@ func (l *raftLog) matchTerm(i, term uint64) bool { return t == term } -func (l *raftLog) maybeCommit(maxIndex, term uint64) bool { +// TODO(pav-kv): clarify that (maxIndex, term) is the ID of the entry at the +// committed index. Clean this up. +func (l *raftLog) maybeCommit(leaderTerm, maxIndex, term uint64) bool { // NB: term should never be 0 on a commit because the leader campaigns at // least at term 1. But if it is 0 for some reason, we don't want to consider // this a term match in case zeroTermOnOutOfBounds returns 0. 
if maxIndex > l.committed && term != 0 && l.zeroTermOnOutOfBounds(l.term(maxIndex)) == term { - l.commitTo(maxIndex) + l.commitTo(leaderTerm, maxIndex) return true } return false diff --git a/log_test.go b/log_test.go index 2711ff9c..1d9ceab1 100644 --- a/log_test.go +++ b/log_test.go @@ -49,7 +49,7 @@ func TestFindConflict(t *testing.T) { for i, tt := range tests { t.Run(fmt.Sprint(i), func(t *testing.T) { raftLog := newLog(NewMemoryStorage(), raftLogger) - raftLog.append(previousEnts...) + raftLog.append(10, previousEnts...) require.Equal(t, tt.wconflict, raftLog.findConflict(tt.ents)) }) } @@ -101,7 +101,7 @@ func TestFindConflictByTerm(t *testing.T) { Term: tt.ents[0].Term, }}) l := newLog(st, raftLogger) - l.append(tt.ents[1:]...) + l.append(10, tt.ents[1:]...) index, term := l.findConflictByTerm(tt.index, tt.term) require.Equal(t, tt.want, index) @@ -115,7 +115,7 @@ func TestFindConflictByTerm(t *testing.T) { func TestIsUpToDate(t *testing.T) { previousEnts := []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}} raftLog := newLog(NewMemoryStorage(), raftLogger) - raftLog.append(previousEnts...) + raftLog.append(3, previousEnts...) tests := []struct { lastIndex uint64 term uint64 @@ -183,7 +183,7 @@ func TestAppend(t *testing.T) { storage := NewMemoryStorage() storage.Append(previousEnts) raftLog := newLog(storage, raftLogger) - require.Equal(t, tt.windex, raftLog.append(tt.ents...)) + require.Equal(t, tt.windex, raftLog.append(10, tt.ents...)) g, err := raftLog.entries(1, noLimit) require.NoError(t, err) require.Equal(t, tt.wents, g) @@ -287,7 +287,7 @@ func TestLogMaybeAppend(t *testing.T) { for i, tt := range tests { raftLog := newLog(NewMemoryStorage(), raftLogger) - raftLog.append(previousEnts...) + raftLog.append(10, previousEnts...) 
raftLog.committed = commit t.Run(fmt.Sprint(i), func(t *testing.T) { @@ -296,7 +296,7 @@ func TestLogMaybeAppend(t *testing.T) { require.True(t, tt.wpanic) } }() - glasti, gappend := raftLog.maybeAppend(tt.index, tt.logTerm, tt.committed, tt.ents...) + glasti, gappend := raftLog.maybeAppend(10, tt.index, tt.logTerm, tt.committed, tt.ents...) require.Equal(t, tt.wlasti, glasti) require.Equal(t, tt.wappend, gappend) require.Equal(t, tt.wcommit, raftLog.committed) @@ -323,10 +323,10 @@ func TestCompactionSideEffects(t *testing.T) { } raftLog := newLog(storage, raftLogger) for i = unstableIndex; i < lastIndex; i++ { - raftLog.append(pb.Entry{Term: i + 1, Index: i + 1}) + raftLog.append(lastIndex, pb.Entry{Term: i + 1, Index: i + 1}) } - require.True(t, raftLog.maybeCommit(lastIndex, lastTerm)) + require.True(t, raftLog.maybeCommit(lastIndex, lastIndex, lastTerm)) raftLog.appliedTo(raftLog.committed, 0 /* size */) offset := uint64(500) @@ -346,7 +346,7 @@ func TestCompactionSideEffects(t *testing.T) { require.Equal(t, uint64(751), unstableEnts[0].Index) prev := raftLog.lastIndex() - raftLog.append(pb.Entry{Index: raftLog.lastIndex() + 1, Term: raftLog.lastIndex() + 1}) + raftLog.append(lastIndex, pb.Entry{Index: raftLog.lastIndex() + 1, Term: raftLog.lastIndex()}) require.Equal(t, prev+1, raftLog.lastIndex()) ents, err := raftLog.entries(raftLog.lastIndex(), noLimit) @@ -396,9 +396,9 @@ func TestHasNextCommittedEnts(t *testing.T) { require.NoError(t, storage.Append(ents[:1])) raftLog := newLog(storage, raftLogger) - raftLog.append(ents...) + raftLog.append(raftLog.leaderTerm, ents...) 
raftLog.stableTo(4, 1) - raftLog.maybeCommit(5, 1) + raftLog.maybeCommit(raftLog.leaderTerm, 5, 1) raftLog.appliedTo(tt.applied, 0 /* size */) raftLog.acceptApplying(tt.applying, 0 /* size */, tt.allowUnstable) raftLog.applyingEntsPaused = tt.paused @@ -454,9 +454,9 @@ func TestNextCommittedEnts(t *testing.T) { require.NoError(t, storage.Append(ents[:1])) raftLog := newLog(storage, raftLogger) - raftLog.append(ents...) + raftLog.append(raftLog.leaderTerm, ents...) raftLog.stableTo(4, 1) - raftLog.maybeCommit(5, 1) + raftLog.maybeCommit(raftLog.leaderTerm, 5, 1) raftLog.appliedTo(tt.applied, 0 /* size */) raftLog.acceptApplying(tt.applying, 0 /* size */, tt.allowUnstable) raftLog.applyingEntsPaused = tt.paused @@ -513,9 +513,9 @@ func TestAcceptApplying(t *testing.T) { require.NoError(t, storage.Append(ents[:1])) raftLog := newLogWithSize(storage, raftLogger, maxSize) - raftLog.append(ents...) + raftLog.append(raftLog.leaderTerm, ents...) raftLog.stableTo(4, 1) - raftLog.maybeCommit(5, 1) + raftLog.maybeCommit(raftLog.leaderTerm, 5, 1) raftLog.appliedTo(3, 0 /* size */) raftLog.acceptApplying(tt.index, tt.size, tt.allowUnstable) @@ -562,9 +562,9 @@ func TestAppliedTo(t *testing.T) { require.NoError(t, storage.Append(ents[:1])) raftLog := newLogWithSize(storage, raftLogger, maxSize) - raftLog.append(ents...) + raftLog.append(raftLog.leaderTerm, ents...) raftLog.stableTo(4, 1) - raftLog.maybeCommit(5, 1) + raftLog.maybeCommit(raftLog.leaderTerm, 5, 1) raftLog.appliedTo(3, 0 /* size */) raftLog.acceptApplying(5, maxSize+overshoot, false /* allowUnstable */) @@ -597,7 +597,7 @@ func TestNextUnstableEnts(t *testing.T) { // append unstable entries to raftlog raftLog := newLog(storage, raftLogger) - raftLog.append(previousEnts[tt.unstable-1:]...) + raftLog.append(10, previousEnts[tt.unstable-1:]...) 
ents := raftLog.nextUnstableEnts() if l := len(ents); l > 0 { @@ -612,26 +612,24 @@ func TestNextUnstableEnts(t *testing.T) { func TestCommitTo(t *testing.T) { previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}, {Term: 3, Index: 3}} commit := uint64(2) + const leadTerm = 10 tests := []struct { + term uint64 commit uint64 wcommit uint64 - wpanic bool }{ - {3, 3, false}, - {1, 2, false}, // never decrease - {4, 0, true}, // commit out of range -> panic + {term: leadTerm, commit: 3, wcommit: 3}, + {term: leadTerm, commit: 1, wcommit: 2}, // never decrease + {term: leadTerm, commit: 4, wcommit: 3}, // commit out of range -> cut at the last entry + {term: leadTerm - 1, commit: 3, wcommit: 2}, // outdated leader can't commit + {term: leadTerm + 1, commit: 3, wcommit: 2}, // newer leader can't commit if log is out of sync } for i, tt := range tests { t.Run(fmt.Sprint(i), func(t *testing.T) { - defer func() { - if r := recover(); r != nil { - require.True(t, tt.wpanic) - } - }() raftLog := newLog(NewMemoryStorage(), raftLogger) - raftLog.append(previousEnts...) + raftLog.append(leadTerm, previousEnts...) raftLog.committed = commit - raftLog.commitTo(tt.commit) + raftLog.commitTo(tt.term, tt.commit) require.Equal(t, tt.wcommit, raftLog.committed) }) } @@ -651,7 +649,7 @@ func TestStableTo(t *testing.T) { for i, tt := range tests { t.Run(fmt.Sprint(i), func(t *testing.T) { raftLog := newLog(NewMemoryStorage(), raftLogger) - raftLog.append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}...) + raftLog.append(10, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}...) raftLog.stableTo(tt.stablei, tt.stablet) require.Equal(t, tt.wunstable, raftLog.unstable.offset) }) @@ -688,7 +686,7 @@ func TestStableToWithSnap(t *testing.T) { s := NewMemoryStorage() require.NoError(t, s.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: snapi, Term: snapt}})) raftLog := newLog(s, raftLogger) - raftLog.append(tt.newEnts...) 
+ raftLog.append(raftLog.leaderTerm, tt.newEnts...) raftLog.stableTo(tt.stablei, tt.stablet) require.Equal(t, tt.wunstable, raftLog.unstable.offset) }) @@ -723,7 +721,7 @@ func TestCompaction(t *testing.T) { storage.Append([]pb.Entry{{Index: i}}) } raftLog := newLog(storage, raftLogger) - raftLog.maybeCommit(tt.lastIndex, 0) + raftLog.maybeCommit(raftLog.leaderTerm, tt.lastIndex, 0) raftLog.appliedTo(raftLog.committed, 0 /* size */) for j := 0; j < len(tt.compact); j++ { @@ -761,7 +759,7 @@ func TestIsOutOfBounds(t *testing.T) { storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset}}) l := newLog(storage, raftLogger) for i := uint64(1); i <= num; i++ { - l.append(pb.Entry{Index: i + offset}) + l.append(l.leaderTerm, pb.Entry{Index: i + offset}) } first := offset + 1 @@ -835,7 +833,7 @@ func TestTerm(t *testing.T) { storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset, Term: 1}}) l := newLog(storage, raftLogger) for i := uint64(1); i < num; i++ { - l.append(pb.Entry{Index: offset + i, Term: i}) + l.append(num, pb.Entry{Index: offset + i, Term: i}) } for i, tt := range []struct { @@ -909,7 +907,7 @@ func TestSlice(t *testing.T) { Metadata: pb.SnapshotMetadata{Index: offset}})) require.NoError(t, storage.Append(entries(offset+1, half))) l := newLog(storage, raftLogger) - l.append(entries(half, last)...) + l.append(last, entries(half, last)...) for _, tt := range []struct { lo uint64 @@ -999,7 +997,7 @@ func TestScan(t *testing.T) { Metadata: pb.SnapshotMetadata{Index: offset}})) require.NoError(t, storage.Append(entries(offset+1, half))) l := newLog(storage, raftLogger) - l.append(entries(half, last)...) + l.append(last, entries(half, last)...) // Test that scan() returns the same entries as slice(), on all inputs. 
for _, pageSize := range []entryEncodingSize{0, 1, 10, 100, entrySize, entrySize + 1} { diff --git a/raft.go b/raft.go index 5a150562..6fe2f681 100644 --- a/raft.go +++ b/raft.go @@ -754,7 +754,7 @@ func (r *raft) appliedSnap(snap *pb.Snapshot) { // r.bcastAppend). func (r *raft) maybeCommit() bool { mci := r.trk.Committed() - return r.raftLog.maybeCommit(mci, r.Term) + return r.raftLog.maybeCommit(r.Term, mci, r.Term) } func (r *raft) reset(term uint64) { @@ -804,7 +804,7 @@ func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) { return false } // use latest "last" index after truncate/append - li = r.raftLog.append(es...) + li = r.raftLog.append(r.Term, es...) // The leader needs to self-ack the entries just appended once they have // been durably persisted (since it doesn't send an MsgApp to itself). This // response message will be added to msgsAfterAppend and delivered back to @@ -935,6 +935,8 @@ func (r *raft) becomeLeader() { // so the preceding log append does not count against the uncommitted log // quota of the new leader. In other words, after the call to appendEntry, // r.uncommittedSize is still 0. + + r.raftLog.leaderTerm = r.Term // the leader's log is consistent with itself r.logger.Infof("%x became leader at term %d", r.id, r.Term) } @@ -1731,10 +1733,13 @@ func stepFollower(r *raft, m pb.Message) error { func (r *raft) handleAppendEntries(m pb.Message) { if m.Index < r.raftLog.committed { + // TODO(pav-kv): we may still append some entries from this message if they + // are consistent with the log. In addition, this message carries a commit + // index update that may bump our commit index. 
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed}) return } - if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok { + if mlastIndex, ok := r.raftLog.maybeAppend(m.Term, m.Index, m.LogTerm, m.Commit, m.Entries...); ok { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) return } @@ -1770,7 +1775,7 @@ func (r *raft) handleAppendEntries(m pb.Message) { } func (r *raft) handleHeartbeat(m pb.Message) { - r.raftLog.commitTo(m.Commit) + r.raftLog.commitTo(m.Term, m.Commit) r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context}) } @@ -1782,9 +1787,10 @@ func (r *raft) handleSnapshot(m pb.Message) { s = *m.Snapshot } sindex, sterm := s.Metadata.Index, s.Metadata.Term - if r.restore(s) { + if r.restore(m.Term, s) { r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]", r.id, r.raftLog.committed, sindex, sterm) + r.raftLog.leaderTerm = m.Term // the log is now consistent with the leader r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()}) } else { r.logger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]", @@ -1796,7 +1802,7 @@ func (r *raft) handleSnapshot(m pb.Message) { // restore recovers the state machine from a snapshot. It restores the log and the // configuration of state machine. If this method returns false, the snapshot was // ignored, either because it was obsolete or because of an error. 
-func (r *raft) restore(s pb.Snapshot) bool { +func (r *raft) restore(leaderTerm uint64, s pb.Snapshot) bool { if s.Metadata.Index <= r.raftLog.committed { return false } @@ -1849,7 +1855,7 @@ func (r *raft) restore(s pb.Snapshot) bool { if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) { r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]", r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term) - r.raftLog.commitTo(s.Metadata.Index) + r.raftLog.commitTo(leaderTerm, s.Metadata.Index) return false } diff --git a/raft_paper_test.go b/raft_paper_test.go index fcdd49f2..28cab568 100644 --- a/raft_paper_test.go +++ b/raft_paper_test.go @@ -683,7 +683,7 @@ func TestFollowerAppendEntries(t *testing.T) { r := newTestRaft(1, 10, 1, storage) r.becomeFollower(2, 2) - r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, LogTerm: tt.term, Index: tt.index, Entries: tt.ents}) + r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 10, LogTerm: tt.term, Index: tt.index, Entries: tt.ents}) if g := r.raftLog.allEntries(); !reflect.DeepEqual(g, tt.wents) { t.Errorf("#%d: ents = %+v, want %+v", i, g, tt.wents) diff --git a/raft_snap_test.go b/raft_snap_test.go index e6058c68..11dd1964 100644 --- a/raft_snap_test.go +++ b/raft_snap_test.go @@ -33,7 +33,7 @@ var ( func TestSendingSnapshotSetPendingSnapshot(t *testing.T) { storage := newTestMemoryStorage(withPeers(1)) sm := newTestRaft(1, 10, 1, storage) - sm.restore(testingSnap) + sm.restore(sm.Term, testingSnap) sm.becomeCandidate() sm.becomeLeader() @@ -51,7 +51,7 @@ func TestSendingSnapshotSetPendingSnapshot(t *testing.T) { func TestPendingSnapshotPauseReplication(t *testing.T) { storage := newTestMemoryStorage(withPeers(1, 2)) sm := newTestRaft(1, 10, 1, storage) - sm.restore(testingSnap) + sm.restore(sm.Term, testingSnap) sm.becomeCandidate() sm.becomeLeader() @@ -68,7 +68,7 @@ func 
TestPendingSnapshotPauseReplication(t *testing.T) { func TestSnapshotFailure(t *testing.T) { storage := newTestMemoryStorage(withPeers(1, 2)) sm := newTestRaft(1, 10, 1, storage) - sm.restore(testingSnap) + sm.restore(sm.Term, testingSnap) sm.becomeCandidate() sm.becomeLeader() @@ -91,7 +91,7 @@ func TestSnapshotFailure(t *testing.T) { func TestSnapshotSucceed(t *testing.T) { storage := newTestMemoryStorage(withPeers(1, 2)) sm := newTestRaft(1, 10, 1, storage) - sm.restore(testingSnap) + sm.restore(sm.Term, testingSnap) sm.becomeCandidate() sm.becomeLeader() @@ -114,7 +114,7 @@ func TestSnapshotSucceed(t *testing.T) { func TestSnapshotAbort(t *testing.T) { storage := newTestMemoryStorage(withPeers(1, 2)) sm := newTestRaft(1, 10, 1, storage) - sm.restore(testingSnap) + sm.restore(sm.Term, testingSnap) sm.becomeCandidate() sm.becomeLeader() diff --git a/raft_test.go b/raft_test.go index 5bc6d52e..43a74864 100644 --- a/raft_test.go +++ b/raft_test.go @@ -1209,6 +1209,7 @@ func TestCommit(t *testing.T) { storage.hardState = pb.HardState{Term: tt.smTerm} sm := newTestRaft(1, 10, 2, storage) + sm.raftLog.leaderTerm = tt.smTerm for j := 0; j < len(tt.matches); j++ { id := uint64(j) + 1 if id > 1 { @@ -1299,8 +1300,9 @@ func TestHandleMsgApp(t *testing.T) { {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 4, Entries: []pb.Entry{{Index: 2, Term: 2}}}, 2, 2, false}, // Ensure 3 - {pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3}, 2, 1, false}, // match entry 1, commit up to last new entry 1 - {pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3, Entries: []pb.Entry{{Index: 2, Term: 2}}}, 2, 2, false}, // match entry 1, commit up to last new entry 2 + {pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3}, 2, 0, true}, // can't commit under outdated term + {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 3}, 2, 1, false}, // match entry 1, commit up to entry 1 + {pb.Message{Type: 
pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 3, Entries: []pb.Entry{{Index: 2, Term: 2}}}, 2, 2, false}, // match entry 1, commit up to last new entry 2 {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3}, 2, 2, false}, // match entry 2, commit up to last new entry 2 {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4}, 2, 2, false}, // commit up to log.last() } @@ -1332,19 +1334,27 @@ func TestHandleMsgApp(t *testing.T) { func TestHandleHeartbeat(t *testing.T) { commit := uint64(2) tests := []struct { - m pb.Message - wCommit uint64 + m pb.Message + lastTerm uint64 + wCommit uint64 }{ - {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, commit + 1}, - {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, commit}, // do not decrease commit + {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, 2, commit + 1}, + {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, 2, commit}, // do not decrease commit + {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, 1, commit}, + + // Do not increase the commit index if the log is not guaranteed to be a + // prefix of the leader's log. + {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, 1, commit}, + // Do not increase the commit index beyond our log size. 
+ {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 10}, 2, commit + 1}, } for i, tt := range tests { storage := newTestMemoryStorage(withPeers(1, 2)) - storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}}) + storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}, {Index: 3, Term: tt.lastTerm}}) sm := newTestRaft(1, 5, 1, storage) sm.becomeFollower(2, 2) - sm.raftLog.commitTo(commit) + sm.raftLog.commitTo(sm.raftLog.leaderTerm, commit) sm.handleHeartbeat(tt.m) if sm.raftLog.committed != tt.wCommit { t.Errorf("#%d: committed = %d, want %d", i, sm.raftLog.committed, tt.wCommit) @@ -1366,7 +1376,7 @@ func TestHandleHeartbeatResp(t *testing.T) { sm := newTestRaft(1, 5, 1, storage) sm.becomeCandidate() sm.becomeLeader() - sm.raftLog.commitTo(sm.raftLog.lastIndex()) + sm.raftLog.commitTo(sm.raftLog.leaderTerm, sm.raftLog.lastIndex()) // A heartbeat response from a node that is behind; re-send MsgApp sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp}) @@ -1411,7 +1421,7 @@ func TestRaftFreesReadOnlyMem(t *testing.T) { sm := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2))) sm.becomeCandidate() sm.becomeLeader() - sm.raftLog.commitTo(sm.raftLog.lastIndex()) + sm.raftLog.commitTo(sm.Term, sm.raftLog.lastIndex()) ctx := []byte("ctx") @@ -2775,7 +2785,7 @@ func TestLeaderIncreaseNext(t *testing.T) { for i, tt := range tests { sm := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2))) - sm.raftLog.append(previousEnts...) + sm.raftLog.append(1, previousEnts...) 
sm.becomeCandidate() sm.becomeLeader() sm.trk.Progress[2].State = tt.state @@ -2924,7 +2934,7 @@ func TestRestore(t *testing.T) { storage := newTestMemoryStorage(withPeers(1, 2)) sm := newTestRaft(1, 10, 1, storage) - if ok := sm.restore(s); !ok { + if ok := sm.restore(sm.Term, s); !ok { t.Fatal("restore fail, want succeed") } @@ -2939,7 +2949,7 @@ func TestRestore(t *testing.T) { t.Errorf("sm.Voters = %+v, want %+v", sg, s.Metadata.ConfState.Voters) } - if ok := sm.restore(s); ok { + if ok := sm.restore(sm.Term, s); ok { t.Fatal("restore succeed, want fail") } // It should not campaign before actually applying data. @@ -2963,7 +2973,7 @@ func TestRestoreWithLearner(t *testing.T) { storage := newTestMemoryStorage(withPeers(1, 2), withLearners(3)) sm := newTestLearnerRaft(3, 8, 2, storage) - if ok := sm.restore(s); !ok { + if ok := sm.restore(sm.Term, s); !ok { t.Error("restore fail, want succeed") } @@ -2992,7 +3002,7 @@ func TestRestoreWithLearner(t *testing.T) { } } - if ok := sm.restore(s); ok { + if ok := sm.restore(sm.Term, s); ok { t.Error("restore succeed, want fail") } } @@ -3009,7 +3019,7 @@ func TestRestoreWithVotersOutgoing(t *testing.T) { storage := newTestMemoryStorage(withPeers(1, 2)) sm := newTestRaft(1, 10, 1, storage) - if ok := sm.restore(s); !ok { + if ok := sm.restore(sm.Term, s); !ok { t.Fatal("restore fail, want succeed") } @@ -3024,7 +3034,7 @@ func TestRestoreWithVotersOutgoing(t *testing.T) { t.Errorf("sm.Voters = %+v, want %+v", sg, s.Metadata.ConfState.Voters) } - if ok := sm.restore(s); ok { + if ok := sm.restore(sm.Term, s); ok { t.Fatal("restore succeed, want fail") } // It should not campaign before actually applying data. 
@@ -3059,7 +3069,7 @@ func TestRestoreVoterToLearner(t *testing.T) { if sm.isLearner { t.Errorf("%x is learner, want not", sm.id) } - if ok := sm.restore(s); !ok { + if ok := sm.restore(sm.Term, s); !ok { t.Error("restore failed unexpectedly") } } @@ -3082,7 +3092,7 @@ func TestRestoreLearnerPromotion(t *testing.T) { t.Errorf("%x is not learner, want yes", sm.id) } - if ok := sm.restore(s); !ok { + if ok := sm.restore(sm.Term, s); !ok { t.Error("restore fail, want succeed") } @@ -3106,7 +3116,7 @@ func TestLearnerReceiveSnapshot(t *testing.T) { n1 := newTestLearnerRaft(1, 10, 1, store) n2 := newTestLearnerRaft(2, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2))) - n1.restore(s) + n1.restore(n1.Term, s) snap := n1.raftLog.nextUnstableSnapshot() store.ApplySnapshot(*snap) n1.appliedSnap(snap) @@ -3130,8 +3140,8 @@ func TestRestoreIgnoreSnapshot(t *testing.T) { commit := uint64(1) storage := newTestMemoryStorage(withPeers(1, 2)) sm := newTestRaft(1, 10, 1, storage) - sm.raftLog.append(previousEnts...) - sm.raftLog.commitTo(commit) + sm.raftLog.append(1, previousEnts...) 
+ sm.raftLog.commitTo(1, commit) s := pb.Snapshot{ Metadata: pb.SnapshotMetadata{ @@ -3142,7 +3152,7 @@ func TestRestoreIgnoreSnapshot(t *testing.T) { } // ignore snapshot - if ok := sm.restore(s); ok { + if ok := sm.restore(1, s); ok { t.Errorf("restore = %t, want %t", ok, false) } if sm.raftLog.committed != commit { @@ -3151,7 +3161,7 @@ func TestRestoreIgnoreSnapshot(t *testing.T) { // ignore snapshot and fast forward commit s.Metadata.Index = commit + 1 - if ok := sm.restore(s); ok { + if ok := sm.restore(1, s); ok { t.Errorf("restore = %t, want %t", ok, false) } if sm.raftLog.committed != commit+1 { @@ -3170,7 +3180,7 @@ func TestProvideSnap(t *testing.T) { } storage := newTestMemoryStorage(withPeers(1)) sm := newTestRaft(1, 10, 1, storage) - sm.restore(s) + sm.restore(sm.Term, s) sm.becomeCandidate() sm.becomeLeader() @@ -3200,7 +3210,7 @@ func TestIgnoreProvidingSnap(t *testing.T) { } storage := newTestMemoryStorage(withPeers(1)) sm := newTestRaft(1, 10, 1, storage) - sm.restore(s) + sm.restore(sm.Term, s) sm.becomeCandidate() sm.becomeLeader()