diff --git a/log.go b/log.go index 84826c3a..3b93242c 100644 --- a/log.go +++ b/log.go @@ -29,6 +29,35 @@ type raftLog struct { // they will be saved into storage. unstable unstable + // leaderTerm is a term of the leader with whom our log is "consistent". The + // log is guaranteed to be a prefix of this term's leader log. + // + // The leaderTerm can be safely updated to `t` if: + // 1. the last entry in the log has term `t`, or, more generally, + // 2. the last successful append was sent by the leader `t`. + // + // This is due to the following safety property (see raft paper §5.3): + // + // Log Matching: if two logs contain an entry with the same index and term, + // then the logs are identical in all entries up through the given index. + // + // We use (1) to initialize leaderTerm, and (2) to maintain it on updates. + // + // NB: (2) does not imply (1). If our log is behind the leader's log, the last + // entry term can be below leaderTerm. + // + // NB: leaderTerm does not necessarily match this raft node's term. It only + // does for the leader. For followers and candidates, when we first learn or + // bump to a new term, we don't have a proof that our log is consistent with + // the new term's leader (current or prospective). The new leader may override + // any suffix of the log after the committed index. Only when the first append + // from the new leader succeeds, we can update leaderTerm. + // + // During normal operation, leaderTerm matches the node term though. During a + // leader change, it briefly lags behind, and matches again when the first + // append message succeeds. + leaderTerm uint64 + // committed is the highest log position that is known to be in // stable storage on a quorum of nodes. 
committed uint64 @@ -88,6 +117,11 @@ func newLogWithSize(storage Storage, logger Logger, maxApplyingEntsSize entryEnc if err != nil { panic(err) // TODO(bdarnell) } + lastTerm, err := storage.Term(lastIndex) + if err != nil { + panic(err) // TODO(pav-kv) + } + log.leaderTerm = lastTerm log.unstable.offset = lastIndex + 1 log.unstable.offsetInProgress = lastIndex + 1 log.unstable.logger = logger diff --git a/raft.go b/raft.go index 16146740..e7245ee9 100644 --- a/raft.go +++ b/raft.go @@ -769,7 +769,6 @@ func (r *raft) reset(term uint64) { r.Vote = None } r.lead = None - r.logSynced = false r.electionElapsed = 0 r.heartbeatElapsed = 0 @@ -873,10 +872,6 @@ func (r *raft) becomeFollower(term uint64, lead uint64) { r.reset(term) r.tick = r.tickElection r.lead = lead - // If the last entry term matches the leader term, the log is guaranteed to be - // a prefix of the leader's log. Otherwise, we will establish this guarantee - // later, on the first successful MsgApp. - r.logSynced = r.raftLog.lastTerm() == term r.state = StateFollower r.logger.Infof("%x became follower at term %d", r.id, r.Term) } @@ -919,7 +914,6 @@ func (r *raft) becomeLeader() { r.reset(r.Term) r.tick = r.tickHeartbeat r.lead = r.id - r.logSynced = true // the leader's log is in sync with itself r.state = StateLeader // Followers enter replicate mode when they've been successfully probed // (perhaps after having received a snapshot as a result). The leader is @@ -947,6 +941,8 @@ func (r *raft) becomeLeader() { // so the preceding log append does not count against the uncommitted log // quota of the new leader. In other words, after the call to appendEntry, // r.uncommittedSize is still 0. 
+ + r.raftLog.leaderTerm = r.Term // the leader's log is consistent with itself r.logger.Infof("%x became leader at term %d", r.id, r.Term) } @@ -1747,7 +1743,7 @@ func (r *raft) handleAppendEntries(m pb.Message) { return } if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok { - r.logSynced = true // from now on, the log is a prefix of the leader's log + r.raftLog.leaderTerm = m.Term // the log is now consistent with the leader r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) return } @@ -1787,10 +1783,10 @@ func (r *raft) handleHeartbeat(m pb.Message) { // leader's log. Otherwise, entries at this index may mismatch. // // TODO(pav-kv): move this logic to r.raftLog, which is more appropriate for - // handling safety. The raftLog can use the logSynced flag for other safety - // checks. For example, unstable.truncateAndAppend currently may override a - // suffix of the log unconditionally, but it can only be done if !logSynced. - if r.logSynced { + // handling safety. The raftLog can use leaderTerm for other safety checks. + // For example, unstable.truncateAndAppend currently may override a suffix of + // the log unconditionally, but it can only be done if m.Term > leaderTerm. + if m.Term == r.raftLog.leaderTerm { r.raftLog.commitTo(min(m.Commit, r.raftLog.lastIndex())) } r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context}) @@ -1807,6 +1803,7 @@ func (r *raft) handleSnapshot(m pb.Message) { if r.restore(s) { r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]", r.id, r.raftLog.committed, sindex, sterm) + r.raftLog.leaderTerm = m.Term // the log is now consistent with the leader r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()}) } else { r.logger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]",