diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index bdf953d5406c..f3bc4fc7a43d 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -269,6 +269,7 @@ ALL_TESTS = [ "//pkg/kv/kvserver/protectedts:protectedts_test", "//pkg/kv/kvserver/raftentry:raftentry_test", "//pkg/kv/kvserver/raftlog:raftlog_test", + "//pkg/kv/kvserver/rafttrace:rafttrace_test", "//pkg/kv/kvserver/raftutil:raftutil_test", "//pkg/kv/kvserver/rangefeed:rangefeed_test", "//pkg/kv/kvserver/rangelog:rangelog_test", @@ -1513,6 +1514,8 @@ GO_TARGETS = [ "//pkg/kv/kvserver/raftentry:raftentry_test", "//pkg/kv/kvserver/raftlog:raftlog", "//pkg/kv/kvserver/raftlog:raftlog_test", + "//pkg/kv/kvserver/rafttrace:rafttrace", + "//pkg/kv/kvserver/rafttrace:rafttrace_test", "//pkg/kv/kvserver/raftutil:raftutil", "//pkg/kv/kvserver/raftutil:raftutil_test", "//pkg/kv/kvserver/rangefeed:rangefeed", diff --git a/pkg/cmd/roachtest/tests/admission_control_latency.go b/pkg/cmd/roachtest/tests/admission_control_latency.go index 7793bc158aab..036ce4dcf7e5 100644 --- a/pkg/cmd/roachtest/tests/admission_control_latency.go +++ b/pkg/cmd/roachtest/tests/admission_control_latency.go @@ -750,6 +750,11 @@ func (v variations) runTest(ctx context.Context, t test.Test, c cluster.Cluster) `SET CLUSTER SETTING kv.lease.reject_on_leader_unknown.enabled = true`); err != nil { t.Fatal(err) } + // Enable raft tracing. Remove this once raft tracing is the default. + if _, err := db.ExecContext(ctx, + `SET CLUSTER SETTING kv.raft.max_concurrent_traces = '10'`); err != nil { + t.Fatal(err) + } // This isn't strictly necessary, but it would be nice if this test passed at 10s (or lower). if _, err := db.ExecContext(ctx, `SET CLUSTER SETTING server.time_after_store_suspect = '10s'`); err != nil { diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 18c1ca1edbef..bcf96f61a8d0 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -162,6 +162,7 @@ go_library( "//pkg/kv/kvserver/multiqueue", "//pkg/kv/kvserver/raftentry", "//pkg/kv/kvserver/raftlog", + "//pkg/kv/kvserver/rafttrace", "//pkg/kv/kvserver/rangefeed", "//pkg/kv/kvserver/rditer", "//pkg/kv/kvserver/readsummary", @@ -438,6 +439,7 @@ go_test( "//pkg/kv/kvserver/protectedts/ptutil", "//pkg/kv/kvserver/raftentry", "//pkg/kv/kvserver/raftlog", + "//pkg/kv/kvserver/rafttrace", "//pkg/kv/kvserver/raftutil", "//pkg/kv/kvserver/rangefeed", "//pkg/kv/kvserver/rditer", diff --git a/pkg/kv/kvserver/client_raft_log_queue_test.go b/pkg/kv/kvserver/client_raft_log_queue_test.go index 9a466877c684..a988d1a02970 100644 --- a/pkg/kv/kvserver/client_raft_log_queue_test.go +++ b/pkg/kv/kvserver/client_raft_log_queue_test.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" @@ -33,6 +34,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/vfs" "github.com/gogo/protobuf/proto" @@ -132,6 +135,65 @@ func TestRaftLogQueue(t *testing.T) { } } +func 
TestRaftTracing(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // TODO(baptist): Remove this once we change the default to be enabled. + st := cluster.MakeTestingClusterSettings() + rafttrace.MaxConcurrentRaftTraces.Override(context.Background(), &st.SV, 10) + + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: st, + RaftConfig: base.RaftConfig{ + RangeLeaseDuration: 24 * time.Hour, // disable lease moves + RaftElectionTimeoutTicks: 1 << 30, // disable elections + }, + }, + }) + defer tc.Stopper().Stop(context.Background()) + store := tc.GetFirstStoreFromServer(t, 0) + + // Write a single value to ensure we have a leader on n1. + key := tc.ScratchRange(t) + _, pErr := kv.SendWrapped(context.Background(), store.TestSender(), putArgs(key, []byte("value"))) + require.NoError(t, pErr.GoError()) + require.NoError(t, tc.WaitForSplitAndInitialization(key)) + // Set to have 3 voters. + tc.AddVotersOrFatal(t, key, tc.Targets(1, 2)...) + tc.WaitForVotersOrFatal(t, key, tc.Targets(1, 2)...) + + for i := 0; i < 100; i++ { + var finish func() tracingpb.Recording + ctx := context.Background() + if i == 50 { + // Trace a random request on a "client" tracer. + ctx, finish = tracing.ContextWithRecordingSpan(ctx, store.GetStoreConfig().Tracer(), "test") + } + _, pErr := kv.SendWrapped(ctx, store.TestSender(), putArgs(key, []byte(fmt.Sprintf("value-%d", i)))) + require.NoError(t, pErr.GoError()) + // Note that this is the client's span; there may be additional logs created after the span is returned. + if finish != nil { + output := finish().String() + // NB: It is hard to get all the messages in an expected order. We + // simply ensure some of the key messages are returned. Also note + // that we want to make sure that the logs are not reported against + // the tracing library, but against the line that called into it. + expectedMessages := []string{ + `replica_proposal_buf.* flushing proposal to Raft`, + `replica_proposal_buf.* registering local trace`, + `replica_raft.* 1->2 MsgApp`, + `replica_raft.* 1->3 MsgApp`, + `replica_raft.* AppendThread->1 MsgStorageAppendResp`, + `ack-ing replication success to the client`, + } + require.NoError(t, testutils.MatchInOrder(output, expectedMessages...)) + } + } +} + // TestCrashWhileTruncatingSideloadedEntries emulates a process crash in the // middle of applying a raft log truncation command that removes some entries // from the sideloaded storage. The test expects that storage remains in a diff --git a/pkg/kv/kvserver/kvserverpb/raft.proto b/pkg/kv/kvserver/kvserverpb/raft.proto index 2a8897e1012d..5dda2a59f4eb 100644 --- a/pkg/kv/kvserver/kvserverpb/raft.proto +++ b/pkg/kv/kvserver/kvserverpb/raft.proto @@ -48,6 +48,16 @@ message RaftHeartbeat { bool lagging_followers_on_quiesce_accurate = 10; } +// TracedEntry identifies a traced raft entry from the leader, along with its trace and span IDs.
+message TracedEntry { + uint64 index = 1 [(gogoproto.nullable) = false, + (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/kv/kvpb.RaftIndex"]; + uint64 trace_id = 2 [(gogoproto.nullable) = false, (gogoproto.customname) = "TraceID", + (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb.TraceID"]; + uint64 span_id = 3 [(gogoproto.nullable) = false, (gogoproto.customname) = "SpanID", + (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb.SpanID"]; +} + // RaftMessageRequest is the request used to send raft messages using our // protobuf-based RPC codec. If a RaftMessageRequest has a non-empty number of // heartbeats or heartbeat_resps, the contents of the message field is treated @@ -103,6 +113,12 @@ message RaftMessageRequest { // indices. Used only with RACv2. kv.kvserver.kvflowcontrol.kvflowcontrolpb.AdmittedState admitted_state = 14 [(gogoproto.nullable) = false]; + // TracedEntry is a mapping from Raft index to trace and span ids for this + // request. They are set by the leaseholder and begin tracing on all + // replicas. Currently, traces are not returned to the leaseholder, but + // instead logged to a local log file. + repeated TracedEntry traced_entries = 15 [(gogoproto.nullable) = false]; + reserved 10; } diff --git a/pkg/kv/kvserver/raft.go b/pkg/kv/kvserver/raft.go index 553cf26e4098..4f1e311004ca 100644 --- a/pkg/kv/kvserver/raft.go +++ b/pkg/kv/kvserver/raft.go @@ -267,7 +267,7 @@ func traceProposals(r *Replica, ids []kvserverbase.CmdIDKey, event string) { r.mu.RLock() for _, id := range ids { if prop, ok := r.mu.proposals[id]; ok { - ctxs = append(ctxs, prop.ctx) + ctxs = append(ctxs, prop.Context()) } } r.mu.RUnlock() diff --git a/pkg/kv/kvserver/rafttrace/BUILD.bazel b/pkg/kv/kvserver/rafttrace/BUILD.bazel new file mode 100644 index 000000000000..d4038ab33454 --- /dev/null +++ b/pkg/kv/kvserver/rafttrace/BUILD.bazel @@ -0,0 +1,38 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "rafttrace", + srcs = ["rafttrace.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace", + visibility = ["//visibility:public"], + deps = [ + "//pkg/kv/kvpb", + "//pkg/kv/kvserver/kvserverpb", + "//pkg/raft", + "//pkg/raft/raftpb", + "//pkg/settings", + "//pkg/settings/cluster", + "//pkg/util/log", + "//pkg/util/syncutil", + "//pkg/util/tracing", + "//pkg/util/tracing/tracingpb", + "@com_github_cockroachdb_logtags//:logtags", + "@com_github_cockroachdb_redact//:redact", + ], +) + +go_test( + name = "rafttrace_test", + srcs = ["rafttrace_test.go"], + embed = [":rafttrace"], + deps = [ + "//pkg/kv/kvpb", + "//pkg/kv/kvserver/kvserverpb", + "//pkg/raft/raftpb", + "//pkg/settings/cluster", + "//pkg/testutils", + "//pkg/util/tracing", + "//pkg/util/tracing/tracingpb", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/kv/kvserver/rafttrace/rafttrace.go b/pkg/kv/kvserver/rafttrace/rafttrace.go new file mode 100644 index 000000000000..2a6d945f8efb --- /dev/null +++ b/pkg/kv/kvserver/rafttrace/rafttrace.go @@ -0,0 +1,477 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. 
+ +package rafttrace + +import ( + "context" + "math" + "sync/atomic" + + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/raft" + "github.com/cockroachdb/cockroach/pkg/raft/raftpb" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" + "github.com/cockroachdb/logtags" + "github.com/cockroachdb/redact" +) + +// MaxConcurrentRaftTraces is the maximum number of entries that can be traced +// at any time on this store. Additional traces will be ignored until the number +// of traces drops below the limit. Having too many active traces can negatively +// impact performance as we iterate over all of them for some messages. +// +// TODO(baptist): Bump the default to a reasonable value like 10 that balances +// usefulness with performance impact once we have validated the performance +// impact. +var MaxConcurrentRaftTraces = settings.RegisterIntSetting( + settings.SystemOnly, + "kv.raft.max_concurrent_traces", + "the maximum number of tracked raft traces, 0 will disable tracing", + 0, + settings.IntInRange(0, 1000), +) + +// traceValue represents the trace information for a single registration. +type traceValue struct { + traced kvserverpb.TracedEntry + // ctx is a trace-specific context used to log events on this trace. + ctx context.Context + + mu struct { + syncutil.Mutex + + // seenMsgAppResp tracks whether a MsgAppResp message has already been + // logged by each replica peer. This limits the size of the log at a + // small risk of missing some important messages in the case of dropped + // messages or reproposals. + seenMsgAppResp map[raftpb.PeerID]bool + + // seenMsgStorageAppendResp tracks whether a MsgStorageAppendResp + // message has already been logged. + seenMsgStorageAppendResp bool + + // propCtx is the underlying proposal context used for tracing to the + // SQL trace. + propCtx context.Context + + // propSpan is the span connected to the propCtx. It must be finished + // when the trace is removed. + propSpan *tracing.Span + } +} + +// logf logs the message to the trace context and the proposal context. The +// proposal context is populated on the leaseholder and is attached to the SQL +// trace. +func (t *traceValue) logf(depth int, format string, args ...interface{}) { + log.InfofDepth(t.ctx, depth+1, format, args...) + + t.mu.Lock() + propCtx := t.mu.propCtx + t.mu.Unlock() + if propCtx != nil { + log.VEventfDepth(propCtx, depth+1, 3, format, args...) + } +} + +// seenMsgAppResp returns true if a MsgAppResp has already been seen from this +// peer; otherwise it records that one has now been seen and returns false. +func (t *traceValue) seenMsgAppResp(p raftpb.PeerID) bool { + t.mu.Lock() + defer t.mu.Unlock() + if t.mu.seenMsgAppResp[p] { + return true + } + t.mu.seenMsgAppResp[p] = true + return false +} + +// seenMsgStorageAppendResp returns true if a MsgStorageAppendResp has already +// been seen for this trace; otherwise it records that one has now been seen +// and returns false. +func (t *traceValue) seenMsgStorageAppendResp() bool { + t.mu.Lock() + defer t.mu.Unlock() + if t.mu.seenMsgStorageAppendResp { + return true + } + t.mu.seenMsgStorageAppendResp = true + return false +} + +// String attempts to balance uniqueness with readability by only keeping the +// lower 16 bits of the trace and span.
+func (tv *traceValue) String() string { + return redact.StringWithoutMarkers(tv) +} + +func (tv *traceValue) SafeFormat(w redact.SafePrinter, _ rune) { + w.Printf("i%d/%x.%x", tv.traced.Index, uint16(tv.traced.TraceID), uint16(tv.traced.SpanID)) +} + +// RaftTracer is a utility to trace the lifetime of raft log entries. It may log +// some unrelated entries, since it does not consider entry or leader term. It +// traces at most one MsgAppResp and MsgStorageAppendResp per index, namely the +// first one that is at or past our entry's index. This limitation means it may +// not capture all the relevant messages, particularly if the term changes. +// +// The library logs in two different ways: once to the standard cockroach log +// and once to the SQL trace on the leaseholder. +// TODO(baptist): Look at logging traces on followers and sending back to the +// leader. It would need to be best effort, but might still be useful. +// Alternatively, double-down on distributed trace collection if/when it's +// supported, so that the trace does not need to be plumbed back to the +// leaseholder / txn coordinator. +type RaftTracer struct { + // m is a map of all the currently traced entries for this replica. The + // aggregate size of the map across all replicas is equal to or less than + // numRegisteredStore unless the setting changes, in which case we flush all + // entries on the next register call. We add to numRegistered before we + // update m, and delete from m before we remove from numRegistered to keep + // this invariant. On a setting change we flush all existing traces on the + // next call to register. + // TODO(baptist): Look at alternatives to using a map such as a sparse array + // or circular buffer. Specifically, we might be able to save some memory + // allocations. Note that the propCtx in the traceValue is already pulled + // from a pool inside the tracer. + m syncutil.Map[kvpb.RaftIndex, traceValue] + + // numRegisteredStore is the number of currently registered traces for this + // store, not this replica. The number of registered traces will normally be + // less than the MaxConcurrentRaftTraces setting. If the setting is lowered, + // we flush all traces on all replicas. + numRegisteredStore *atomic.Int64 + + // numRegisteredReplica is the number of currently registered traces for + // this replica. The invariant is sum(numRegisteredReplica) <= + // numRegisteredStore. We set numRegisteredReplica to MaxInt32 when we close + // the tracer to prevent new registrations. + // + // TODO(baptist/pav-kv): Look at optimizing to avoid the need for this to be + // an atomic. It likely doesn't need to be atomic since the callers should + // be holding Replica.raftMu and/or Replica.mu. + numRegisteredReplica atomic.Int64 + + // ctx is the ambient context for the replica and is used for remote + // traces. It contains the replica/range information. On each trace we + // additionally append the unique trace/span IDs. + ctx context.Context + st *cluster.Settings + + tracer *tracing.Tracer +} + +// NewRaftTracer creates a new RaftTracer with the given ambient context for the +// replica. +func NewRaftTracer( + ctx context.Context, + tracer *tracing.Tracer, + st *cluster.Settings, + numRegisteredStore *atomic.Int64, +) *RaftTracer { + return &RaftTracer{ctx: ctx, tracer: tracer, st: st, numRegisteredStore: numRegisteredStore} +} + +// reserveSpace checks whether we should register a new trace. If there are too +// many registered traces it will not register and will return false. The soft +// invariant is that numRegisteredStore <= numAllowed, which can be temporarily +// violated if MaxConcurrentRaftTraces is lowered. This method will return true +// if we can add one to the number registered for both the store and the +// replica; otherwise it will return false. This method is optimized for the +// `numAllowed == 0` case and avoids loading `numRegisteredStore` until after +// this check. +func (r *RaftTracer) reserveSpace() bool { + numAllowed := MaxConcurrentRaftTraces.Get(&r.st.SV) + numRegisteredReplica := r.numRegisteredReplica.Load() + + // This can only occur if the numAllowed setting has changed since a + // previous call to reserveSpace. If this happens, flush all our current + // traces and don't register this request. Note that when this happens we + // also won't log this request. + if numRegisteredReplica > numAllowed { + log.Infof(r.ctx, "flushing all traces due to setting change") + r.m.Range(func(index kvpb.RaftIndex, t *traceValue) bool { + r.removeEntry(index) + return true + }) + return false + } + + if numAllowed == 0 { + return false + } + + // If the maximum number of traces has been reached for the store, we don't + // register tracing and return false. + numRegisteredStore := r.numRegisteredStore.Load() + if numRegisteredStore >= numAllowed { + return false + } + + // Only increment the number of registered traces if numRegisteredStore + // hasn't changed. In the case of an ABA update, it does not break the + // invariant since some other trace was registered and deregistered, but + // there is still a slot available. We will not register this trace if + // someone else is concurrently registering a trace on this store, but this + // is acceptable as it is a rare case. + registerSucceeded := r.numRegisteredStore.CompareAndSwap(numRegisteredStore, numRegisteredStore+1) + if registerSucceeded { + // Add one unconditionally to the replica count. + r.numRegisteredReplica.Add(1) + } + // Note we can't assert numRegisteredStore <= numAllowed because if the + // setting is changed it can be temporarily violated on other replicas. + return registerSucceeded +} + +// tryStore attempts to store this value. If the index is already in the map it +// will not store this entry and will return false. In that rare case it also +// decrements the counters that were incremented by reserveSpace, since we don't +// want to double count a trace for the same index. +func (r *RaftTracer) tryStore(tv *traceValue) (*traceValue, bool) { + if existingTv, loaded := r.m.LoadOrStore(tv.traced.Index, tv); loaded { + tv.logf(2, "duplicate registration ignored - existing trace: %s", existingTv) + existingTv.logf(2, "additional registration for same index: %s", tv) + r.destroy(tv) + return existingTv, false + } + return tv, true +} + +// newTraceValue creates a new traceValue for the given traced entry. Note that +// it doesn't pass `propCtx` as the first parameter since this isn't the +// relevant context that should be used for logging and it can be nil. +func (r *RaftTracer) newTraceValue( + te kvserverpb.TracedEntry, propCtx context.Context, propSpan *tracing.Span, +) *traceValue { + tv := &traceValue{traced: te} + tv.ctx = logtags.AddTag(r.ctx, "id", redact.Safe(tv.String())) + tv.mu.seenMsgAppResp = make(map[raftpb.PeerID]bool) + tv.mu.propCtx = propCtx + tv.mu.propSpan = propSpan + return tv +} + +// RegisterRemote registers a remote trace.
This is called when we receive a +// raft message over the wire with a request to continue tracing it. +func (r *RaftTracer) RegisterRemote(te kvserverpb.TracedEntry) { + if !r.reserveSpace() { + return + } + // NB: We don't currently return remote traces; if we did, we would pass the + // remote ctx here and trace it. + if tv, created := r.tryStore(r.newTraceValue(te, nil, nil)); created { + tv.logf(1, "registering remote trace %s", tv) + } +} + +// MaybeRegister is called on an entry that has been proposed to raft. This will +// begin logging all subsequent updates to this entry. It returns true if the +// registration is successful. A duplicate registration of the same index is +// considered a success and returns true; however, the older registration is +// kept and this registration is ignored. +func (r *RaftTracer) MaybeRegister(ctx context.Context, ent raftpb.Entry) bool { + // If the index is zero, then we can't trace this entry. This can happen if + // there is a leader/leaseholder split. We don't have an easy way to handle + // this today, so don't attempt to trace it. + if ent.Index == 0 { + log.VEvent(ctx, 2, "skip registering raft proposal without index") + return false + } + + // Only register the entry if this is a traced context with verbose logging. + span := tracing.SpanFromContext(ctx) + if span == nil || span.RecordingType() != tracingpb.RecordingVerbose { + return false + } + + // This must be the last conditional. If this returns true we must call + // tryStore to not leak a registered permit. + if !r.reserveSpace() { + log.VEvent(ctx, 2, "too many active raft traces, skipping") + return false + } + + ctx, span = r.tracer.StartSpanCtx(ctx, "raft trace", + tracing.WithParent(span), tracing.WithFollowsFrom()) + if tv, created := r.tryStore(r.newTraceValue(kvserverpb.TracedEntry{ + Index: kvpb.RaftIndex(ent.Index), + TraceID: span.TraceID(), + SpanID: span.SpanID(), + }, ctx, span)); created { + tv.logf(1, "registering local trace %s", tv) + } + return true +} + +// MaybeTrace logs the message in every trace it is relevant to. +func (r *RaftTracer) MaybeTrace(m raftpb.Message) []kvserverpb.TracedEntry { + // NB: This check is an optimization to handle the common case where there + // are no registered traces on this replica. + if r.numRegisteredReplica.Load() == 0 { + return nil + } + + switch m.Type { + case raftpb.MsgProp, raftpb.MsgApp, raftpb.MsgStorageAppend, raftpb.MsgStorageApply: + return r.traceIfCovered(m) + case raftpb.MsgAppResp, raftpb.MsgStorageAppendResp, raftpb.MsgStorageApplyResp: + r.traceIfPast(m) + return nil + } + return nil +} + +// removeEntry removes the trace at the given index and decrements the +// registered counters at the replica and store level. +func (r *RaftTracer) removeEntry(index kvpb.RaftIndex) { + tv, found := r.m.LoadAndDelete(index) + if !found { + return + } + // Don't allow additional tracing to this context. + r.destroy(tv) +} + +func (r *RaftTracer) destroy(tv *traceValue) { + r.numRegisteredReplica.Add(-1) + r.numRegisteredStore.Add(-1) + + tv.mu.Lock() + defer tv.mu.Unlock() + if tv.mu.propSpan != nil { + tv.mu.propSpan.Finish() + tv.mu.propCtx = nil + tv.mu.propSpan = nil + } +} + +// Close will unregister all the currently active traces and prevent additional +// traces from being added. It is safe to call multiple times, but should always +// be called at least once when the replica is destroyed to prevent leaking +// traces.
+// Note that there could be a race between another caller calling Register and +// us closing the tracer, however we won't allow any new registrations to come +// through after this call. Note that we set this to MaxInt32 instead of +// MaxInt64 to avoid a rare race where another thread is in the middle of +// `reserveSpace` and calls `Add(1)` which cause overflow. +func (r *RaftTracer) Close() { + r.numRegisteredReplica.Store(math.MaxInt32) + + r.m.Range(func(index kvpb.RaftIndex, t *traceValue) bool { + t.logf(2, "cleanup log index %d during Close", index) + r.removeEntry(index) + return true + }) +} + +func peer(p raftpb.PeerID) redact.SafeString { + return redact.SafeString(raft.DescribeTarget(p)) +} + +// traceIfCovered will log the message if it touches any of the registered trace +// points. Additionally it returns any saved trace/span IDs for sending to +// remote nodes. This applies both to messages that the leader sends to +// followers, and messages replicas send to their local storage. +func (r *RaftTracer) traceIfCovered(m raftpb.Message) []kvserverpb.TracedEntry { + if len(m.Entries) == 0 { + return nil + } + minEntryIndex := kvpb.RaftIndex(m.Entries[0].Index) + maxEntryIndex := kvpb.RaftIndex(m.Entries[len(m.Entries)-1].Index) + var tracedEntries []kvserverpb.TracedEntry + r.m.Range(func(index kvpb.RaftIndex, t *traceValue) bool { + // If the traced index is not in the range of the entries, we can skip + // it. We don't need to check each individual entry since they are + // contiguous. + if t.traced.Index < minEntryIndex || t.traced.Index > maxEntryIndex { + return true + } + tracedEntries = append(tracedEntries, t.traced) + // TODO(baptist): Not all the fields are relevant to log for all + // message types. Consider cleaning up what is logged. + t.logf(4, + "%s->%s %v Term:%d Log:%d/%d Entries:[%d-%d]", + peer(m.From), + peer(m.To), + m.Type, + m.Term, + m.LogTerm, + m.Index, + minEntryIndex, + maxEntryIndex, + ) + return true + }) + return tracedEntries +} + +// traceIfPast will log the message to all registered traceValues the message is +// past. It will additionally unregister traces that are no longer useful. This +// call is for events that move the needle/watermark forward (e.g. the log +// storage syncs), but don't have an exact range of entries affected. So, being +// unable to match these events to entries exactly once, we instead check that +// the watermark passed the entry. To protect against overly verbose logging, we +// only allow MsgAppResp to be logged once per peer, and only one +// MsgStorageAppendResp. When we receive a MsgStorageApplyResp we will log and +// unregister the tracing. +func (r *RaftTracer) traceIfPast(m raftpb.Message) { + if m.Reject { + return + } + r.m.Range(func(index kvpb.RaftIndex, t *traceValue) bool { + switch m.Type { + case raftpb.MsgAppResp: + if kvpb.RaftIndex(m.Index) >= index && !t.seenMsgAppResp(m.From) { + t.logf(4, + "%s->%s %v Term:%d Index:%d", + peer(m.From), + peer(m.To), + m.Type, + m.Term, + m.Index, + ) + } + case raftpb.MsgStorageAppendResp: + if kvpb.RaftIndex(m.Index) >= index && !t.seenMsgStorageAppendResp() { + t.logf(4, + "%s->%s %v Log:%d/%d", + peer(m.From), + peer(m.To), + m.Type, + m.LogTerm, + m.Index, + ) + } + case raftpb.MsgStorageApplyResp: + if len(m.Entries) == 0 { + return true + } + // Use the last entry to determine if we should log this message. 
+ msgIndex := m.Entries[len(m.Entries)-1].Index + if kvpb.RaftIndex(msgIndex) >= index { + t.logf(4, + "%s->%s %v LastEntry:%d/%d", + peer(m.From), + peer(m.To), + m.Type, + m.Entries[len(m.Entries)-1].Term, + m.Entries[len(m.Entries)-1].Index, + ) + // We unregister the index here because we are now "done" with + // this entry and don't expect more useful events. + t.logf(4, "unregistered log index %d from tracing", index) + r.removeEntry(index) + } + } + return true + }) +} diff --git a/pkg/kv/kvserver/rafttrace/rafttrace_test.go b/pkg/kv/kvserver/rafttrace/rafttrace_test.go new file mode 100644 index 000000000000..59fdd5a9e1d3 --- /dev/null +++ b/pkg/kv/kvserver/rafttrace/rafttrace_test.go @@ -0,0 +1,344 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package rafttrace + +import ( + "context" + "sync/atomic" + "testing" + + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/raft/raftpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" + "github.com/stretchr/testify/require" +) + +func createTracer(count int64) *RaftTracer { + ctx := context.Background() + tracer := tracing.NewTracer() + st := cluster.MakeTestingClusterSettings() + MaxConcurrentRaftTraces.Override(ctx, &st.SV, count) + numRegisteredStore := atomic.Int64{} + return NewRaftTracer(ctx, tracer, st, &numRegisteredStore) +} + +func TestRegisterRemote(t *testing.T) { + rt := createTracer(10) + + te := kvserverpb.TracedEntry{Index: 1, TraceID: 123, SpanID: 456} + rt.RegisterRemote(te) + require.Equal(t, int64(1), rt.numRegisteredStore.Load()) + require.Equal(t, int64(1), rt.numRegisteredReplica.Load()) +} + +func TestMaybeRegisterNoSpan(t *testing.T) { + rt := createTracer(10) + + // Test without a span in context + ctx := context.Background() + require.False(t, rt.MaybeRegister(ctx, raftpb.Entry{Index: 1})) + require.Equal(t, int64(0), rt.numRegisteredStore.Load()) + require.Equal(t, int64(0), rt.numRegisteredReplica.Load()) +} + +func TestMaybeRegisterWithSpan(t *testing.T) { + rt := createTracer(10) + + ctx := context.Background() + // Test with a span in the context. 
+ ctx, span := rt.tracer.StartSpanCtx(ctx, "test-span", tracing.WithRecording(tracingpb.RecordingVerbose)) + defer span.Finish() + + require.True(t, rt.MaybeRegister(ctx, raftpb.Entry{Index: 1})) + require.Equal(t, int64(1), rt.numRegisteredStore.Load()) + require.Equal(t, int64(1), rt.numRegisteredReplica.Load()) +} + +func TestMaybeTraceNoSpan(t *testing.T) { + rt := createTracer(10) + ctx := context.Background() + + ent := raftpb.Entry{Index: 1} + require.False(t, rt.MaybeRegister(ctx, ent)) + require.Empty(t, rt.MaybeTrace(raftpb.Message{Type: raftpb.MsgApp, Entries: []raftpb.Entry{ent}})) +} + +func TestMaybeTraceWithSpan(t *testing.T) { + rt := createTracer(10) + ctx, span := rt.tracer.StartSpanCtx(context.Background(), "test-span", tracing.WithRecording(tracingpb.RecordingVerbose)) + defer span.Finish() + + ent := raftpb.Entry{Index: 1} + require.True(t, rt.MaybeRegister(ctx, ent)) + tracedEntries := rt.MaybeTrace(raftpb.Message{ + Type: raftpb.MsgApp, + Entries: []raftpb.Entry{ent}, + }) + require.Len(t, tracedEntries, 1) + require.Equal(t, kvpb.RaftIndex(1), tracedEntries[0].Index) +} + +func TestClose(t *testing.T) { + rt := createTracer(10) + ctx, span := rt.tracer.StartSpanCtx(context.Background(), "test-span", tracing.WithRecording(tracingpb.RecordingVerbose)) + defer span.Finish() + + require.True(t, rt.MaybeRegister(ctx, raftpb.Entry{Index: 1})) + require.Equal(t, int64(1), rt.numRegisteredStore.Load()) + require.Equal(t, int64(1), rt.numRegisteredReplica.Load()) + + rt.Close() + require.Equal(t, int64(0), rt.numRegisteredStore.Load()) + require.Greater(t, rt.numRegisteredReplica.Load(), int64(1000)) +} + +func TestTwoTracersSharingNumRegisteredStore(t *testing.T) { + numRegisteredStore := atomic.Int64{} + ctx := context.Background() + tracer := tracing.NewTracer() + st := cluster.MakeTestingClusterSettings() + MaxConcurrentRaftTraces.Override(ctx, &st.SV, 3) + + rt1 := NewRaftTracer(ctx, tracer, st, &numRegisteredStore) + rt2 := NewRaftTracer(ctx, tracer, st, &numRegisteredStore) + + // Register a trace in the first tracer. + ctx1, span1 := rt1.tracer.StartSpanCtx(ctx, "test-span-1", tracing.WithRecording(tracingpb.RecordingVerbose)) + defer span1.Finish() + require.True(t, rt1.MaybeRegister(ctx1, raftpb.Entry{Index: 1})) + require.Equal(t, int64(1), rt1.numRegisteredStore.Load()) + require.Equal(t, int64(1), rt1.numRegisteredReplica.Load()) + + // Register a trace in the second tracer. + ctx2, span2 := rt2.tracer.StartSpanCtx(ctx, "test-span-2", tracing.WithRecording(tracingpb.RecordingVerbose)) + defer span2.Finish() + require.True(t, rt2.MaybeRegister(ctx2, raftpb.Entry{Index: 2})) + require.Equal(t, int64(2), rt2.numRegisteredStore.Load()) + require.Equal(t, int64(1), rt2.numRegisteredReplica.Load()) + + // Ensure both tracers share the same numRegisteredStore. + require.Equal(t, rt1.numRegisteredStore, rt2.numRegisteredStore) + + // Close the first tracer and check the counts. + rt1.Close() + require.Equal(t, int64(1), rt2.numRegisteredStore.Load()) + require.Greater(t, rt1.numRegisteredReplica.Load(), int64(1000)) + require.Equal(t, int64(1), rt2.numRegisteredReplica.Load()) + + // Close the second tracer and check the counts. 
+ rt2.Close() + require.Equal(t, int64(0), rt2.numRegisteredStore.Load()) + require.Greater(t, rt2.numRegisteredReplica.Load(), int64(1000)) +} + +func TestLimit(t *testing.T) { + rt := createTracer(2) + ctx1, span1 := rt.tracer.StartSpanCtx(context.Background(), "test-span", tracing.WithRecording(tracingpb.RecordingVerbose)) + defer span1.Finish() + // Only 2 traces are allowed but we attempt to register 3. + require.True(t, rt.MaybeRegister(ctx1, raftpb.Entry{Index: 1})) + require.True(t, rt.MaybeRegister(ctx1, raftpb.Entry{Index: 2})) + require.False(t, rt.MaybeRegister(ctx1, raftpb.Entry{Index: 3})) + rt.Close() + require.Equal(t, int64(0), rt.numRegisteredStore.Load()) + require.Greater(t, rt.numRegisteredReplica.Load(), int64(1000)) +} + +func TestMaybeTraceMsgAppResp(t *testing.T) { + rt := createTracer(10) + ctx, finish := tracing.ContextWithRecordingSpan(context.Background(), rt.tracer, "test") + + require.True(t, rt.MaybeRegister(ctx, raftpb.Entry{Index: 1})) + require.Empty(t, rt.MaybeTrace(raftpb.Message{ + Term: 1, + From: 1, + To: 2, + Type: raftpb.MsgAppResp, + Index: uint64(5), + }), 0) + output := finish().String() + require.NoError(t, testutils.MatchInOrder(output, []string{"1->2 MsgAppResp Term:1 Index:5"}...)) + require.Equal(t, int64(1), rt.numRegisteredStore.Load()) +} + +func TestDupeMsgAppResp(t *testing.T) { + rt := createTracer(10) + ctx, finish := tracing.ContextWithRecordingSpan(context.Background(), rt.tracer, "test") + + ent := raftpb.Entry{Index: 1} + require.True(t, rt.MaybeRegister(ctx, ent)) + require.Empty(t, rt.MaybeTrace(raftpb.Message{ + Term: 1, + From: 1, + To: 2, + Type: raftpb.MsgAppResp, + Index: uint64(5), + })) + // The second message should not trace. + require.Empty(t, rt.MaybeTrace(raftpb.Message{ + Term: 1, + From: 1, + To: 2, + Type: raftpb.MsgAppResp, + Index: uint64(6), + })) + + output := finish().String() + require.NoError(t, testutils.MatchInOrder(output, []string{"1->2 MsgAppResp Term:1 Index:5"}...)) + require.Error(t, testutils.MatchInOrder(output, []string{"1->2 MsgAppResp Term:1 Index:6"}...)) + require.Equal(t, int64(1), rt.numRegisteredStore.Load()) +} + +func TestTraceMsgStorageAppendResp(t *testing.T) { + rt := createTracer(10) + ctx, finish := tracing.ContextWithRecordingSpan(context.Background(), rt.tracer, "test") + + ent := raftpb.Entry{Index: 1} + require.True(t, rt.MaybeRegister(ctx, ent)) + require.Empty(t, rt.MaybeTrace(raftpb.Message{ + From: 1, + To: 2, + Term: 3, + Type: raftpb.MsgStorageAppendResp, + Index: uint64(5), + LogTerm: uint64(4), + })) + + output := finish().String() + require.NoError(t, testutils.MatchInOrder(output, []string{"1->2 MsgStorageAppendResp Log:4/5"}...)) + require.Equal(t, int64(1), rt.numRegisteredStore.Load()) +} + +func TestDupeMsgStorageAppendResp(t *testing.T) { + rt := createTracer(10) + ctx, finish := tracing.ContextWithRecordingSpan(context.Background(), rt.tracer, "test") + + ent := raftpb.Entry{Index: 1} + require.True(t, rt.MaybeRegister(ctx, ent)) + require.Empty(t, rt.MaybeTrace(raftpb.Message{ + From: 1, + To: 2, + Term: 3, + Type: raftpb.MsgStorageAppendResp, + Index: uint64(5), + LogTerm: uint64(4), + })) + // The second messsage should not trace. 
+ require.Empty(t, rt.MaybeTrace(raftpb.Message{ + From: 5, + To: 6, + Term: 7, + Type: raftpb.MsgStorageAppendResp, + Index: uint64(8), + LogTerm: uint64(9), + })) + + output := finish().String() + require.NoError(t, testutils.MatchInOrder(output, []string{"1->2 MsgStorageAppendResp Log:4/5"}...)) + require.Error(t, testutils.MatchInOrder(output, []string{"5->6 MsgStorageAppendResp"}...)) + require.Equal(t, int64(1), rt.numRegisteredStore.Load()) +} + +func TestNoTraceMsgStorageAppendResp(t *testing.T) { + rt := createTracer(10) + ctx, finish := tracing.ContextWithRecordingSpan(context.Background(), rt.tracer, "test") + + ent := raftpb.Entry{Index: 10} + require.True(t, rt.MaybeRegister(ctx, ent)) + + // This doesn't trace since the index is behind the entry index. + require.Empty(t, rt.MaybeTrace(raftpb.Message{ + From: 1, + To: 2, + Term: 3, + Type: raftpb.MsgStorageAppendResp, + Index: uint64(5), + LogTerm: uint64(4), + })) + + output := finish().String() + require.Error(t, testutils.MatchInOrder(output, []string{"MsgStorageAppendResp"}...)) + require.Equal(t, int64(1), rt.numRegisteredStore.Load()) +} + +func TestTraceMsgStorageApplyResp(t *testing.T) { + rt := createTracer(10) + ctx, finish := tracing.ContextWithRecordingSpan(context.Background(), rt.tracer, "test") + + require.True(t, rt.MaybeRegister(ctx, raftpb.Entry{Index: 1})) + require.Empty(t, rt.MaybeTrace( + raftpb.Message{ + From: 1, + To: 2, + Type: raftpb.MsgStorageApplyResp, + Entries: []raftpb.Entry{ + {Term: 1, Index: 1}, + {Term: 2, Index: 4}, + }, + })) + + output := finish().String() + require.NoError(t, testutils.MatchInOrder(output, + []string{ + `1->2 MsgStorageApplyResp LastEntry:2/4`, + `unregistered log index`, + }...)) + require.Equal(t, int64(0), rt.numRegisteredStore.Load()) +} + +func TestDuplicateIndex(t *testing.T) { + rt := createTracer(10) + ctx1, trace1 := tracing.ContextWithRecordingSpan(context.Background(), rt.tracer, "trace1") + require.True(t, rt.MaybeRegister(ctx1, raftpb.Entry{Index: 1})) + require.Equal(t, int64(1), rt.numRegisteredStore.Load()) + require.Equal(t, int64(1), rt.numRegisteredReplica.Load()) + // This returns true indicating that the index is registered, but it doesn't + // add a new registration. + ctx2, trace2 := tracing.ContextWithRecordingSpan(context.Background(), rt.tracer, "trace2") + require.True(t, rt.MaybeRegister(ctx2, raftpb.Entry{Index: 1})) + require.Equal(t, int64(1), rt.numRegisteredStore.Load()) + require.Equal(t, int64(1), rt.numRegisteredReplica.Load()) + + // Unregister the entry with a MsgStorageApplyResp. + require.Empty(t, rt.MaybeTrace( + raftpb.Message{ + From: 1, + To: 2, + Type: raftpb.MsgStorageApplyResp, + Entries: []raftpb.Entry{ + {Term: 1, Index: 1}, + {Term: 2, Index: 4}, + }, + })) + // We expect the logs to go to the first trace. 
+ output1 := trace1().String() + output2 := trace2().String() + require.NoError(t, testutils.MatchInOrder(output1, + []string{ + `1->2 MsgStorageApplyResp LastEntry:2/4`, + `unregistered log index`, + }...)) + require.NoError(t, testutils.MatchInOrder(output1, + []string{ + `additional registration for same index`, + }...)) + require.Error(t, testutils.MatchInOrder(output2, + []string{ + `1->2 MsgStorageApplyResp LastEntry:2/4`, + `unregistered log index`, + }...)) + require.NoError(t, testutils.MatchInOrder(output2, + []string{ + `duplicate registration ignored`, + }...)) + + require.Equal(t, int64(0), rt.numRegisteredStore.Load()) + require.Equal(t, int64(0), rt.numRegisteredReplica.Load()) +} diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 1eb4a3369dfc..0b2fbc22408d 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/load" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/split" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" @@ -891,6 +892,10 @@ type Replica struct { // MsgAppPull <=> LazyReplication. // Updated with both raftMu and mu held. currentRACv2Mode rac2.RaftMsgAppMode + + // raftTracer is used to trace raft messages that are sent with a + // tracing context. + raftTracer rafttrace.RaftTracer } // The raft log truncations that are pending. Access is protected by its own diff --git a/pkg/kv/kvserver/replica_application_decoder.go b/pkg/kv/kvserver/replica_application_decoder.go index 3586137f7c7b..5b9cdf49cb5b 100644 --- a/pkg/kv/kvserver/replica_application_decoder.go +++ b/pkg/kv/kvserver/replica_application_decoder.go @@ -145,7 +145,7 @@ func (d *replicaDecoder) createTracingSpans(ctx context.Context) { propCtx := ctx // raft scheduler's ctx var propSp *tracing.Span // If the client has a trace, put a child into propCtx. - if sp := tracing.SpanFromContext(cmd.proposal.ctx); sp != nil { + if sp := tracing.SpanFromContext(cmd.proposal.Context()); sp != nil { propCtx, propSp = sp.Tracer().StartSpanCtx( propCtx, "local proposal", tracing.WithParent(sp), ) diff --git a/pkg/kv/kvserver/replica_application_result.go b/pkg/kv/kvserver/replica_application_result.go index 4f85db924d3c..998f7258d3a5 100644 --- a/pkg/kv/kvserver/replica_application_result.go +++ b/pkg/kv/kvserver/replica_application_result.go @@ -328,7 +328,6 @@ func (r *Replica) makeReproposal(origP *ProposalData) (reproposal *ProposalData, // span "follows from" the proposal's span, if the proposal sticks around // for (some reincarnation of) the command to eventually apply, its trace // will reflect the reproposal as well. - ctx: origP.ctx, idKey: raftlog.MakeCmdIDKey(), proposedAtTicks: 0, // set in registerProposalLocked createdAtTicks: 0, // set in registerProposalLocked @@ -364,6 +363,8 @@ func (r *Replica) makeReproposal(origP *ProposalData) (reproposal *ProposalData, seedProposal: seedP, } + origCtx := origP.Context() + newProposal.ctx.Store(&origCtx) return newProposal, func() { // If the original proposal had an explicit span, it's an async consensus @@ -394,7 +395,8 @@ func (r *Replica) makeReproposal(origP *ProposalData) (reproposal *ProposalData, // // TODO(radu): Should this context be created via tracer.ForkSpan? 
// We'd need to make sure the span is finished eventually. - origP.ctx = r.AnnotateCtx(context.TODO()) + ctx := r.AnnotateCtx(context.TODO()) + origP.ctx.Store(&ctx) seedP.lastReproposal = newProposal } } diff --git a/pkg/kv/kvserver/replica_application_result_test.go b/pkg/kv/kvserver/replica_application_result_test.go index c5f2dfc996b4..51a83b9a50bd 100644 --- a/pkg/kv/kvserver/replica_application_result_test.go +++ b/pkg/kv/kvserver/replica_application_result_test.go @@ -37,8 +37,7 @@ func makeProposalData() *ProposalData { AdmissionOriginNode: 1, } - return &ProposalData{ - ctx: context.WithValue(context.Background(), struct{}{}, "nonempty-ctx"), + prop := ProposalData{ sp: &tracing.Span{}, idKey: "deadbeef", proposedAtTicks: 1, @@ -58,6 +57,9 @@ func makeProposalData() *ProposalData { seedProposal: nil, lastReproposal: nil, } + ctx := context.WithValue(context.Background(), struct{}{}, "nonempty-ctx") + prop.ctx.Store(&ctx) + return &prop } func TestProposalDataAndRaftCommandAreConsideredWhenAddingFields(t *testing.T) { @@ -73,8 +75,8 @@ func TestProposalDataAndRaftCommandAreConsideredWhenAddingFields(t *testing.T) { // NB: we can't use zerofields for two reasons: First, we have unexported fields // here, and second, we don't want to check for recursively populated structs (but // only for the top level fields). - require.Equal(t, 10, reflect.TypeOf(*prop.command).NumField()) - require.Equal(t, 19, reflect.TypeOf(*prop).NumField()) + require.Equal(t, 10, reflect.Indirect(reflect.ValueOf(prop.command)).NumField()) + require.Equal(t, 19, reflect.Indirect(reflect.ValueOf(prop)).NumField()) } func TestReplicaMakeReproposalChaininig(t *testing.T) { @@ -84,7 +86,7 @@ func TestReplicaMakeReproposalChaininig(t *testing.T) { var r Replica proposals := make([]*ProposalData, 1, 4) proposals[0] = makeProposalData() - sharedCtx := proposals[0].ctx + sharedCtx := proposals[0].Context() verify := func() { seed := proposals[0] @@ -102,9 +104,9 @@ func TestReplicaMakeReproposalChaininig(t *testing.T) { } // Only the latest reproposal must use the seed context. for _, prop := range proposals[:len(proposals)-1] { - require.NotEqual(t, sharedCtx, prop.ctx) + require.NotEqual(t, sharedCtx, prop.Context()) } - require.Equal(t, sharedCtx, proposals[len(proposals)-1].ctx) + require.Equal(t, sharedCtx, proposals[len(proposals)-1].Context()) } verify() diff --git a/pkg/kv/kvserver/replica_destroy.go b/pkg/kv/kvserver/replica_destroy.go index 553b5e012fd7..3d730e240094 100644 --- a/pkg/kv/kvserver/replica_destroy.go +++ b/pkg/kv/kvserver/replica_destroy.go @@ -181,4 +181,5 @@ func (r *Replica) disconnectReplicationRaftMuLocked(ctx context.Context) { log.Fatalf(ctx, "removing raft group before destroying replica %s", r) } r.mu.internalRaftGroup = nil + r.mu.raftTracer.Close() } diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index c1691a2d903c..193f98f9e57c 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/load" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/split" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/raft" @@ -154,7 +155,7 @@ func newUninitializedReplicaWithoutRaftGroup( } // Expose proposal data for external test packages. 
return store.cfg.TestingKnobs.TestingProposalSubmitFilter(kvserverbase.ProposalFilterArgs{ - Ctx: p.ctx, + Ctx: p.Context(), RangeID: rangeID, StoreID: store.StoreID(), ReplicaID: replicaID, @@ -328,6 +329,7 @@ func (r *Replica) initRaftGroupRaftMuLockedReplicaMuLocked() error { return err } r.mu.internalRaftGroup = rg + r.mu.raftTracer = *rafttrace.NewRaftTracer(ctx, r.Tracer, r.ClusterSettings(), &r.store.concurrentRaftTraces) r.flowControlV2.InitRaftLocked( ctx, replica_rac2.NewRaftNode(rg, (*replicaForRACv2)(r)), rg.LogMark()) return nil diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go index 20881c02b945..c0261d33b8eb 100644 --- a/pkg/kv/kvserver/replica_proposal.go +++ b/pkg/kv/kvserver/replica_proposal.go @@ -9,6 +9,7 @@ import ( "context" "os" "path/filepath" + "sync/atomic" "time" "github.com/cockroachdb/cockroach/pkg/keys" @@ -116,7 +117,15 @@ type ProposalData struct { // that during command application one should always use `replicatedCmd.ctx` // for best coverage. `p.ctx` should be used when a `replicatedCmd` is not in // scope, i.e. outside of raft command application. - ctx context.Context + // + // The context may be updated during the proposal lifecycle but will never + // be nil. To clear out the context, set it to context.Background(). It is + // protected by an atomic pointer because it can be read without holding the + // raftMu. Use ProposalData.Context() to read it. + // + // TODO(baptist): Track down all the places where we read and write ctx and + // determine whether we can convert this back to non-atomic field. + ctx atomic.Pointer[context.Context] // An optional tracing span bound to the proposal in the case of async // consensus (it will be referenced by p.ctx). We need to finish this span @@ -216,6 +225,12 @@ type ProposalData struct { lastReproposal *ProposalData } +// Context returns the context associated with the proposal. The context may +// change during the lifetime of the proposal. +func (proposal *ProposalData) Context() context.Context { + return *proposal.ctx.Load() +} + // useReplicationAdmissionControl indicates whether this raft command should // be subject to replication admission control. func (proposal *ProposalData) useReplicationAdmissionControl() bool { @@ -270,7 +285,8 @@ func (proposal *ProposalData) signalProposalResult(pr proposalResult) { // // NB: `proposal.ec.repl` might already have been cleared if we arrive here // through finishApplication. - proposal.ctx = context.Background() + ctx := context.Background() + proposal.ctx.Store(&ctx) } } @@ -1050,13 +1066,13 @@ func (r *Replica) requestToProposal( // Fill out the results even if pErr != nil; we'll return the error below. proposal := &ProposalData{ - ctx: ctx, idKey: idKey, doneCh: make(chan proposalResult, 1), Local: &res.Local, Request: ba, leaseStatus: *st, } + proposal.ctx.Store(&ctx) if needConsensus { proposal.command = &kvserverpb.RaftCommand{ diff --git a/pkg/kv/kvserver/replica_proposal_buf.go b/pkg/kv/kvserver/replica_proposal_buf.go index 633568fa7b5e..5d96eceb0098 100644 --- a/pkg/kv/kvserver/replica_proposal_buf.go +++ b/pkg/kv/kvserver/replica_proposal_buf.go @@ -126,6 +126,7 @@ type singleBatchProposer interface { getReplicaID() roachpb.ReplicaID flowControlHandle(ctx context.Context) kvflowcontrol.Handle onErrProposalDropped([]raftpb.Entry, []*ProposalData, raftpb.StateType) + registerForTracing(*ProposalData, raftpb.Entry) bool } // A proposer is an object that uses a propBuf to coordinate Raft proposals. 
@@ -255,7 +256,7 @@ func (b *propBuf) Insert(ctx context.Context, p *ProposalData, tok TrackedReques } if log.V(4) { - log.Infof(p.ctx, "submitting proposal %x", p.idKey) + log.Infof(p.Context(), "submitting proposal %x", p.idKey) } // Insert the proposal into the buffer's array. The buffer now takes ownership @@ -571,7 +572,7 @@ func (b *propBuf) FlushLockedWithRaftGroup( Data: p.encodedCommand, }) nextProp++ - log.VEvent(p.ctx, 2, "flushing proposal to Raft") + log.VEvent(p.Context(), 2, "flushing proposal to Raft") // We don't want deduct flow tokens for reproposed commands, and of // course for proposals that didn't integrate with kvflowcontrol. @@ -581,7 +582,7 @@ func (b *propBuf) FlushLockedWithRaftGroup( } else { admitHandles = append(admitHandles, admitEntHandle{ handle: p.raftAdmissionMeta, - pCtx: p.ctx, + pCtx: p.Context(), }) } } @@ -869,26 +870,34 @@ func proposeBatch( // TODO(bdarnell): Handle ErrProposalDropped better. // https://github.com/cockroachdb/cockroach/issues/21849 for _, p := range props { - if p.ctx != nil { - log.Event(p.ctx, "entry dropped") - } + log.Event(p.Context(), "entry dropped") } p.onErrProposalDropped(ents, props, raftGroup.BasicStatus().RaftState) return nil //nolint:returnerrcheck } - if err == nil { - // Now that we know what raft log position[1] this proposal is to end up - // in, deduct flow tokens for it. This is done without blocking (we've - // already waited for available flow tokens pre-evaluation). The tokens - // will later be returned once we're informed of the entry being - // admitted below raft. - // - // [1]: We're relying on an undocumented side effect of upstream raft - // API where it populates the index and term for the passed in - // slice of entries. See etcd-io/raft#57. - maybeDeductFlowTokens(ctx, p.flowControlHandle(ctx), handles, ents) + if err != nil { + return err } - return err + // Now that we know what raft log position[1] this proposal is to end up + // in, deduct flow tokens for it. This is done without blocking (we've + // already waited for available flow tokens pre-evaluation). The tokens + // will later be returned once we're informed of the entry being + // admitted below raft. + // + // [1]: We're relying on an undocumented side effect of upstream raft + // API where it populates the index and term for the passed in + // slice of entries. See etcd-io/raft#57. + maybeDeductFlowTokens(ctx, p.flowControlHandle(ctx), handles, ents) + + // Register the proposal with rafttrace. This will add the trace to the raft + // lifecycle. We trace at most one entry per batch, so break after the first + // one is successfully registered. 
+ for i := range ents { + if p.registerForTracing(props[i], ents[i]) { + break + } + } + return nil } func maybeDeductFlowTokens( @@ -1175,6 +1184,10 @@ func (rp *replicaProposer) closedTimestampTarget() hlc.Timestamp { return (*Replica)(rp).closedTimestampTargetRLocked() } +func (rp *replicaProposer) registerForTracing(p *ProposalData, e raftpb.Entry) bool { + return (*Replica)(rp).mu.raftTracer.MaybeRegister(p.Context(), e) +} + func (rp *replicaProposer) withGroupLocked(fn func(raftGroup proposerRaft) error) error { return (*Replica)(rp).withRaftGroupLocked(func(raftGroup *raft.RawNode) (bool, error) { // We're proposing a command here so there is no need to wake the leader diff --git a/pkg/kv/kvserver/replica_proposal_buf_test.go b/pkg/kv/kvserver/replica_proposal_buf_test.go index bdb47cb3a7eb..126febaaa575 100644 --- a/pkg/kv/kvserver/replica_proposal_buf_test.go +++ b/pkg/kv/kvserver/replica_proposal_buf_test.go @@ -217,6 +217,8 @@ func (t *testProposer) campaignLocked(ctx context.Context) { } } +func (t *testProposer) registerForTracing(*ProposalData, raftpb.Entry) bool { return true } + func (t *testProposer) rejectProposalWithErrLocked(_ context.Context, _ *ProposalData, err error) { if t.onRejectProposalWithErrLocked == nil { panic(fmt.Sprintf("unexpected rejectProposalWithErrLocked call: err=%v", err)) @@ -301,7 +303,6 @@ func (pc proposalCreator) newProposal(ba *kvpb.BatchRequest) *ProposalData { } } p := &ProposalData{ - ctx: context.Background(), idKey: kvserverbase.CmdIDKey("test-cmd"), command: &kvserverpb.RaftCommand{ ReplicatedEvalResult: kvserverpb.ReplicatedEvalResult{ @@ -313,6 +314,8 @@ func (pc proposalCreator) newProposal(ba *kvpb.BatchRequest) *ProposalData { Request: ba, leaseStatus: pc.lease, } + ctx := context.Background() + p.ctx.Store(&ctx) p.encodedCommand = pc.encodeProposal(p) return p } diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index ddffd1b587cc..505fdb840bda 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -123,7 +123,7 @@ func (r *Replica) evalAndPropose( idKey := raftlog.MakeCmdIDKey() proposal, pErr := r.requestToProposal(ctx, idKey, ba, g, st, ui) ba = proposal.Request // may have been updated - log.Event(proposal.ctx, "evaluated request") + log.Event(proposal.Context(), "evaluated request") // If the request hit a server-side concurrency retry error, immediately // propagate the error. Don't assume ownership of the concurrency guard. @@ -168,7 +168,7 @@ func (r *Replica) evalAndPropose( // from this point on. proposal.ec = makeReplicatedEndCmds(r, g, *st, timeutil.Now()) - log.VEventf(proposal.ctx, 2, + log.VEventf(proposal.Context(), 2, "proposing command to write %d new keys, %d new values, %d new intents, "+ "write batch size=%d bytes", proposal.command.ReplicatedEvalResult.Delta.KeyCount, @@ -204,7 +204,9 @@ func (r *Replica) evalAndPropose( // Fork the proposal's context span so that the proposal's context // can outlive the original proposer's context. 
- proposal.ctx, proposal.sp = tracing.ForkSpan(ctx, "async consensus") + ctx, sp := tracing.ForkSpan(ctx, "async consensus") + proposal.ctx.Store(&ctx) + proposal.sp = sp if proposal.sp != nil { // We can't leak this span if we fail to hand the proposal to the // replication layer, so finish it later in this method if we are to @@ -279,7 +281,7 @@ func (r *Replica) evalAndPropose( "command is too large: %d bytes (max: %d)", quotaSize, maxSize, )) } - log.VEventf(proposal.ctx, 2, "acquiring proposal quota (%d bytes)", quotaSize) + log.VEventf(proposal.Context(), 2, "acquiring proposal quota (%d bytes)", quotaSize) var err error proposal.quotaAlloc, err = r.maybeAcquireProposalQuota(ctx, ba, quotaSize) if err != nil { @@ -349,7 +351,8 @@ func (r *Replica) evalAndPropose( } // TODO(radu): Should this context be created via tracer.ForkSpan? // We'd need to make sure the span is finished eventually. - last.ctx = r.AnnotateCtx(context.TODO()) + ctx := r.AnnotateCtx(context.TODO()) + last.ctx.Store(&ctx) } return proposalCh, abandon, idKey, writeBytes, nil } @@ -396,12 +399,12 @@ func (r *Replica) propose( log.Errorf(ctx, "%v", err) return kvpb.NewError(err) } - log.KvDistribution.Infof(p.ctx, "proposing %s", crt) + log.KvDistribution.Infof(p.Context(), "proposing %s", crt) } else if p.command.ReplicatedEvalResult.AddSSTable != nil { - log.VEvent(p.ctx, 4, "sideloadable proposal detected") + log.VEvent(p.Context(), 4, "sideloadable proposal detected") r.store.metrics.AddSSTableProposals.Inc(1) } else if log.V(4) { - log.Infof(p.ctx, "proposing command %x: %s", p.idKey, p.Request.Summary()) + log.Infof(p.Context(), "proposing command %x: %s", p.idKey, p.Request.Summary()) } raftAdmissionMeta := p.raftAdmissionMeta @@ -430,7 +433,7 @@ func (r *Replica) propose( // Too verbose even for verbose logging, so manually enable if you want to // debug proposal sizes. if false { - log.Infof(p.ctx, `%s: proposal: %d + log.Infof(p.Context(), `%s: proposal: %d RaftCommand.ReplicatedEvalResult: %d RaftCommand.ReplicatedEvalResult.Delta: %d RaftCommand.WriteBatch: %d @@ -447,7 +450,7 @@ func (r *Replica) propose( // TODO(tschottdorf): can we mark them so lightstep can group them? const largeProposalEventThresholdBytes = 2 << 19 // 512kb if ln := len(p.encodedCommand); ln > largeProposalEventThresholdBytes { - log.Eventf(p.ctx, "proposal is large: %s", humanizeutil.IBytes(int64(ln))) + log.Eventf(p.Context(), "proposal is large: %s", humanizeutil.IBytes(int64(ln))) } // Insert into the proposal buffer, which passes the command to Raft to be @@ -456,7 +459,7 @@ func (r *Replica) propose( // // NB: we must not hold r.mu while using the proposal buffer, see comment // on the field. - log.VEvent(p.ctx, 2, "submitting proposal to proposal buffer") + log.VEvent(p.Context(), 2, "submitting proposal to proposal buffer") if err := r.mu.proposalBuf.Insert(ctx, p, tok.Move(ctx)); err != nil { return kvpb.NewError(err) } @@ -635,6 +638,11 @@ func (r *Replica) stepRaftGroupRaftMuLocked(req *kvserverpb.RaftMessageRequest) var sideChannelInfo replica_rac2.SideChannelInfoUsingRaftMessageRequest var admittedVector rac2.AdmittedVector err := r.withRaftGroup(func(raftGroup *raft.RawNode) (bool, error) { + // If this message requested tracing, begin tracing it. + for _, e := range req.TracedEntries { + r.mu.raftTracer.RegisterRemote(e) + } + r.mu.raftTracer.MaybeTrace(req.Message) // We're processing an incoming raft message (from a batch that may // include MsgVotes), so don't campaign if we wake up our raft // group. 
@@ -1208,6 +1216,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( } } + r.mu.raftTracer.MaybeTrace(msgStorageAppend) if state, err = s.StoreEntries(ctx, state, app, cb, &stats.append); err != nil { return stats, errors.Wrap(err, "while storing log entries") } @@ -1239,6 +1248,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( stats.tApplicationBegin = timeutil.Now() if hasMsg(msgStorageApply) { + r.mu.raftTracer.MaybeTrace(msgStorageApply) r.traceEntries(msgStorageApply.Entries, "committed, before applying any entries") err := appTask.ApplyCommittedEntries(ctx) @@ -1657,7 +1667,7 @@ func (r *Replica) refreshProposalsLocked( // up here too. if p.command.MaxLeaseIndex <= r.shMu.state.LeaseAppliedIndex { r.cleanupFailedProposalLocked(p) - log.Eventf(p.ctx, "retry proposal %x: %s", p.idKey, reason) + log.Eventf(p.Context(), "retry proposal %x: %s", p.idKey, reason) p.finishApplication(ctx, makeProposalResultErr( kvpb.NewAmbiguousResultErrorf( "unable to determine whether command was applied via snapshot", @@ -1725,7 +1735,7 @@ func (r *Replica) refreshProposalsLocked( // definitely required, however. sort.Sort(reproposals) for _, p := range reproposals { - log.Eventf(p.ctx, "re-submitting command %x (MLI %d, CT %s): %s", + log.Eventf(p.Context(), "re-submitting command %x (MLI %d, CT %s): %s", p.idKey, p.command.MaxLeaseIndex, p.command.ClosedTimestamp, reason) if err := r.mu.proposalBuf.ReinsertLocked(ctx, p); err != nil { r.cleanupFailedProposalLocked(p) @@ -1988,6 +1998,7 @@ func (r *Replica) deliverLocalRaftMsgsRaftMuLockedReplicaMuLocked( } for i, m := range localMsgs { + r.mu.raftTracer.MaybeTrace(m) if err := raftGroup.Step(m); err != nil { log.Fatalf(ctx, "unexpected error stepping local raft message [%s]: %v", raft.DescribeMessage(m, raftEntryFormatter), err) @@ -2011,6 +2022,7 @@ func (r *Replica) sendRaftMessage( lastToReplica, lastFromReplica := r.getLastReplicaDescriptors() r.mu.RLock() + traced := r.mu.raftTracer.MaybeTrace(msg) fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.From), lastToReplica) toReplica, toErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.To), lastFromReplica) var startKey roachpb.RKey @@ -2063,6 +2075,7 @@ func (r *Replica) sendRaftMessage( RangeStartKey: startKey, // usually nil UsingRac2Protocol: r.flowControlV2.GetEnabledWhenLeader() >= kvflowcontrol.V2EnabledWhenLeaderV1Encoding, LowPriorityOverride: lowPriorityOverride, + TracedEntries: traced, } // For RACv2, annotate successful MsgAppResp messages with the vector of // admitted log indices, by priority. 
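Taken together, the registerForTracing, RegisterRemote, MaybeTrace, and TracedEntries changes above suggest the following flow: the leaseholder registers a proposal's trace against its raft entry index, outgoing messages that carry a traced entry piggyback the trace metadata, and the receiving replica registers those entries so its own append/apply steps are logged as well. The snippet below is an illustrative stand-alone model of that flow under a small concurrent-trace budget; every name in it is invented for the example and it is not the rafttrace package API.

package main

import "fmt"

// tracedEntry mirrors the idea of shipping (trace ID, span ID, entry index)
// alongside a raft message.
type tracedEntry struct {
	traceID, spanID, index uint64
}

// tracer is a toy per-replica registry of traced entry indexes.
type tracer struct {
	maxTraces int
	byIndex   map[uint64]tracedEntry
}

// maybeRegister records a trace for an entry unless the budget is spent.
func (t *tracer) maybeRegister(te tracedEntry) bool {
	if len(t.byIndex) >= t.maxTraces {
		return false
	}
	t.byIndex[te.index] = te
	return true
}

// attach collects tracing metadata for any traced entry carried by a message.
func (t *tracer) attach(entries []uint64) []tracedEntry {
	var out []tracedEntry
	for _, idx := range entries {
		if te, ok := t.byIndex[idx]; ok {
			out = append(out, te)
		}
	}
	return out
}

// registerRemote is what a receiving replica would do with the piggybacked
// metadata before stepping the message into its raft group.
func (t *tracer) registerRemote(tes []tracedEntry) {
	for _, te := range tes {
		if t.maybeRegister(te) {
			fmt.Printf("now tracing entry %d remotely (trace %d)\n", te.index, te.traceID)
		}
	}
}

func main() {
	leader := &tracer{maxTraces: 10, byIndex: map[uint64]tracedEntry{}}
	follower := &tracer{maxTraces: 10, byIndex: map[uint64]tracedEntry{}}

	leader.maybeRegister(tracedEntry{traceID: 7, spanID: 9, index: 42})
	msgAppEntries := []uint64{41, 42} // entries carried by an outgoing MsgApp
	follower.registerRemote(leader.attach(msgAppEntries))
}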
diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index b233127bedd1..1b307a9cab69 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -7772,7 +7772,7 @@ func TestReplicaAbandonProposal(t *testing.T) { dropProp := int32(1) tc.repl.mu.Lock() tc.repl.mu.proposalBuf.testing.submitProposalFilter = func(p *ProposalData) (drop bool, _ error) { - if v := p.ctx.Value(magicKey{}); v != nil { + if v := p.Context().Value(magicKey{}); v != nil { cancel() return atomic.LoadInt32(&dropProp) == 1, nil } @@ -7890,7 +7890,7 @@ func TestReplicaRetryRaftProposal(t *testing.T) { tc.repl.mu.Lock() tc.repl.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride kvpb.LeaseAppliedIndex) { - if v := p.ctx.Value(magicKey{}); v != nil { + if v := p.Context().Value(magicKey{}); v != nil { if curAttempt := atomic.AddInt32(&c, 1); curAttempt == 1 { return wrongLeaseIndex } @@ -7994,7 +7994,7 @@ func TestReplicaCancelRaftCommandProgress(t *testing.T) { abandoned := make(map[kvserverbase.CmdIDKey]struct{}) // protected by repl.mu tc.repl.mu.proposalBuf.testing.submitProposalFilter = func(p *ProposalData) (drop bool, _ error) { if _, ok := abandoned[p.idKey]; ok { - log.Infof(p.ctx, "abandoning command") + log.Infof(p.Context(), "abandoning command") return true, nil } return false, nil @@ -8066,7 +8066,7 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) { if atomic.LoadInt32(&dropAll) == 1 { return true, nil } - if v := p.ctx.Value(magicKey{}); v != nil { + if v := p.Context().Value(magicKey{}); v != nil { seenCmds = append(seenCmds, int(p.command.MaxLeaseIndex)) } return false, nil @@ -8098,7 +8098,7 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) { } origIndexes := make([]int, 0, num) for _, p := range tc.repl.mu.proposals { - if v := p.ctx.Value(magicKey{}); v != nil { + if v := p.Context().Value(magicKey{}); v != nil { origIndexes = append(origIndexes, int(p.command.MaxLeaseIndex)) } } diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 3ec55dbaf5bb..46867e63aee3 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -942,6 +942,11 @@ type Store struct { // has likely improved). draining atomic.Bool + // concurrentRaftTraces is the number of concurrent raft trace requests that + // are currently registered. This limit is used to prevent extensive raft + // tracing from inadvertently impacting performance. + concurrentRaftTraces atomic.Int64 + // Locking notes: To avoid deadlocks, the following lock order must be // obeyed: baseQueue.mu < Replica.raftMu < Replica.readOnlyCmdMu < Store.mu // < Replica.mu < Replica.unreachablesMu < Store.coalescedMu < Store.scheduler.mu. diff --git a/pkg/raft/raftpb/raft.go b/pkg/raft/raftpb/raft.go index b2073df26232..2169809f9339 100644 --- a/pkg/raft/raftpb/raft.go +++ b/pkg/raft/raftpb/raft.go @@ -24,6 +24,12 @@ type Epoch int64 // SafeValue implements the redact.SafeValue interface. func (e Epoch) SafeValue() {} +// The enums in raft are all safe for redaction. 
+func (MessageType) SafeValue() {} +func (EntryType) SafeValue() {} +func (ConfChangeType) SafeValue() {} +func (ConfChangeTransition) SafeValue() {} + // Priority specifies per-entry priorities, that are local to the interaction // between a leader-replica pair, i.e., they are not an invariant of a // particular entry in the raft log (the replica could be the leader itself or diff --git a/pkg/raft/util.go b/pkg/raft/util.go index dfad989b062b..2f86a7d651ff 100644 --- a/pkg/raft/util.go +++ b/pkg/raft/util.go @@ -200,6 +200,10 @@ func describeMessageWithIndent(indent string, m pb.Message, f EntryFormatter) st return buf.String() } +func DescribeTarget(id pb.PeerID) string { + return describeTarget(id) +} + func describeTarget(id pb.PeerID) string { switch id { case None: diff --git a/pkg/sql/exec_log.go b/pkg/sql/exec_log.go index 1ecb31dd94ab..86f167465613 100644 --- a/pkg/sql/exec_log.go +++ b/pkg/sql/exec_log.go @@ -359,56 +359,58 @@ func (p *planner) maybeLogStatementInternal( defer releaseSampledQuery(sampledQuery) *sampledQuery = eventpb.SampledQuery{ - CommonSQLExecDetails: execDetails, - SkippedQueries: skippedQueries, - CostEstimate: p.curPlan.instrumentation.costEstimate, - Distribution: p.curPlan.instrumentation.distribution.String(), - PlanGist: p.curPlan.instrumentation.planGist.String(), - SessionID: p.extendedEvalCtx.SessionID.String(), - Database: p.CurrentDatabase(), - StatementID: p.stmt.QueryID.String(), - TransactionID: txnID, - StatementFingerprintID: stmtFingerprintID.String(), - MaxFullScanRowsEstimate: p.curPlan.instrumentation.maxFullScanRows, - TotalScanRowsEstimate: p.curPlan.instrumentation.totalScanRows, - OutputRowsEstimate: p.curPlan.instrumentation.outputRows, - StatsAvailable: p.curPlan.instrumentation.statsAvailable, - NanosSinceStatsCollected: int64(p.curPlan.instrumentation.nanosSinceStatsCollected), - BytesRead: p.curPlan.instrumentation.topLevelStats.bytesRead, - RowsRead: p.curPlan.instrumentation.topLevelStats.rowsRead, - RowsWritten: p.curPlan.instrumentation.topLevelStats.rowsWritten, - InnerJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.InnerJoin]), - LeftOuterJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.LeftOuterJoin]), - FullOuterJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.FullOuterJoin]), - SemiJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.LeftSemiJoin]), - AntiJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.LeftAntiJoin]), - IntersectAllJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.IntersectAllJoin]), - ExceptAllJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.ExceptAllJoin]), - HashJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.HashJoin]), - CrossJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.CrossJoin]), - IndexJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.IndexJoin]), - LookupJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.LookupJoin]), - MergeJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.MergeJoin]), - InvertedJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.InvertedJoin]), - ApplyJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.ApplyJoin]), - ZigZagJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.ZigZagJoin]), - ContentionNanos: queryLevelStats.ContentionTime.Nanoseconds(), - Regions: queryLevelStats.Regions, - SQLInstanceIDs: 
queryLevelStats.SQLInstanceIDs, - KVNodeIDs: queryLevelStats.KVNodeIDs, - UsedFollowerRead: queryLevelStats.UsedFollowerRead, - NetworkBytesSent: queryLevelStats.NetworkBytesSent, - MaxMemUsage: queryLevelStats.MaxMemUsage, - MaxDiskUsage: queryLevelStats.MaxDiskUsage, - KVBytesRead: queryLevelStats.KVBytesRead, - KVPairsRead: queryLevelStats.KVPairsRead, - KVRowsRead: queryLevelStats.KVRowsRead, - KvTimeNanos: queryLevelStats.KVTime.Nanoseconds(), - KvGrpcCalls: queryLevelStats.KVBatchRequestsIssued, - NetworkMessages: queryLevelStats.NetworkMessages, - CpuTimeNanos: queryLevelStats.CPUTime.Nanoseconds(), - IndexRecommendations: indexRecs, - Indexes: p.curPlan.instrumentation.indexesUsed, + CommonSQLExecDetails: execDetails, + SkippedQueries: skippedQueries, + CostEstimate: p.curPlan.instrumentation.costEstimate, + Distribution: p.curPlan.instrumentation.distribution.String(), + PlanGist: p.curPlan.instrumentation.planGist.String(), + SessionID: p.extendedEvalCtx.SessionID.String(), + Database: p.CurrentDatabase(), + StatementID: p.stmt.QueryID.String(), + TransactionID: txnID, + StatementFingerprintID: stmtFingerprintID.String(), + MaxFullScanRowsEstimate: p.curPlan.instrumentation.maxFullScanRows, + TotalScanRowsEstimate: p.curPlan.instrumentation.totalScanRows, + OutputRowsEstimate: p.curPlan.instrumentation.outputRows, + StatsAvailable: p.curPlan.instrumentation.statsAvailable, + NanosSinceStatsCollected: int64(p.curPlan.instrumentation.nanosSinceStatsCollected), + BytesRead: p.curPlan.instrumentation.topLevelStats.bytesRead, + RowsRead: p.curPlan.instrumentation.topLevelStats.rowsRead, + RowsWritten: p.curPlan.instrumentation.topLevelStats.rowsWritten, + InnerJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.InnerJoin]), + LeftOuterJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.LeftOuterJoin]), + FullOuterJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.FullOuterJoin]), + SemiJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.LeftSemiJoin]), + AntiJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.LeftAntiJoin]), + IntersectAllJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.IntersectAllJoin]), + ExceptAllJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.ExceptAllJoin]), + HashJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.HashJoin]), + CrossJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.CrossJoin]), + IndexJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.IndexJoin]), + LookupJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.LookupJoin]), + MergeJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.MergeJoin]), + InvertedJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.InvertedJoin]), + ApplyJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.ApplyJoin]), + ZigZagJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.ZigZagJoin]), + ContentionNanos: queryLevelStats.ContentionTime.Nanoseconds(), + Regions: queryLevelStats.Regions, + SQLInstanceIDs: queryLevelStats.SQLInstanceIDs, + KVNodeIDs: queryLevelStats.KVNodeIDs, + UsedFollowerRead: queryLevelStats.UsedFollowerRead, + NetworkBytesSent: queryLevelStats.NetworkBytesSent, + MaxMemUsage: queryLevelStats.MaxMemUsage, + MaxDiskUsage: queryLevelStats.MaxDiskUsage, + KVBytesRead: queryLevelStats.KVBytesRead, + KVPairsRead: queryLevelStats.KVPairsRead, + KVRowsRead: queryLevelStats.KVRowsRead, 
+ KvTimeNanos: queryLevelStats.KVTime.Nanoseconds(), + KvGrpcCalls: queryLevelStats.KVBatchRequestsIssued, + NetworkMessages: queryLevelStats.NetworkMessages, + CpuTimeNanos: queryLevelStats.CPUTime.Nanoseconds(), + IndexRecommendations: indexRecs, + // TODO(mgartner): Use a slice of struct{uint64, uint64} instead of + // converting to strings. + Indexes: p.curPlan.instrumentation.indexesUsed.Strings(), ScanCount: int64(p.curPlan.instrumentation.scanCounts[exec.ScanCount]), ScanWithStatsCount: int64(p.curPlan.instrumentation.scanCounts[exec.ScanWithStatsCount]), ScanWithStatsForecastCount: int64(p.curPlan.instrumentation.scanCounts[exec.ScanWithStatsForecastCount]), diff --git a/pkg/sql/executor_statement_metrics.go b/pkg/sql/executor_statement_metrics.go index 5056ec6559be..d2d7d302ea81 100644 --- a/pkg/sql/executor_statement_metrics.go +++ b/pkg/sql/executor_statement_metrics.go @@ -208,8 +208,10 @@ func (ex *connExecutor) recordStatementSummary( EndTime: phaseTimes.GetSessionPhaseTime(sessionphase.PlannerStartExecStmt).Add(svcLatRaw), FullScan: fullScan, ExecStats: queryLevelStats, - Indexes: planner.instrumentation.indexesUsed, - Database: planner.SessionData().Database, + // TODO(mgartner): Use a slice of struct{uint64, uint64} instead of + // converting to strings. + Indexes: planner.instrumentation.indexesUsed.Strings(), + Database: planner.SessionData().Database, } stmtFingerprintID, err := diff --git a/pkg/sql/instrumentation.go b/pkg/sql/instrumentation.go index e6c76d858f1a..4f8eaca6af36 100644 --- a/pkg/sql/instrumentation.go +++ b/pkg/sql/instrumentation.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/idxrecommendations" "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/opt/exec" + "github.com/cockroachdb/cockroach/pkg/sql/opt/exec/execbuilder" "github.com/cockroachdb/cockroach/pkg/sql/opt/exec/explain" "github.com/cockroachdb/cockroach/pkg/sql/opt/indexrec" "github.com/cockroachdb/cockroach/pkg/sql/opt/optbuilder" @@ -225,7 +226,7 @@ type instrumentationHelper struct { scanCounts [exec.NumScanCountTypes]int // indexesUsed list the indexes used in the query with format tableID@indexID. - indexesUsed []string + indexesUsed execbuilder.IndexesUsed // schemachangerMode indicates which schema changer mode was used to execute // the query. diff --git a/pkg/sql/opt/exec/execbuilder/BUILD.bazel b/pkg/sql/opt/exec/execbuilder/BUILD.bazel index ad8b1921d6ed..b06cb525c606 100644 --- a/pkg/sql/opt/exec/execbuilder/BUILD.bazel +++ b/pkg/sql/opt/exec/execbuilder/BUILD.bazel @@ -47,7 +47,6 @@ go_library( "//pkg/sql/sqlerrors", "//pkg/sql/sqltelemetry", "//pkg/sql/types", - "//pkg/util", "//pkg/util/buildutil", "//pkg/util/encoding", "//pkg/util/errorutil", diff --git a/pkg/sql/opt/exec/execbuilder/builder.go b/pkg/sql/opt/exec/execbuilder/builder.go index 34830973e177..c529e9ea3f81 100644 --- a/pkg/sql/opt/exec/execbuilder/builder.go +++ b/pkg/sql/opt/exec/execbuilder/builder.go @@ -7,6 +7,8 @@ package execbuilder import ( "context" + "slices" + "strconv" "time" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" @@ -168,7 +170,41 @@ type Builder struct { IsANSIDML bool // IndexesUsed list the indexes used in query with the format tableID@indexID. - IndexesUsed []string + IndexesUsed +} + +// IndexesUsed is a list of indexes used in a query. +type IndexesUsed struct { + indexes []struct { + tableID cat.StableID + indexID cat.StableID + } +} + +// add adds the given index to the list, if it is not already present. 
+func (iu *IndexesUsed) add(tableID, indexID cat.StableID) { + s := struct { + tableID cat.StableID + indexID cat.StableID + }{tableID, indexID} + if !slices.Contains(iu.indexes, s) { + iu.indexes = append(iu.indexes, s) + } +} + +// Strings returns a slice of strings with the format tableID@indexID for each +// index in the list. +// +// TODO(mgartner): Use a slice of struct{uint64, uint64} instead of converting +// to strings. +func (iu *IndexesUsed) Strings() []string { + res := make([]string, len(iu.indexes)) + const base = 10 + for i, u := range iu.indexes { + res[i] = strconv.FormatUint(uint64(u.tableID), base) + "@" + + strconv.FormatUint(uint64(u.indexID), base) + } + return res } // New constructs an instance of the execution node builder using the diff --git a/pkg/sql/opt/exec/execbuilder/relational.go b/pkg/sql/opt/exec/execbuilder/relational.go index ecdc5b305c1a..8a70b344376e 100644 --- a/pkg/sql/opt/exec/execbuilder/relational.go +++ b/pkg/sql/opt/exec/execbuilder/relational.go @@ -38,7 +38,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlerrors" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" "github.com/cockroachdb/cockroach/pkg/sql/types" - "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/errorutil" @@ -756,7 +755,7 @@ func (b *Builder) buildScan(scan *memo.ScanExpr) (_ execPlan, outputCols colOrdM return execPlan{}, colOrdMap{}, errors.AssertionFailedf("expected inverted index scan to have a constraint") } - b.IndexesUsed = util.CombineUnique(b.IndexesUsed, []string{fmt.Sprintf("%d@%d", tab.ID(), idx.ID())}) + b.IndexesUsed.add(tab.ID(), idx.ID()) // Save if we planned a full (large) table/index scan on the builder so that // the planner can be made aware later. We only do this for non-virtual @@ -2297,7 +2296,7 @@ func (b *Builder) buildIndexJoin( // TODO(radu): the distsql implementation of index join assumes that the input // starts with the PK columns in order (#40749). 
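The IndexesUsed type introduced above replaces the earlier approach of combining unique "tableID@indexID" strings with a typed, de-duplicating list that is only rendered to strings at reporting time. A rough stand-alone mirror of that behavior follows (plain uint64 IDs instead of cat.StableID, lower-case names, all invented for this example):

package main

import (
	"fmt"
	"slices"
	"strconv"
)

type indexUse struct{ tableID, indexID uint64 }

type indexesUsed struct{ indexes []indexUse }

// add appends the index to the list if it is not already present.
func (iu *indexesUsed) add(tableID, indexID uint64) {
	u := indexUse{tableID, indexID}
	if !slices.Contains(iu.indexes, u) {
		iu.indexes = append(iu.indexes, u)
	}
}

// strings renders each entry in the tableID@indexID form used for reporting.
func (iu *indexesUsed) strings() []string {
	res := make([]string, len(iu.indexes))
	for i, u := range iu.indexes {
		res[i] = strconv.FormatUint(u.tableID, 10) + "@" + strconv.FormatUint(u.indexID, 10)
	}
	return res
}

func main() {
	var iu indexesUsed
	iu.add(106, 1)
	iu.add(106, 1) // duplicate, ignored
	iu.add(106, 2)
	fmt.Println(iu.strings()) // [106@1 106@2]
}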
pri := tab.Index(cat.PrimaryIndex) - b.IndexesUsed = util.CombineUnique(b.IndexesUsed, []string{fmt.Sprintf("%d@%d", tab.ID(), pri.ID())}) + b.IndexesUsed.add(tab.ID(), pri.ID()) keyCols := make([]exec.NodeColumnOrdinal, pri.KeyColumnCount()) for i := range keyCols { keyCols[i], err = getNodeColumnOrdinal(inputCols, join.Table.ColumnID(pri.Column(i).Ordinal())) @@ -2675,7 +2674,7 @@ func (b *Builder) buildLookupJoin( tab := md.Table(join.Table) idx := tab.Index(join.Index) - b.IndexesUsed = util.CombineUnique(b.IndexesUsed, []string{fmt.Sprintf("%d@%d", tab.ID(), idx.ID())}) + b.IndexesUsed.add(tab.ID(), idx.ID()) locking, err := b.buildLocking(join.Table, join.Locking) if err != nil { @@ -2855,7 +2854,7 @@ func (b *Builder) buildInvertedJoin( md := b.mem.Metadata() tab := md.Table(join.Table) idx := tab.Index(join.Index) - b.IndexesUsed = util.CombineUnique(b.IndexesUsed, []string{fmt.Sprintf("%d@%d", tab.ID(), idx.ID())}) + b.IndexesUsed.add(tab.ID(), idx.ID()) prefixEqCols := make([]exec.NodeColumnOrdinal, len(join.PrefixKeyCols)) for i, c := range join.PrefixKeyCols { @@ -2997,10 +2996,8 @@ func (b *Builder) buildZigzagJoin( rightTable := md.Table(join.RightTable) leftIndex := leftTable.Index(join.LeftIndex) rightIndex := rightTable.Index(join.RightIndex) - b.IndexesUsed = util.CombineUnique(b.IndexesUsed, - []string{fmt.Sprintf("%d@%d", leftTable.ID(), leftIndex.ID())}) - b.IndexesUsed = util.CombineUnique(b.IndexesUsed, - []string{fmt.Sprintf("%d@%d", rightTable.ID(), rightIndex.ID())}) + b.IndexesUsed.add(leftTable.ID(), leftIndex.ID()) + b.IndexesUsed.add(rightTable.ID(), rightIndex.ID()) leftEqCols := make([]exec.TableColumnOrdinal, len(join.LeftEqCols)) rightEqCols := make([]exec.TableColumnOrdinal, len(join.RightEqCols)) diff --git a/pkg/testutils/lint/passes/fmtsafe/functions.go b/pkg/testutils/lint/passes/fmtsafe/functions.go index 4890f40faecb..87c6314f597f 100644 --- a/pkg/testutils/lint/passes/fmtsafe/functions.go +++ b/pkg/testutils/lint/passes/fmtsafe/functions.go @@ -120,6 +120,8 @@ var requireConstFmt = map[string]bool{ "(*github.com/cockroachdb/cockroach/pkg/kv/kvserver.raftLogger).Fatalf": true, "(*github.com/cockroachdb/cockroach/pkg/kv/kvserver.raftLogger).Panicf": true, + "(*github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace.traceValue).logf": true, + "(*github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2.LogTracker).errorf": true, "(github.com/cockroachdb/cockroach/pkg/raft/raftlogger.Logger).Debugf": true, diff --git a/pkg/testutils/lint/passes/redactcheck/redactcheck.go b/pkg/testutils/lint/passes/redactcheck/redactcheck.go index 025db3047678..00e764fdeb6e 100644 --- a/pkg/testutils/lint/passes/redactcheck/redactcheck.go +++ b/pkg/testutils/lint/passes/redactcheck/redactcheck.go @@ -138,8 +138,12 @@ func runAnalyzer(pass *analysis.Pass) (interface{}, error) { "ID": {}, }, "github.com/cockroachdb/cockroach/pkg/raft/raftpb": { - "Epoch": {}, - "PeerID": {}, + "Epoch": {}, + "PeerID": {}, + "MessageType": {}, + "EntryType": {}, + "ConfChangeType": {}, + "ConfChangeTransition": {}, }, "github.com/cockroachdb/cockroach/pkg/repstream/streampb": { "StreamID": {}, @@ -225,6 +229,10 @@ func runAnalyzer(pass *analysis.Pass) (interface{}, error) { "WorkKind": {}, "QueueKind": {}, }, + "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb": { + "TraceID": {}, + "SpanID": {}, + }, "github.com/cockroachdb/cockroach/pkg/util/hlc": { "ClockTimestamp": {}, "LegacyTimestamp": {}, diff --git a/pkg/util/tracing/tracingpb/recorded_span.go 
b/pkg/util/tracing/tracingpb/recorded_span.go
index 90dfb675345b..381b521e960a 100644
--- a/pkg/util/tracing/tracingpb/recorded_span.go
+++ b/pkg/util/tracing/tracingpb/recorded_span.go
@@ -22,9 +22,13 @@ const (
 // TraceID is a probabilistically-unique id, shared by all spans in a trace.
 type TraceID uint64
 
+func (TraceID) SafeValue() {}
+
 // SpanID is a probabilistically-unique span id.
 type SpanID uint64
 
+func (SpanID) SafeValue() {}
+
 // Recording represents a group of RecordedSpans rooted at a fixed root span, as
 // returned by GetRecording. Spans are sorted by StartTime.
 type Recording []RecordedSpan
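The SafeValue additions in raftpb and tracingpb mark these types as safe to print verbatim in redacted logs, which is why the redactcheck allowlist above gains matching entries. A minimal sketch of the marker-interface idea follows; it uses a toy redactor, not the cockroachdb/redact implementation, and the spanID type is only an imitation of tracingpb.SpanID for the example.

package main

import "fmt"

// safeValue is a marker interface: a type implementing it promises that its
// formatted value never contains sensitive user data, so a redacting
// formatter may keep it verbatim.
type safeValue interface{ SafeValue() }

// spanID imitates tracingpb.SpanID for the purposes of this example.
type spanID uint64

func (spanID) SafeValue() {}

// redactArg keeps safe values as-is and masks everything else.
func redactArg(v interface{}) string {
	if _, ok := v.(safeValue); ok {
		return fmt.Sprint(v)
	}
	return "‹×›"
}

func main() {
	fmt.Println(redactArg(spanID(1234)))   // 1234, kept verbatim
	fmt.Println(redactArg("user-key-123")) // ‹×›, masked
}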