From bd5b4218f8d1dc2a190f535da34b0f2b2ea6954f Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Tue, 30 Jan 2024 13:06:39 +0000 Subject: [PATCH] log: use logSlice in raftLog.maybeAppend Signed-off-by: Pavel Kalinnikov --- log.go | 18 ++++++++++-------- log_test.go | 12 +++++++++++- raft.go | 19 ++++++++++++++++--- 3 files changed, 37 insertions(+), 12 deletions(-) diff --git a/log.go b/log.go index e8cbddd9..9097491c 100644 --- a/log.go +++ b/log.go @@ -106,23 +106,25 @@ func (l *raftLog) String() string { // maybeAppend returns (0, false) if the entries cannot be appended. Otherwise, // it returns (last index of new entries, true). -func (l *raftLog) maybeAppend(prev entryID, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) { - if !l.matchTerm(prev) { +func (l *raftLog) maybeAppend(a logSlice, committed uint64) (lastnewi uint64, ok bool) { + if !l.matchTerm(a.prev) { return 0, false } + // TODO(pav-kv): propagate logSlice down the stack. It will be used all the + // way down in unstable, for safety checks, and for useful bookkeeping. - lastnewi = prev.index + uint64(len(ents)) - ci := l.findConflict(ents) + lastnewi = a.prev.index + uint64(len(a.entries)) + ci := l.findConflict(a.entries) switch { case ci == 0: case ci <= l.committed: l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed) default: - offset := prev.index + 1 - if ci-offset > uint64(len(ents)) { - l.logger.Panicf("index, %d, is out of range [%d]", ci-offset, len(ents)) + offset := a.prev.index + 1 + if ci-offset > uint64(len(a.entries)) { + l.logger.Panicf("index, %d, is out of range [%d]", ci-offset, len(a.entries)) } - l.append(ents[ci-offset:]...) + l.append(a.entries[ci-offset:]...) } l.commitTo(min(committed, lastnewi)) return lastnewi, true diff --git a/log_test.go b/log_test.go index 0f6a7d71..c53f4bf3 100644 --- a/log_test.go +++ b/log_test.go @@ -296,6 +296,16 @@ func TestLogMaybeAppend(t *testing.T) { } for i, tt := range tests { + // TODO(pav-kv): for now, we pick a high enough app.term so that it + // represents a valid append message. The maybeAppend currently ignores it, + // but it must check that the append does not regress the term. + app := logSlice{ + term: 100, + prev: tt.prev, + entries: tt.ents, + } + require.NoError(t, app.valid()) + raftLog := newLog(NewMemoryStorage(), raftLogger) raftLog.append(previousEnts...) raftLog.committed = commit @@ -306,7 +316,7 @@ func TestLogMaybeAppend(t *testing.T) { require.True(t, tt.wpanic) } }() - glasti, gappend := raftLog.maybeAppend(tt.prev, tt.committed, tt.ents...) + glasti, gappend := raftLog.maybeAppend(app, tt.committed) require.Equal(t, tt.wlasti, glasti) require.Equal(t, tt.wappend, gappend) require.Equal(t, tt.wcommit, raftLog.committed) diff --git a/raft.go b/raft.go index e2db03b6..3357ae45 100644 --- a/raft.go +++ b/raft.go @@ -1744,13 +1744,26 @@ func stepFollower(r *raft, m pb.Message) error { return nil } +// logSliceFromMsgApp extracts the appended logSlice from a MsgApp message. +func logSliceFromMsgApp(m *pb.Message) logSlice { + // TODO(pav-kv): consider also validating the logSlice here. + return logSlice{ + term: m.Term, + prev: entryID{term: m.LogTerm, index: m.Index}, + entries: m.Entries, + } +} + func (r *raft) handleAppendEntries(m pb.Message) { - prev := entryID{term: m.LogTerm, index: m.Index} - if prev.index < r.raftLog.committed { + // TODO(pav-kv): construct logSlice up the stack next to receiving the + // message, and validate it before taking any action (e.g. bumping term). + a := logSliceFromMsgApp(&m) + + if a.prev.index < r.raftLog.committed { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed}) return } - if mlastIndex, ok := r.raftLog.maybeAppend(prev, m.Commit, m.Entries...); ok { + if mlastIndex, ok := r.raftLog.maybeAppend(a, m.Commit); ok { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) return }