Skip to content

Commit

Permalink
trace validation for etcd raft
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuazh-x committed Nov 15, 2023
1 parent 9100e77 commit f55da23
Show file tree
Hide file tree
Showing 6 changed files with 397 additions and 120 deletions.
2 changes: 2 additions & 0 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,8 @@ func (n *node) run() {
rd = Ready{}
}
readyc = nil

traceNodeEvent(RsmReady, r)
case <-advancec:
n.rn.Advance(rd)
rd = Ready{}
Expand Down
19 changes: 15 additions & 4 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,9 @@ type Config struct {
// This behavior will become unconditional in the future. See:
// https://github.com/etcd-io/raft/issues/83
StepDownOnRemoval bool

// raft state tracer
StateTracer RaftStateMachineTracer
}

func (c *Config) validate() error {
Expand Down Expand Up @@ -427,6 +430,9 @@ type raft struct {
// current term. Those will be handled as fast as first log is committed in
// current term.
pendingReadIndexMessages []pb.Message

stateTracer RaftStateMachineTracer
initStateTraced bool
}

func newRaft(c *Config) *raft {
Expand Down Expand Up @@ -456,6 +462,7 @@ func newRaft(c *Config) *raft {
disableProposalForwarding: c.DisableProposalForwarding,
disableConfChangeValidation: c.DisableConfChangeValidation,
stepDownOnRemoval: c.StepDownOnRemoval,
stateTracer: c.StateTracer,
}

cfg, prs, err := confchange.Restore(confchange.Changer{
Expand Down Expand Up @@ -756,7 +763,7 @@ func (r *raft) appliedSnap(snap *pb.Snapshot) {
// the commit index changed (in which case the caller should call
// r.bcastAppend).
func (r *raft) maybeCommit() bool {
traceNodeEvent(RsmCommit, r)
defer traceNodeEvent(RsmCommit, r)

mci := r.prs.Committed()
return r.raftLog.maybeCommit(mci, r.Term)
Expand Down Expand Up @@ -798,6 +805,10 @@ func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
for i := range es {
es[i].Term = r.Term
es[i].Index = li + 1 + uint64(i)
if es[i].Type == pb.EntryNormal {
traceNodeEvent(RsmReplicate, r)
}

}
// Track the size of this uncommitted proposal.
if !r.increaseUncommittedSize(es) {
Expand Down Expand Up @@ -938,12 +949,12 @@ func (r *raft) becomeLeader() {
r.pendingConfIndex = r.raftLog.lastIndex()

traceNodeEvent(RsmBecomeLeader, r)

emptyEnt := pb.Entry{Data: nil}
if !r.appendEntry(emptyEnt) {
// This won't happen because we just called reset() above.
r.logger.Panic("empty entry was dropped")
}

// The payloadSize of an empty entry is 0 (see TestPayloadSizeOfEmptyEntry),
// so the preceding log append does not count against the uncommitted log
// quota of the new leader. In other words, after the call to appendEntry,
Expand Down Expand Up @@ -1062,6 +1073,7 @@ func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected
}

func (r *raft) Step(m pb.Message) error {
traceInitStateOnce(r)
traceReceiveMessage(r, &m)

// Handle the message term, which may result in our stepping down to a follower.
Expand Down Expand Up @@ -1309,8 +1321,6 @@ func stepLeader(r *raft, m pb.Message) error {
} else {
r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
}
} else {
traceNodeEvent(RsmReplicate, r)
}
}

Expand Down Expand Up @@ -1481,6 +1491,7 @@ func stepLeader(r *raft, m pb.Message) error {
if pr.State == tracker.StateReplicate {
pr.BecomeProbe()
}
traceReduceNextIndex(r, m.From)
r.sendAppend(m.From)
}
} else {
Expand Down
118 changes: 95 additions & 23 deletions state_trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
RsmReplicate
RsmChangeConf
RsmApplyConfChange
RsmReady
RsmSendAppendEntriesRequest
RsmReceiveAppendEntriesRequest
RsmSendAppendEntriesResponse
Expand All @@ -26,6 +27,9 @@ const (
RsmReceiveRequestVoteRequest
RsmSendRequestVoteResponse
RsmReceiveRequestVoteResponse
RsmSendSnapshot
RsmReceiveSnapshot
RsmReduceNextIndex
)

func (e RaftStateMachineEventType) String() string {
Expand All @@ -38,6 +42,7 @@ func (e RaftStateMachineEventType) String() string {
"Replicate",
"ChangeConf",
"ApplyConfChange",
"Ready",
"SendAppendEntriesRequest",
"ReceiveAppendEntriesRequest",
"SendAppendEntriesResponse",
Expand All @@ -46,22 +51,34 @@ func (e RaftStateMachineEventType) String() string {
"ReceiveRequestVoteRequest",
"SendRequestVoteResponse",
"ReceiveRequestVoteResponse",
"SendSnapshot",
"ReceiveSnapshot",
"ReduceNextIndex",
}[e]
}

const (
ConfChangeAddNewServer string = "AddNewServer"
ConfChangeRemoveServer string = "RemoveServer"
ConfChangeAddLearner string = "AddLearner"
)

type TracingEvent struct {
Name string `json:"name"`
NodeID string `json:"nid"`
State raftpb.HardState `json:"state"`
State TracingState `json:"state"`
Role string `json:"role"`
LogSize uint64 `json:"log"`
Conf [2][]string `json:"conf"`
Message *TracingMessage `json:"msg,omitempty"`
ConfChange *TracingConfChange `json:"cc,omitempty"`
Properties map[string]any `json:"prop,omitempty"`
}

type TracingState struct {
Term uint64 `json:"term"`
Vote string `json:"vote"`
Commit uint64 `json:"commit"`
}

type TracingMessage struct {
Expand All @@ -73,7 +90,7 @@ type TracingMessage struct {
LogTerm uint64 `json:"logTerm"`
Index uint64 `json:"index"`
Commit uint64 `json:"commit"`
Vote uint64 `json:"vote"`
Vote string `json:"vote"`
Reject bool `json:"reject"`
RejectHint uint64 `json:"rejectHint"`
}
Expand All @@ -88,21 +105,38 @@ type TracingConfChange struct {
NewConf []string `json:"newconf,omitempty"`
}

func makeTracingState(r *raft) TracingState {
hs := r.hardState()
return TracingState{
Term: hs.Term,
Vote: strconv.FormatUint(hs.Vote, 10),
Commit: hs.Commit,
}
}

func makeTracingMessage(m *raftpb.Message) *TracingMessage {
if m == nil {
return nil
}

logTerm := m.LogTerm
entries := len(m.Entries)
index := m.Index
if m.Type == raftpb.MsgSnap {
index = 0
logTerm = 0
entries = int(m.Snapshot.Metadata.Index)
}
return &TracingMessage{
Type: m.Type.String(),
Term: m.Term,
From: strconv.FormatUint(m.From, 10),
To: strconv.FormatUint(m.To, 10),
EntryLength: len(m.Entries),
LogTerm: m.LogTerm,
Index: m.Index,
EntryLength: entries,
LogTerm: logTerm,
Index: index,
Commit: m.Commit,
Vote: m.Vote,
Vote: strconv.FormatUint(m.Vote, 10),
Reject: m.Reject,
RejectHint: m.RejectHint,
}
Expand All @@ -112,25 +146,38 @@ type RaftStateMachineTracer interface {
TraceState(*TracingEvent)
}

var stateTracer RaftStateMachineTracer
func traceInitStateOnce(r *raft) {
if r.stateTracer == nil {
return
}

if r.initStateTraced {
return
}

func SetStateTracer(t RaftStateMachineTracer) {
stateTracer = t
r.initStateTraced = true

traceNodeEvent(RsmInitState, r)
}

func traceEvent(evt RaftStateMachineEventType, r *raft, m *raftpb.Message, cc *TracingConfChange) {
if stateTracer == nil {
func traceEvent(evt RaftStateMachineEventType, r *raft, m *raftpb.Message, prop map[string]any) {
if r.stateTracer == nil {
return
}

stateTracer.TraceState(&TracingEvent{
if !r.initStateTraced {
return
}

r.stateTracer.TraceState(&TracingEvent{
Name: evt.String(),
NodeID: strconv.FormatUint(r.id, 10),
State: r.hardState(),
State: makeTracingState(r),
LogSize: r.raftLog.lastIndex(),
Conf: [2][]string{formatConf(r.prs.Voters[0].Slice()), formatConf(r.prs.Voters[1].Slice())},
Role: r.state.String(),
Message: makeTracingMessage(m),
ConfChange: cc,
Properties: prop,
})
}

Expand All @@ -156,14 +203,21 @@ func traceChangeConfEvent(cci raftpb.ConfChangeI, r *raft) {
NodeId: strconv.FormatUint(c.NodeID, 10),
Action: ConfChangeRemoveServer,
})
case raftpb.ConfChangeAddLearnerNode:
cc.Changes = append(cc.Changes, SingleConfChange{
NodeId: strconv.FormatUint(c.NodeID, 10),
Action: ConfChangeAddLearner,
})
}
}

traceEvent(RsmChangeConf, r, nil, cc)
p := map[string]any{}
p["cc"] = cc
traceEvent(RsmChangeConf, r, nil, p)
}

func traceConfChangeEvent(cfg tracker.Config, r *raft) {
if stateTracer == nil {
if r.stateTracer == nil {
return
}

Expand All @@ -172,19 +226,26 @@ func traceConfChangeEvent(cfg tracker.Config, r *raft) {
NewConf: formatConf(cfg.Voters[0].Slice()),
}

traceEvent(RsmApplyConfChange, r, nil, cc)
p := map[string]any{}
p["cc"] = cc
traceEvent(RsmApplyConfChange, r, nil, p)
}

func traceSendMessage(r *raft, m *raftpb.Message) {
if stateTracer == nil {
if r.stateTracer == nil {
return
}

p := map[string]any{}

var evt RaftStateMachineEventType
switch m.Type {
case raftpb.MsgApp:
evt = RsmSendAppendEntriesRequest
case raftpb.MsgAppResp:
p["advanceNextIndex"] = r.prs.Progress[m.To].State == tracker.StateReplicate
case raftpb.MsgHeartbeat, raftpb.MsgSnap:
evt = RsmSendAppendEntriesRequest
case raftpb.MsgAppResp, raftpb.MsgHeartbeatResp:
evt = RsmSendAppendEntriesResponse
case raftpb.MsgVote:
evt = RsmSendRequestVoteRequest
Expand All @@ -194,19 +255,19 @@ func traceSendMessage(r *raft, m *raftpb.Message) {
return
}

traceEvent(evt, r, m, nil)
traceEvent(evt, r, m, p)
}

func traceReceiveMessage(r *raft, m *raftpb.Message) {
if stateTracer == nil {
if r.stateTracer == nil {
return
}

var evt RaftStateMachineEventType
switch m.Type {
case raftpb.MsgApp:
case raftpb.MsgApp, raftpb.MsgHeartbeat, raftpb.MsgSnap:
evt = RsmReceiveAppendEntriesRequest
case raftpb.MsgAppResp:
case raftpb.MsgAppResp, raftpb.MsgHeartbeatResp:
evt = RsmReceiveAppendEntriesResponse
case raftpb.MsgVote:
evt = RsmReceiveRequestVoteRequest
Expand All @@ -219,6 +280,17 @@ func traceReceiveMessage(r *raft, m *raftpb.Message) {
traceEvent(evt, r, m, nil)
}

func traceReduceNextIndex(r *raft, peer uint64) {
if r.stateTracer == nil {
return
}

p := map[string]any{}
p["peer"] = strconv.FormatUint(peer, 10)
p["nextIndex"] = r.prs.Progress[peer].Next
traceEvent(RsmReduceNextIndex, r, nil, p)
}

func formatConf(s []uint64) []string {
if s == nil {
return []string{}
Expand Down
16 changes: 7 additions & 9 deletions tla/Traceetcdraft.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ VIEW
PROPERTIES
TraceMatched

PROPERTIES
etcdSpec
\* PROPERTIES
\* etcdSpec

\* Checking for deadlocks during trace validation is disabled, as it may lead to false
\* counterexamples. A trace specification defines a set of traces, where at least one
Expand All @@ -32,20 +32,18 @@ CONSTANTS
InitServer <- TraceInitServer
Server <- TraceServer

InitConfVars <- TraceInitConfVars
InitLogVars <- TraceInitLogVars
InitDurableStates <- TraceInitDurableStates

EntriesToAppend <- TraceEntriesToAppend
NextIndexForUnsuccessfulAppendResponse <- TraceNextIndexForUnsuccessfulAppendResponse

Nil = 0
Nil = "0"

ValueEntry = "ValueEntry"
ConfigEntry = "ConfigEntry"

Follower = "Follower"
Candidate = "Candidate"
Leader = "Leader"
Follower = "StateFollower"
Candidate = "StateCandidate"
Leader = "StateLeader"
RequestVoteRequest = "RequestVoteRequest"
RequestVoteResponse = "RequestVoteResponse"
AppendEntriesRequest = "AppendEntriesRequest"
Expand Down
Loading

0 comments on commit f55da23

Please sign in to comment.