raft: advance commit index safely #139
base: main
```diff
@@ -29,6 +29,35 @@ type raftLog struct {
 	// they will be saved into storage.
 	unstable unstable
 
+	// leaderTerm is a term of the leader with whom our log is "consistent". The
+	// log is guaranteed to be a prefix of this term's leader log.
+	//
+	// The leaderTerm can be safely updated to `t` if:
+	//  1. the last entry in the log has term `t`, or, more generally,
+	//  2. the last successful append was sent by the leader `t`.
+	//
+	// This is due to the following safety property (see raft paper §5.3):
+	//
+	//	Log Matching: if two logs contain an entry with the same index and term,
+	//	then the logs are identical in all entries up through the given index.
+	//
+	// We use (1) to initialize leaderTerm, and (2) to maintain it on updates.
+	//
+	// NB: (2) does not imply (1). If our log is behind the leader's log, the last
+	// entry term can be below leaderTerm.
+	//
+	// NB: leaderTerm does not necessarily match this raft node's term. It only
+	// does for the leader. For followers and candidates, when we first learn or
+	// bump to a new term, we don't have a proof that our log is consistent with
+	// the new term's leader (current or prospective). The new leader may override
+	// any suffix of the log after the committed index. Only when the first append
```
> **Review comment:** Are you referring to the replicated log here, or this replica's local log? In other words, should these "the"s be replaced by "our"?
>
> **Reply:** Both, but mostly the "replicated". The leader can override only a suffix of the "replicated" log after the "replicated" commit index. "Our" log is lagging the leader's log, as is our committed index. For "our" log, this implies that the leader can override a suffix after our committed index (but we have no way of checking by how much further back it can go).
```diff
+	// from the new leader succeeds, we can update leaderTerm.
+	//
+	// During normal operation, leaderTerm matches the node term though. During a
+	// leader change, it briefly lags behind, and matches again when the first
+	// append message succeeds.
+	leaderTerm uint64
```
> **Review comment:** My immediate feeling is that it's a little weird to add a […]
>
> **Reply:** See comment https://github.com/etcd-io/raft/pull/139/files#r1469818174, specifically the Paxos analogy. Semantically, […]
>
> **Reply:** There is an invariant at […]
```diff
+
 	// committed is the highest log position that is known to be in
 	// stable storage on a quorum of nodes.
 	committed uint64
```
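The two update rules in the comment above can be sketched as standalone helpers. This is an illustrative model only, not code from the PR; the `entry` type and the `initLeaderTerm`/`onAppendSuccess` names are hypothetical:

```go
package main

import "fmt"

// entry is a minimal (index, term) log entry.
type entry struct{ index, term uint64 }

// initLeaderTerm illustrates rule (1): on startup, the last entry's term is
// a safe initial value for leaderTerm, because by Log Matching our log is a
// prefix of that term's leader log.
func initLeaderTerm(log []entry) uint64 {
	if len(log) == 0 {
		return 0
	}
	return log[len(log)-1].term
}

// onAppendSuccess illustrates rule (2): after a successful append sent by
// the leader of senderTerm, leaderTerm can advance to senderTerm even if the
// appended entries themselves carry older terms.
func onAppendSuccess(leaderTerm, senderTerm uint64) uint64 {
	if senderTerm > leaderTerm {
		return senderTerm
	}
	return leaderTerm
}

func main() {
	log := []entry{{1, 1}, {2, 1}, {3, 2}}
	lt := initLeaderTerm(log) // rule (1): last entry has term 2
	lt = onAppendSuccess(lt, 4)
	fmt.Println(lt) // 4
}
```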
```diff
@@ -88,6 +117,11 @@ func newLogWithSize(storage Storage, logger Logger, maxApplyingEntsSize entryEnc
 	if err != nil {
 		panic(err) // TODO(bdarnell)
 	}
+	lastTerm, err := storage.Term(lastIndex)
+	if err != nil {
+		panic(err) // TODO(pav-kv)
+	}
+	log.leaderTerm = lastTerm
 	log.unstable.offset = lastIndex + 1
 	log.unstable.offsetInProgress = lastIndex + 1
 	log.unstable.logger = logger
```
```diff
@@ -106,6 +140,15 @@ func (l *raftLog) String() string {
 
 // maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
 // it returns (last index of new entries, true).
+//
+// TODO(pav-kv): pass in the term of the leader who sent this update. It is only
+// safe to handle this append if this term is >= l.leaderTerm. It is only safe
+// to override an uncommitted suffix of entries if term > l.leaderTerm.
+//
+// TODO(pav-kv): introduce a struct that consolidates the append metadata. The
+// (prevEntryIndex, prevEntryTerm, leaderTerm) tuple must always be carried
```
```diff
+// together, and safety of this append must be checked at the lowest layer here,
+// rather than up in raft.go.
 func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
 	if !l.matchTerm(index, logTerm) {
 		return 0, false
```
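The second TODO above proposes bundling `(prevEntryIndex, prevEntryTerm, leaderTerm)` so the safety checks live in one place. A minimal sketch of what such a struct and its predicates could look like; the `appendMeta` type and function names are mine, not from the PR:

```go
package main

import "fmt"

// appendMeta is a hypothetical bundle of the metadata that, per the TODO,
// must always travel together with an append.
type appendMeta struct {
	prevIndex  uint64 // index of the entry preceding the appended slice
	prevTerm   uint64 // term of that entry
	leaderTerm uint64 // term of the leader that sent the append
}

// safeToHandle reports whether an append may be processed at all: accepting
// one from an older leader could roll the log back behind a state already
// known to be consistent with a newer leader.
func safeToHandle(meta appendMeta, logLeaderTerm uint64) bool {
	return meta.leaderTerm >= logLeaderTerm
}

// safeToOverride reports whether an uncommitted suffix may be replaced.
// Overriding requires a strictly newer leader term: a leader never rewrites
// its own entries, so same-term appends cannot conflict.
func safeToOverride(meta appendMeta, logLeaderTerm uint64) bool {
	return meta.leaderTerm > logLeaderTerm
}

func main() {
	m := appendMeta{prevIndex: 10, prevTerm: 4, leaderTerm: 5}
	fmt.Println(safeToHandle(m, 5), safeToOverride(m, 5)) // true false
}
```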
```diff
@@ -935,6 +935,8 @@ func (r *raft) becomeLeader() {
 	// so the preceding log append does not count against the uncommitted log
 	// quota of the new leader. In other words, after the call to appendEntry,
 	// r.uncommittedSize is still 0.
+
+	r.raftLog.leaderTerm = r.Term // the leader's log is consistent with itself
```
> **Review comment:** nit: is it common to manipulate fields in […]
>
> **Reply:** Unfortunately, the direct manipulation on […] In this particular case/PR though, we can move the […]
>
> **Reply:** Done this (see second commit). Unfortunately, too many things have incorrect interfaces: the […] Maybe it would be good to reverse the order of commits here: first clean up the […]
```diff
 	r.logger.Infof("%x became leader at term %d", r.id, r.Term)
 }
```
```diff
@@ -1735,6 +1737,7 @@ func (r *raft) handleAppendEntries(m pb.Message) {
 		return
 	}
 	if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
+		r.raftLog.leaderTerm = m.Term // the log is now consistent with the leader
```
> **Review comment:** @nvanbenschoten This should move to […]
>
> **Reply:** Also done in the second commit.
```diff
 		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
 		return
 	}
```
```diff
@@ -1770,7 +1773,16 @@
 }
 
 func (r *raft) handleHeartbeat(m pb.Message) {
-	r.raftLog.commitTo(m.Commit)
+	// It is only safe to advance the commit index if our log is a prefix of the
+	// leader's log. Otherwise, entries at this index may mismatch.
+	//
+	// TODO(pav-kv): move this logic to r.raftLog, which is more appropriate for
+	// handling safety. The raftLog can use leaderTerm for other safety checks.
+	// For example, unstable.truncateAndAppend currently may override a suffix of
+	// the log unconditionally, but it can only be done if m.Term > leaderTerm.
```
> **Review comment:** I'm +1 on these additional safety checks / assertions, but they pose a potential availability risk if we get them wrong and start rejecting valid state transitions. Should we make a habit of adding logging in the cases where we drop messages, so that any bugs are observable? For example, logging on an […]
>
> **Reply:** Did you ever see a real issue, or can you create a test to reproduce the issue (mismatch)?
>
> **Reply:** @ahrtr Currently, this will not occur because the leader cuts the commit index at the follower's […] I can confirm, however, that in a test environment in CRDB I've seen out-of-bound commit indices in this line, triggering a panic in […] This PR strengthens this code so that it handles out-of-bound indices correctly, and proceeds only if it's safe.
>
> Re @nvanbenschoten's comment about logging: I don't know if we should log dropped messages. I think we should only report safety property violations - that means there is a bug in […] I don't expect we would be dropping any messages here currently (except in the test environment case described above).
>
> **Reply:** On second thought, we might opt to distinguish legitimate message drops from safety violations. Safety violations should crash the offending nodes, or have other means of appearing on radars (e.g. a raft group can be "bricked", and this can surface in monitoring). See #18.
```diff
+	if m.Term == r.raftLog.leaderTerm {
+		r.raftLog.commitTo(min(m.Commit, r.raftLog.lastIndex()))
+	}
 	r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
 }
```
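The guarded commit this hunk adds, together with the TODO's suggestion to move it into `raftLog`, could be modeled roughly as below. This is a standalone sketch with hypothetical names (`miniLog`, `commitFromHeartbeat`), not the PR's actual refactor:

```go
package main

import "fmt"

// miniLog models just the raftLog fields that the heartbeat path touches.
type miniLog struct {
	leaderTerm uint64 // term of the leader our log is known to be consistent with
	committed  uint64
	lastIndex  uint64
}

// commitFromHeartbeat advances the commit index only if the heartbeat came
// from the leader whose log ours is a prefix of, and never past our last
// index or backwards. It returns the resulting commit index.
func (l *miniLog) commitFromHeartbeat(msgTerm, msgCommit uint64) uint64 {
	if msgTerm == l.leaderTerm {
		if c := min(msgCommit, l.lastIndex); c > l.committed {
			l.committed = c
		}
	}
	return l.committed
}

func main() {
	l := &miniLog{leaderTerm: 2, committed: 2, lastIndex: 3}
	fmt.Println(l.commitFromHeartbeat(2, 10)) // capped at lastIndex: 3
	fmt.Println(l.commitFromHeartbeat(3, 10)) // term mismatch: stays 3
}
```

Note the out-of-bound commit index (10) is clamped rather than panicking, which is exactly the strengthening the review thread above discusses.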
```diff
@@ -1785,6 +1797,7 @@ func (r *raft) handleSnapshot(m pb.Message) {
 	if r.restore(s) {
 		r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]",
 			r.id, r.raftLog.committed, sindex, sterm)
+		r.raftLog.leaderTerm = m.Term // the log is now consistent with the leader
 		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
 	} else {
 		r.logger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]",
```
```diff
@@ -1332,16 +1332,24 @@ func TestHandleMsgApp(t *testing.T) {
 func TestHandleHeartbeat(t *testing.T) {
 	commit := uint64(2)
 	tests := []struct {
-		m       pb.Message
-		wCommit uint64
+		m        pb.Message
+		lastTerm uint64
+		wCommit  uint64
 	}{
-		{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, commit + 1},
-		{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, commit}, // do not decrease commit
+		{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, 2, commit + 1},
+		{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, 2, commit}, // do not decrease commit
+		{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, 1, commit},
+
+		// Do not increase the commit index if the log is not guaranteed to be a
+		// prefix of the leader's log.
+		{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, 1, commit},
+		// Do not increase the commit index beyond our log size.
+		{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 10}, 2, commit + 1},
 	}
 
 	for i, tt := range tests {
 		storage := newTestMemoryStorage(withPeers(1, 2))
-		storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}})
```
> **Review comment:** This test was incorrect previously. The third entry term is 3, while the leader […] This basically demonstrates that we fixed the unsafety. Now it's impossible for the follower to advance the commit index to 3 in this case.
```diff
+		storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}, {Index: 3, Term: tt.lastTerm}})
 		sm := newTestRaft(1, 5, 1, storage)
 		sm.becomeFollower(2, 2)
 		sm.raftLog.commitTo(commit)
```
> **Review comment:** Are there cases where (2) is insufficient and (1) is needed? I'm curious why we can't make this more specific. You say "We use (1) to initialize leaderTerm" below. Is this important on startup?
>
> **Reply:** Only (2) is necessary. (1) is a cheap "stateful" version of (2).
>
> On a server restart, we forget who did the last append. (1) gives the best guess, and allows recovering to an up-to-date state for idle raft groups that haven't seen recent appends except the current leader's empty entry. For non-idle groups, or followers whose log is significantly behind the leader's and doesn't end with entries at its term, recovering to `raftLog.lastTerm()` gives no value, and is equivalent to setting `leaderTerm = 0`. To recover, these will then have to "wait" for the next append message from this leader.
>
> It would be more ideal if this field was stored in some `HardState.LeaderTerm` - then we would always recover to an up-to-date state. Note that we can't reuse `HardState.Term`, because `HardState.Term` can be > `leaderTerm` (for the same reason why `r.Term` can be > `leaderTerm`).
>
> To draw an analogy with Paxos: the local `raftLog` is an acceptor, and `raftLog.leaderTerm` is the id of the highest accepted proposal. The election term is somewhat orthogonal to this - the election term (`r.Term` / `HardState.Term`) can briefly jump ahead of the accepted-proposal term until there is an accepted proposal at this new term.
>
> If we ever want to bring this to the next level (caveat: #139 (comment)): `MsgApp` messages should not be rejected based on `r.Term` / `HardState.Term`. For correctness, it is only necessary to reject a `MsgApp` if the message term is < `raftLog.leaderTerm` / `HardState.LeaderTerm`.
. I think this would reduce some unnecessary rejects during the leader election flux time.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I should note the invariant here, and maybe check it in a few places or tests:
(1) initializes
leaderTerm
tolastTerm()
, which is safe becauseraft.Term >= raftLog.lastTerm()
(2) maintains it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is what I was hoping to clarify. Initializing to
lastTerm
is an opportunistic way to allow a restarted follower in idle raft groups to immediately advance its commit index on startup without needing to first accept a MsgApp.Is it anything more than that? For an idle raft group, a follower with an up-to-date log but a lagging commit index may restart and never receive any new entries. If we didn't have (1) and we started discarding commit indexes in heartbeats with terms > raftLog.leaderTerm, would the commit index on the follower get stuck?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good point. I think we indeed need (1). Also, (1) plays nicely with the invariant that I put above.
> **Review comment:** Does this mean a MsgApp from an old leader will be accepted by voters that voted for a new leader but have not yet received any log from the new leader? In that case, there may be a committed entry in the old leader but not in the new leader.
>
> **Reply:** @joshuazh-x That's a good point. We don't want a quorum of nodes to accept entries unless we are sure these entries are consistent with the new leader's log.
>
> So it's safest to accept only `MsgApp.Term >= raft.Term`. But we could sometimes accept `MsgApp.Term < raft.Term` if the `MsgApp` contains the `(index, term)` entry for which we voted when electing the `raft.Term` leader. If that election wins, we know the new leader will append right after this entry. We can […] `MsgApp.Entries` at the aforementioned `index` we voted for, and append it.
>
> A vote is a promise to the leader not to accept any entries that are not in the leader's log. If we can deduce that an entry is in the leader's log (before / other than by getting a `MsgApp` directly from this leader), we can always safely accept it.
>
> It's unclear if such an optimization would give any value (like reducing replication latency in some cases; it probably does avoid a duplicate `MsgApp` from the new leader when the election races with the old leader's appends), so I will leave it as an exercise for later :) Looks like a complication.
>
> Filed #150 with a more general technique that will bring more benefits.