Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
131850: raft: add tracing to raft r=pav-kv a=andrewbaptist

This commit adds tracing to the various places within raft where state transitions happen. When a message is first proposed, it matches the context of the original message to the raft entry and registers it for tracing. Then it adds to the trace at the key points during the transitions and finally stops tracing once a `MsgStorageApplyResp` is received past the entry.

Fixes: #104035

Release note: None

132365: opt: refactor used indexes r=mgartner a=mgartner

I noticed the unnecessary usage of strings to track index usage in some
CPU/heap profiles. I attempted to eliminate this entirely, but
unfortunately these strings are persisted, making modification of the
type difficult, and I had difficulty following the thread from their
creation all the way to their consumption - I think someone with more
expertise in the observability part of the code base would be required
to untangle this.

So for now I've settled for refactoring the creation of "used indexes" to
avoid using strings until later on during instrumentation. This has the
minor benefit of possibly de-duplicating the indexes before generating the
strings, if an index is used multiple times in a query.

I've also replaced the expensive usage of `fmt.Sprintf` for generating
the strings with the faster `+` concatenation (benchmarks for this are
at: https://gist.github.com/mgartner/28a9d52e54e91691d0a7617164aa3157).

Epic: None

Release note: None


Co-authored-by: Andrew Baptist <[email protected]>
Co-authored-by: Marcus Gartner <[email protected]>
  • Loading branch information
3 people committed Oct 18, 2024
3 parents 57afbbe + fe746c7 + 1a90a9b commit 472ea07
Show file tree
Hide file tree
Showing 32 changed files with 1,188 additions and 118 deletions.
3 changes: 3 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ ALL_TESTS = [
"//pkg/kv/kvserver/protectedts:protectedts_test",
"//pkg/kv/kvserver/raftentry:raftentry_test",
"//pkg/kv/kvserver/raftlog:raftlog_test",
"//pkg/kv/kvserver/rafttrace:rafttrace_test",
"//pkg/kv/kvserver/raftutil:raftutil_test",
"//pkg/kv/kvserver/rangefeed:rangefeed_test",
"//pkg/kv/kvserver/rangelog:rangelog_test",
Expand Down Expand Up @@ -1513,6 +1514,8 @@ GO_TARGETS = [
"//pkg/kv/kvserver/raftentry:raftentry_test",
"//pkg/kv/kvserver/raftlog:raftlog",
"//pkg/kv/kvserver/raftlog:raftlog_test",
"//pkg/kv/kvserver/rafttrace:rafttrace",
"//pkg/kv/kvserver/rafttrace:rafttrace_test",
"//pkg/kv/kvserver/raftutil:raftutil",
"//pkg/kv/kvserver/raftutil:raftutil_test",
"//pkg/kv/kvserver/rangefeed:rangefeed",
Expand Down
5 changes: 5 additions & 0 deletions pkg/cmd/roachtest/tests/admission_control_latency.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,11 @@ func (v variations) runTest(ctx context.Context, t test.Test, c cluster.Cluster)
`SET CLUSTER SETTING kv.lease.reject_on_leader_unknown.enabled = true`); err != nil {
t.Fatal(err)
}
// Enable raft tracing. Remove this once raft tracing is the default.
if _, err := db.ExecContext(ctx,
`SET CLUSTER SETTING kv.raft.max_concurrent_traces = '10'`); err != nil {
t.Fatal(err)
}
// This isn't strictly necessary, but it would be nice if this test passed at 10s (or lower).
if _, err := db.ExecContext(ctx,
`SET CLUSTER SETTING server.time_after_store_suspect = '10s'`); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ go_library(
"//pkg/kv/kvserver/multiqueue",
"//pkg/kv/kvserver/raftentry",
"//pkg/kv/kvserver/raftlog",
"//pkg/kv/kvserver/rafttrace",
"//pkg/kv/kvserver/rangefeed",
"//pkg/kv/kvserver/rditer",
"//pkg/kv/kvserver/readsummary",
Expand Down Expand Up @@ -438,6 +439,7 @@ go_test(
"//pkg/kv/kvserver/protectedts/ptutil",
"//pkg/kv/kvserver/raftentry",
"//pkg/kv/kvserver/raftlog",
"//pkg/kv/kvserver/rafttrace",
"//pkg/kv/kvserver/raftutil",
"//pkg/kv/kvserver/rangefeed",
"//pkg/kv/kvserver/rditer",
Expand Down
62 changes: 62 additions & 0 deletions pkg/kv/kvserver/client_raft_log_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
Expand All @@ -33,6 +34,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/vfs"
"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -132,6 +135,65 @@ func TestRaftLogQueue(t *testing.T) {
}
}

// TestRaftTracing starts a three-node cluster, replicates a scratch range to
// all three nodes, and issues a series of writes against it. One of the writes
// is sent under a recording span, and the test asserts that the resulting
// recording contains the expected raft-level trace messages.
func TestRaftTracing(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	const (
		numWrites   = 100
		tracedWrite = 50 // the single write that carries a recording span
	)

	// NB: It is hard to get all the messages in an expected order, so only a
	// handful of key messages are checked. The file-name prefixes also make
	// sure that the logs are reported against the line that called into the
	// tracing library, not against the tracing library itself.
	expectedMessages := []string{
		`replica_proposal_buf.* flushing proposal to Raft`,
		`replica_proposal_buf.* registering local trace`,
		`replica_raft.* 1->2 MsgApp`,
		`replica_raft.* 1->3 MsgApp`,
		`replica_raft.* AppendThread->1 MsgStorageAppendResp`,
		`ack-ing replication success to the client`,
	}

	// TODO(baptist): Remove this once we change the default to be enabled.
	settings := cluster.MakeTestingClusterSettings()
	rafttrace.MaxConcurrentRaftTraces.Override(context.Background(), &settings.SV, 10)

	serverArgs := base.TestServerArgs{
		Settings: settings,
		RaftConfig: base.RaftConfig{
			RangeLeaseDuration:       24 * time.Hour, // disable lease moves
			RaftElectionTimeoutTicks: 1 << 30,        // disable elections
		},
	}
	tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
		ReplicationMode: base.ReplicationManual,
		ServerArgs:      serverArgs,
	})
	defer tc.Stopper().Stop(context.Background())
	store := tc.GetFirstStoreFromServer(t, 0)

	// Write a single value to ensure we have a leader on n1.
	key := tc.ScratchRange(t)
	_, pErr := kv.SendWrapped(context.Background(), store.TestSender(), putArgs(key, []byte("value")))
	require.NoError(t, pErr.GoError())
	require.NoError(t, tc.WaitForSplitAndInitialization(key))

	// Grow the range to 3 voters.
	tc.AddVotersOrFatal(t, key, tc.Targets(1, 2)...)
	tc.WaitForVotersOrFatal(t, key, tc.Targets(1, 2)...)

	for i := 0; i < numWrites; i++ {
		ctx := context.Background()
		var finish func() tracingpb.Recording
		if i == tracedWrite {
			// Trace a random request on a "client" tracer.
			ctx, finish = tracing.ContextWithRecordingSpan(ctx, store.GetStoreConfig().Tracer(), "test")
		}

		_, putErr := kv.SendWrapped(ctx, store.TestSender(), putArgs(key, []byte(fmt.Sprintf("value-%d", i))))
		require.NoError(t, putErr.GoError())

		if finish == nil {
			// Untraced write: nothing to verify.
			continue
		}

		// Note that this is the client's span; additional logs may be created
		// after the span is returned.
		recording := finish().String()
		require.NoError(t, testutils.MatchInOrder(recording, expectedMessages...))
	}
}

