From a58e2a90f4df0839f5fe44b049422e201c3e4884 Mon Sep 17 00:00:00 2001 From: Andrew Baptist Date: Fri, 20 Sep 2024 10:21:59 -0400 Subject: [PATCH] raft: add tracing to important raft points This commit adds tracing to the various places within raft where state transitions happen. When a message is first proposed, it matches the context of the original message to the raft entry and registers it for tracing. Then it adds to the trace at the key points during the transitions and finally stops tracing once a `MsgStorageApplyResp` is received past the entry. Fixes: #104035 Release note: None --- pkg/kv/kvserver/BUILD.bazel | 2 + pkg/kv/kvserver/client_raft_log_queue_test.go | 62 +++++++++++++++++++ pkg/kv/kvserver/replica.go | 5 ++ pkg/kv/kvserver/replica_destroy.go | 1 + pkg/kv/kvserver/replica_init.go | 2 + pkg/kv/kvserver/replica_proposal.go | 4 +- pkg/kv/kvserver/replica_proposal_buf.go | 39 ++++++++---- pkg/kv/kvserver/replica_proposal_buf_test.go | 2 + pkg/kv/kvserver/replica_raft.go | 10 +++ pkg/kv/kvserver/store.go | 5 ++ 10 files changed, 118 insertions(+), 14 deletions(-) diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 18c1ca1edbe..bcf96f61a8d 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -162,6 +162,7 @@ go_library( "//pkg/kv/kvserver/multiqueue", "//pkg/kv/kvserver/raftentry", "//pkg/kv/kvserver/raftlog", + "//pkg/kv/kvserver/rafttrace", "//pkg/kv/kvserver/rangefeed", "//pkg/kv/kvserver/rditer", "//pkg/kv/kvserver/readsummary", @@ -438,6 +439,7 @@ go_test( "//pkg/kv/kvserver/protectedts/ptutil", "//pkg/kv/kvserver/raftentry", "//pkg/kv/kvserver/raftlog", + "//pkg/kv/kvserver/rafttrace", "//pkg/kv/kvserver/raftutil", "//pkg/kv/kvserver/rangefeed", "//pkg/kv/kvserver/rditer", diff --git a/pkg/kv/kvserver/client_raft_log_queue_test.go b/pkg/kv/kvserver/client_raft_log_queue_test.go index 9a466877c68..a988d1a0297 100644 --- a/pkg/kv/kvserver/client_raft_log_queue_test.go +++ b/pkg/kv/kvserver/client_raft_log_queue_test.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" @@ -33,6 +34,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/vfs" "github.com/gogo/protobuf/proto" @@ -132,6 +135,65 @@ func TestRaftLogQueue(t *testing.T) { } } +func TestRaftTracing(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // TODO(baptist): Remove this once we change the default to be enabled. + st := cluster.MakeTestingClusterSettings() + rafttrace.MaxConcurrentRaftTraces.Override(context.Background(), &st.SV, 10) + + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: st, + RaftConfig: base.RaftConfig{ + RangeLeaseDuration: 24 * time.Hour, // disable lease moves + RaftElectionTimeoutTicks: 1 << 30, // disable elections + }, + }, + }) + defer tc.Stopper().Stop(context.Background()) + store := tc.GetFirstStoreFromServer(t, 0) + + // Write a single value to ensure we have a leader on n1. + key := tc.ScratchRange(t) + _, pErr := kv.SendWrapped(context.Background(), store.TestSender(), putArgs(key, []byte("value"))) + require.NoError(t, pErr.GoError()) + require.NoError(t, tc.WaitForSplitAndInitialization(key)) + // Set to have 3 voters. + tc.AddVotersOrFatal(t, key, tc.Targets(1, 2)...) + tc.WaitForVotersOrFatal(t, key, tc.Targets(1, 2)...) + + for i := 0; i < 100; i++ { + var finish func() tracingpb.Recording + ctx := context.Background() + if i == 50 { + // Trace a random request on a "client" tracer. + ctx, finish = tracing.ContextWithRecordingSpan(ctx, store.GetStoreConfig().Tracer(), "test") + } + _, pErr := kv.SendWrapped(ctx, store.TestSender(), putArgs(key, []byte(fmt.Sprintf("value-%d", i)))) + require.NoError(t, pErr.GoError()) + // Note that this is the clients span, there may be additional logs created after the span is returned. + if finish != nil { + output := finish().String() + // NB: It is hard to get all the messages in an expected order. We + // simply ensure some of the key messages are returned. Also note + // that we want to make sure that the logs are not reported against + // the tracing library, but the line that called into it. + expectedMessages := []string{ + `replica_proposal_buf.* flushing proposal to Raft`, + `replica_proposal_buf.* registering local trace`, + `replica_raft.* 1->2 MsgApp`, + `replica_raft.* 1->3 MsgApp`, + `replica_raft.* AppendThread->1 MsgStorageAppendResp`, + `ack-ing replication success to the client`, + } + require.NoError(t, testutils.MatchInOrder(output, expectedMessages...)) + } + } +} + // TestCrashWhileTruncatingSideloadedEntries emulates a process crash in the // middle of applying a raft log truncation command that removes some entries // from the sideloaded storage. The test expects that storage remains in a diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 1eb4a3369df..0b2fbc22408 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/load" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/split" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" @@ -891,6 +892,10 @@ type Replica struct { // MsgAppPull <=> LazyReplication. // Updated with both raftMu and mu held. currentRACv2Mode rac2.RaftMsgAppMode + + // raftTracer is used to trace raft messages that are sent with a + // tracing context. + raftTracer rafttrace.RaftTracer } // The raft log truncations that are pending. Access is protected by its own diff --git a/pkg/kv/kvserver/replica_destroy.go b/pkg/kv/kvserver/replica_destroy.go index 553b5e012fd..3d730e24009 100644 --- a/pkg/kv/kvserver/replica_destroy.go +++ b/pkg/kv/kvserver/replica_destroy.go @@ -181,4 +181,5 @@ func (r *Replica) disconnectReplicationRaftMuLocked(ctx context.Context) { log.Fatalf(ctx, "removing raft group before destroying replica %s", r) } r.mu.internalRaftGroup = nil + r.mu.raftTracer.Close() } diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index 0a802d3982c..193f98f9e57 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/load" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/split" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/raft" @@ -328,6 +329,7 @@ func (r *Replica) initRaftGroupRaftMuLockedReplicaMuLocked() error { return err } r.mu.internalRaftGroup = rg + r.mu.raftTracer = *rafttrace.NewRaftTracer(ctx, r.Tracer, r.ClusterSettings(), &r.store.concurrentRaftTraces) r.flowControlV2.InitRaftLocked( ctx, replica_rac2.NewRaftNode(rg, (*replicaForRACv2)(r)), rg.LogMark()) return nil diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go index 5b5b8bc1eb0..c0261d33b8e 100644 --- a/pkg/kv/kvserver/replica_proposal.go +++ b/pkg/kv/kvserver/replica_proposal.go @@ -118,10 +118,10 @@ type ProposalData struct { // for best coverage. `p.ctx` should be used when a `replicatedCmd` is not in // scope, i.e. outside of raft command application. // - // The context may by updated during the proposal lifecycle but will never + // The context may be updated during the proposal lifecycle but will never // be nil. To clear out the context, set it to context.Background(). It is // protected by an atomic pointer because it can be read without holding the - // raftMu use ProposalData.Context() to read it. + // raftMu. Use ProposalData.Context() to read it. // // TODO(baptist): Track down all the places where we read and write ctx and // determine whether we can convert this back to non-atomic field. diff --git a/pkg/kv/kvserver/replica_proposal_buf.go b/pkg/kv/kvserver/replica_proposal_buf.go index a55cbc122ce..5d96eceb009 100644 --- a/pkg/kv/kvserver/replica_proposal_buf.go +++ b/pkg/kv/kvserver/replica_proposal_buf.go @@ -126,6 +126,7 @@ type singleBatchProposer interface { getReplicaID() roachpb.ReplicaID flowControlHandle(ctx context.Context) kvflowcontrol.Handle onErrProposalDropped([]raftpb.Entry, []*ProposalData, raftpb.StateType) + registerForTracing(*ProposalData, raftpb.Entry) bool } // A proposer is an object that uses a propBuf to coordinate Raft proposals. @@ -874,19 +875,29 @@ func proposeBatch( p.onErrProposalDropped(ents, props, raftGroup.BasicStatus().RaftState) return nil //nolint:returnerrcheck } - if err == nil { - // Now that we know what raft log position[1] this proposal is to end up - // in, deduct flow tokens for it. This is done without blocking (we've - // already waited for available flow tokens pre-evaluation). The tokens - // will later be returned once we're informed of the entry being - // admitted below raft. - // - // [1]: We're relying on an undocumented side effect of upstream raft - // API where it populates the index and term for the passed in - // slice of entries. See etcd-io/raft#57. - maybeDeductFlowTokens(ctx, p.flowControlHandle(ctx), handles, ents) + if err != nil { + return err } - return err + // Now that we know what raft log position[1] this proposal is to end up + // in, deduct flow tokens for it. This is done without blocking (we've + // already waited for available flow tokens pre-evaluation). The tokens + // will later be returned once we're informed of the entry being + // admitted below raft. + // + // [1]: We're relying on an undocumented side effect of upstream raft + // API where it populates the index and term for the passed in + // slice of entries. See etcd-io/raft#57. + maybeDeductFlowTokens(ctx, p.flowControlHandle(ctx), handles, ents) + + // Register the proposal with rafttrace. This will add the trace to the raft + // lifecycle. We trace at most one entry per batch, so break after the first + // one is successfully registered. + for i := range ents { + if p.registerForTracing(props[i], ents[i]) { + break + } + } + return nil } func maybeDeductFlowTokens( @@ -1173,6 +1184,10 @@ func (rp *replicaProposer) closedTimestampTarget() hlc.Timestamp { return (*Replica)(rp).closedTimestampTargetRLocked() } +func (rp *replicaProposer) registerForTracing(p *ProposalData, e raftpb.Entry) bool { + return (*Replica)(rp).mu.raftTracer.MaybeRegister(p.Context(), e) +} + func (rp *replicaProposer) withGroupLocked(fn func(raftGroup proposerRaft) error) error { return (*Replica)(rp).withRaftGroupLocked(func(raftGroup *raft.RawNode) (bool, error) { // We're proposing a command here so there is no need to wake the leader diff --git a/pkg/kv/kvserver/replica_proposal_buf_test.go b/pkg/kv/kvserver/replica_proposal_buf_test.go index 51d72be45a2..126febaaa57 100644 --- a/pkg/kv/kvserver/replica_proposal_buf_test.go +++ b/pkg/kv/kvserver/replica_proposal_buf_test.go @@ -217,6 +217,8 @@ func (t *testProposer) campaignLocked(ctx context.Context) { } } +func (t *testProposer) registerForTracing(*ProposalData, raftpb.Entry) bool { return true } + func (t *testProposer) rejectProposalWithErrLocked(_ context.Context, _ *ProposalData, err error) { if t.onRejectProposalWithErrLocked == nil { panic(fmt.Sprintf("unexpected rejectProposalWithErrLocked call: err=%v", err)) diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 5e6e2a9720d..165d2072cda 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -638,6 +638,11 @@ func (r *Replica) stepRaftGroupRaftMuLocked(req *kvserverpb.RaftMessageRequest) var sideChannelInfo replica_rac2.SideChannelInfoUsingRaftMessageRequest var admittedVector rac2.AdmittedVector err := r.withRaftGroup(func(raftGroup *raft.RawNode) (bool, error) { + // If this message requested tracing, begin tracing it. + for _, e := range req.TracedEntries { + r.mu.raftTracer.RegisterRemote(e) + } + r.mu.raftTracer.MaybeTrace(req.Message) // We're processing an incoming raft message (from a batch that may // include MsgVotes), so don't campaign if we wake up our raft // group. @@ -1212,6 +1217,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( } } + r.mu.raftTracer.MaybeTrace(msgStorageAppend) if state, err = s.StoreEntries(ctx, state, app, cb, &stats.append); err != nil { return stats, errors.Wrap(err, "while storing log entries") } @@ -1243,6 +1249,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( stats.tApplicationBegin = timeutil.Now() if hasMsg(msgStorageApply) { + r.mu.raftTracer.MaybeTrace(msgStorageApply) r.traceEntries(msgStorageApply.Entries, "committed, before applying any entries") err := appTask.ApplyCommittedEntries(ctx) @@ -1992,6 +1999,7 @@ func (r *Replica) deliverLocalRaftMsgsRaftMuLockedReplicaMuLocked( } for i, m := range localMsgs { + r.mu.raftTracer.MaybeTrace(m) if err := raftGroup.Step(m); err != nil { log.Fatalf(ctx, "unexpected error stepping local raft message [%s]: %v", raft.DescribeMessage(m, raftEntryFormatter), err) @@ -2015,6 +2023,7 @@ func (r *Replica) sendRaftMessage( lastToReplica, lastFromReplica := r.getLastReplicaDescriptors() r.mu.RLock() + traced := r.mu.raftTracer.MaybeTrace(msg) fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.From), lastToReplica) toReplica, toErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.To), lastFromReplica) var startKey roachpb.RKey @@ -2067,6 +2076,7 @@ func (r *Replica) sendRaftMessage( RangeStartKey: startKey, // usually nil UsingRac2Protocol: r.flowControlV2.GetEnabledWhenLeader() >= kvflowcontrol.V2EnabledWhenLeaderV1Encoding, LowPriorityOverride: lowPriorityOverride, + TracedEntries: traced, } // For RACv2, annotate successful MsgAppResp messages with the vector of // admitted log indices, by priority. diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 3ec55dbaf5b..46867e63aee 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -942,6 +942,11 @@ type Store struct { // has likely improved). draining atomic.Bool + // concurrentRaftTraces is the number of concurrent raft trace requests that + // are currently registered. This limit is used to prevent extensive raft + // tracing from inadvertently impacting performance. + concurrentRaftTraces atomic.Int64 + // Locking notes: To avoid deadlocks, the following lock order must be // obeyed: baseQueue.mu < Replica.raftMu < Replica.readOnlyCmdMu < Store.mu // < Replica.mu < Replica.unreachablesMu < Store.coalescedMu < Store.scheduler.mu.