// TestCrashWhileTruncatingSideloadedEntries emulates a process crash in the
// middle of applying a raft log truncation command that removes some entries
// from the sideloaded storage. The test expects that storage remains in a
Expand Down
16 changes: 16 additions & 0 deletions pkg/kv/kvserver/kvserverpb/raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@ message RaftHeartbeat {
bool lagging_followers_on_quiesce_accurate = 10;
}

// The traced entry from the leader along with the trace and span ID.
//
// Each TracedEntry pairs the index of one raft entry with the identifiers of
// the trace and span it is associated with.
message TracedEntry {
// index is the index of the traced raft entry. It is generated as a
// non-nullable kvpb.RaftIndex in Go.
uint64 index = 1 [(gogoproto.nullable) = false,
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/kv/kvpb.RaftIndex"];
// trace_id is the ID of the trace for this entry. It is generated as the Go
// field TraceID with type tracingpb.TraceID.
uint64 trace_id = 2 [(gogoproto.nullable) = false, (gogoproto.customname) = "TraceID",
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb.TraceID"];
// span_id is the ID of the span for this entry. It is generated as the Go
// field SpanID with type tracingpb.SpanID.
uint64 span_id = 3 [(gogoproto.nullable) = false, (gogoproto.customname) = "SpanID",
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb.SpanID"];
}

// RaftMessageRequest is the request used to send raft messages using our
// protobuf-based RPC codec. If a RaftMessageRequest has a non-empty number of
// heartbeats or heartbeat_resps, the contents of the message field is treated
Expand Down Expand Up @@ -103,6 +113,12 @@ message RaftMessageRequest {
// indices. Used only with RACv2.
kv.kvserver.kvflowcontrol.kvflowcontrolpb.AdmittedState admitted_state = 14 [(gogoproto.nullable) = false];

// TracedEntry is a mapping from Raft index to trace and span ids for this
// request. They are set by the leaseholder and begin tracing on all
// replicas. Currently, traces are not returned to the leaseholder, but
// instead logged to a local log file.
repeated TracedEntry traced_entries = 15 [(gogoproto.nullable) = false];

reserved 10;
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func traceProposals(r *Replica, ids []kvserverbase.CmdIDKey, event string) {
r.mu.RLock()
for _, id := range ids {
if prop, ok := r.mu.proposals[id]; ok {
ctxs = append(ctxs, prop.ctx)
ctxs = append(ctxs, prop.Context())
}
}
r.mu.RUnlock()
Expand Down
38 changes: 38 additions & 0 deletions pkg/kv/kvserver/rafttrace/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

# Library target for the rafttrace package. It is built from rafttrace.go and
# importable as github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace.
go_library(
name = "rafttrace",
srcs = ["rafttrace.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace",
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/raft",
"//pkg/raft/raftpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/util/log",
"//pkg/util/syncutil",
"//pkg/util/tracing",
"//pkg/util/tracing/tracingpb",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
],
)

# Test target for rafttrace_test.go. It embeds the :rafttrace library target
# declared in this file.
go_test(
name = "rafttrace_test",
srcs = ["rafttrace_test.go"],
embed = [":rafttrace"],
deps = [
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/raft/raftpb",
"//pkg/settings/cluster",
"//pkg/testutils",
"//pkg/util/tracing",
"//pkg/util/tracing/tracingpb",
"@com_github_stretchr_testify//require",
],
)
Loading

0 comments on commit 472ea07

Please sign in to comment.