From 67fd1710b6781c4f39ffceefcd9ccb2a0f11cdbc Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Thu, 12 Oct 2023 07:34:50 -0400 Subject: [PATCH] Add metrics debugging options (#3008) * first pass Signed-off-by: Joe Elliott * stuff Signed-off-by: Joe Elliott * fix tests. replace warning Signed-off-by: Joe Elliott * manifest Signed-off-by: Joe Elliott * docs Signed-off-by: Joe Elliott * changelog Signed-off-by: Joe Elliott * lint Signed-off-by: Joe Elliott --------- Signed-off-by: Joe Elliott --- CHANGELOG.md | 2 + cmd/tempo/app/config.go | 4 +- cmd/tempo/app/config_test.go | 4 +- docs/sources/tempo/configuration/_index.md | 13 +-- docs/sources/tempo/configuration/manifest.md | 6 +- integration/e2e/config-limits-query.yaml | 1 - modules/distributor/config.go | 14 +-- modules/distributor/distributor.go | 95 ++++++++++++-------- modules/distributor/distributor_test.go | 37 ++------ 9 files changed, 94 insertions(+), 82 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c952daed001..395c71f6dec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,8 @@ * [ENHANCEMENT] Add histogram buckets to metrics-generator config in user-configurable overrides [#2928](https://github.com/grafana/tempo/pull/2928) (@mar4uk) * [ENHANCEMENT] Adds websocket support for search streaming. [#2971](https://github.com/grafana/tempo/pull/2840) (@joe-elliott) **Breaking Change** Deprecated GRPC streaming +* [ENHANCEMENT] Add new config block to distributors to produce debug metrics. [#3008](https://github.com/grafana/tempo/pull/3008) (@joe-elliott) + **Breaking Change** Removed deprecated config option: distributor.log_received_spans * [ENHANCEMENT] added a metrics generator config option to enable/disable X-Scope-OrgID headers on remote write. 
[#2974](https://github.com/grafana/tempo/pull/2974) (@vineetjp) * [BUGFIX] Fix panic in metrics summary api [#2738](https://github.com/grafana/tempo/pull/2738) (@mdisibio) * [BUGFIX] Fix rare deadlock when uploading blocks to Azure Blob Storage [#2129](https://github.com/grafana/tempo/issues/2129) (@LasseHels) diff --git a/cmd/tempo/app/config.go b/cmd/tempo/app/config.go index 353e9833cbb..011615a40dc 100644 --- a/cmd/tempo/app/config.go +++ b/cmd/tempo/app/config.go @@ -154,7 +154,7 @@ func (c *Config) CheckConfig() []ConfigWarning { warnings = append(warnings, warnBlocklistPollConcurrency) } - if c.Distributor.LogReceivedTraces { + if c.Distributor.LogReceivedSpans.Enabled { warnings = append(warnings, warnLogReceivedTraces) } @@ -250,7 +250,7 @@ var ( Explain: fmt.Sprintf("default=%d", tempodb.DefaultBlocklistPollConcurrency), } warnLogReceivedTraces = ConfigWarning{ - Message: "c.Distributor.LogReceivedTraces is deprecated. The new flag is c.Distributor.log_received_spans.enabled", + Message: "Span logging is enabled. This is for debugging only and not recommended for production deployments.", } warnStorageTraceBackendLocal = ConfigWarning{ Message: "Local backend will not correctly retrieve traces with a distributed deployment unless all components have access to the same disk. 
You should probably be using object storage as a backend.", diff --git a/cmd/tempo/app/config_test.go b/cmd/tempo/app/config_test.go index b05671a8a17..103af1c103b 100644 --- a/cmd/tempo/app/config_test.go +++ b/cmd/tempo/app/config_test.go @@ -40,7 +40,9 @@ func TestConfig_CheckConfig(t *testing.T) { }, }, Distributor: distributor.Config{ - LogReceivedTraces: true, + LogReceivedSpans: distributor.LogReceivedSpansConfig{ + Enabled: true, + }, }, }, expect: []ConfigWarning{ diff --git a/docs/sources/tempo/configuration/_index.md b/docs/sources/tempo/configuration/_index.md index 336c5be2c55..1163f8d5dde 100644 --- a/docs/sources/tempo/configuration/_index.md +++ b/docs/sources/tempo/configuration/_index.md @@ -185,18 +185,21 @@ distributor: - (repetition of above...) - # Optional. - # Enable to log every received trace id to help debug ingestion - # WARNING: Deprecated. Use log_received_spans instead. - [log_received_traces: | default = false] - # Optional. # Enable to log every received span to help debug ingestion or calculate span error distributions using the logs + # This is not recommended for production environments log_received_spans: [enabled: | default = false] [include_all_attributes: | default = false] [filter_by_status_error: | default = false] + # Optional. + # Enable to metric every received span to help debug ingestion + # This is not recommended for production environments + metric_received_spans: + [enabled: | default = false] + [root_only: | default = false] + # Optional. # Disables write extension with inactive ingesters. 
Use this along with ingester.lifecycler.unregister_on_shutdown = true # note that setting these two config values reduces tolerance to failures on rollout b/c there is always one guaranteed to be failing replica diff --git a/docs/sources/tempo/configuration/manifest.md b/docs/sources/tempo/configuration/manifest.md index a2b9b42e31f..8745fcfe966 100644 --- a/docs/sources/tempo/configuration/manifest.md +++ b/docs/sources/tempo/configuration/manifest.md @@ -169,7 +169,6 @@ distributor: instance_addr: "" receivers: {} override_ring_key: distributor - log_received_traces: false forwarders: [] extend_writes: true ingester_client: @@ -550,6 +549,7 @@ metrics_generator: max_wal_time: 14400000 no_lockfile: false remote_write_flush_deadline: 1m0s + remote_write_add_org_id_header: true traces_storage: path: "" completedfilepath: "" @@ -639,6 +639,7 @@ storage: tags: {} storage_class: "" metadata: {} + native_aws_auth_enabled: false azure: storage_account_name: "" storage_account_key: "" @@ -652,6 +653,7 @@ storage: buffer_size: 3145728 hedge_requests_at: 0s hedge_requests_up_to: 2 + use_v2_sdk: false cache: "" cache_min_compaction_level: 0 cache_max_block_age: 0s @@ -716,6 +718,7 @@ overrides: tags: {} storage_class: "" metadata: {} + native_aws_auth_enabled: false azure: storage_account_name: "" storage_account_key: "" @@ -729,6 +732,7 @@ overrides: buffer_size: 3145728 hedge_requests_at: 0s hedge_requests_up_to: 2 + use_v2_sdk: false memberlist: node_name: "" randomize_node_name: true diff --git a/integration/e2e/config-limits-query.yaml b/integration/e2e/config-limits-query.yaml index a39d4a041e6..3379dfa6f0c 100644 --- a/integration/e2e/config-limits-query.yaml +++ b/integration/e2e/config-limits-query.yaml @@ -4,7 +4,6 @@ server: http_listen_port: 3200 distributor: - log_received_traces: true receivers: jaeger: protocols: diff --git a/modules/distributor/config.go b/modules/distributor/config.go index ab0903e940f..76ae5b3e9a3 100644 --- a/modules/distributor/config.go 
+++ b/modules/distributor/config.go @@ -32,10 +32,10 @@ type Config struct { // receivers map for shim. // This receivers node is equivalent in format to the receiver node in the // otel collector: https://github.com/open-telemetry/opentelemetry-collector/tree/main/receiver - Receivers map[string]interface{} `yaml:"receivers"` - OverrideRingKey string `yaml:"override_ring_key"` - LogReceivedTraces bool `yaml:"log_received_traces"` // Deprecated - LogReceivedSpans LogReceivedSpansConfig `yaml:"log_received_spans,omitempty"` + Receivers map[string]interface{} `yaml:"receivers"` + OverrideRingKey string `yaml:"override_ring_key"` + LogReceivedSpans LogReceivedSpansConfig `yaml:"log_received_spans,omitempty"` + MetricReceivedSpans MetricReceivedSpansConfig `yaml:"metric_received_spans,omitempty"` Forwarders forwarder.ConfigList `yaml:"forwarders"` @@ -53,6 +53,11 @@ type LogReceivedSpansConfig struct { FilterByStatusError bool `yaml:"filter_by_status_error"` } +type MetricReceivedSpansConfig struct { + Enabled bool `yaml:"enabled"` + RootOnly bool `yaml:"root_only"` +} + // RegisterFlagsAndApplyDefaults registers flags and applies defaults func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) { flagext.DefaultValues(&cfg.DistributorRing) @@ -62,7 +67,6 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) cfg.OverrideRingKey = distributorRingKey cfg.ExtendWrites = true - f.BoolVar(&cfg.LogReceivedTraces, util.PrefixConfig(prefix, "log-received-traces"), false, "Enable to log every received trace id to help debug ingestion.") f.BoolVar(&cfg.LogReceivedSpans.Enabled, util.PrefixConfig(prefix, "log-received-spans.enabled"), false, "Enable to log every received span to help debug ingestion or calculate span error distributions using the logs.") f.BoolVar(&cfg.LogReceivedSpans.IncludeAllAttributes, util.PrefixConfig(prefix, "log-received-spans.include-attributes"), false, "Enable to include span attributes in the 
logs.") f.BoolVar(&cfg.LogReceivedSpans.FilterByStatusError, util.PrefixConfig(prefix, "log-received-spans.filter-by-status-error"), false, "Enable to filter out spans without status error.") diff --git a/modules/distributor/distributor.go b/modules/distributor/distributor.go index d41cc3cf97e..df832933b0a 100644 --- a/modules/distributor/distributor.go +++ b/modules/distributor/distributor.go @@ -77,6 +77,11 @@ var ( Name: "distributor_spans_received_total", Help: "The total number of spans received per tenant", }, []string{"tenant"}) + metricDebugSpansIngested = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "tempo", + Name: "distributor_debug_spans_received_total", + Help: "Debug counters for spans received per tenant", + }, []string{"tenant", "name", "service"}) metricBytesIngested = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "tempo", Name: "distributor_bytes_received_total", @@ -326,12 +331,11 @@ func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*te batches := trace.Batches - if d.cfg.LogReceivedSpans.Enabled || d.cfg.LogReceivedTraces { - if d.cfg.LogReceivedSpans.IncludeAllAttributes { - logSpansWithAllAttributes(batches, d.cfg.LogReceivedSpans.FilterByStatusError, d.logger) - } else { - logSpans(batches, d.cfg.LogReceivedSpans.FilterByStatusError, d.logger) - } + if d.cfg.LogReceivedSpans.Enabled { + logSpans(batches, &d.cfg.LogReceivedSpans, d.logger) + } + if d.cfg.MetricReceivedSpans.Enabled { + metricSpans(batches, userID, &d.cfg.MetricReceivedSpans) } metricBytesIngested.WithLabelValues(userID).Add(float64(size)) @@ -542,60 +546,73 @@ func recordDiscaredSpans(err error, userID string, spanCount int) { } } -func logSpans(batches []*v1.ResourceSpans, filterByStatusError bool, logger log.Logger) { +func metricSpans(batches []*v1.ResourceSpans, tenantID string, cfg *MetricReceivedSpansConfig) { for _, b := range batches { + serviceName := "" + if b.Resource != nil { + for _, a := range 
b.Resource.GetAttributes() { + if a.GetKey() == "service.name" { + serviceName = a.Value.GetStringValue() + break + } + } + } + for _, ils := range b.ScopeSpans { for _, s := range ils.Spans { - if filterByStatusError && s.Status.Code != v1.Status_STATUS_CODE_ERROR { + if cfg.RootOnly && len(s.ParentSpanId) != 0 { continue } - level.Info(logger).Log("msg", "received", "spanid", hex.EncodeToString(s.SpanId), "traceid", hex.EncodeToString(s.TraceId)) + + metricDebugSpansIngested.WithLabelValues(tenantID, s.Name, serviceName).Inc() } } } } -func logSpansWithAllAttributes(batch []*v1.ResourceSpans, filterByStatusError bool, logger log.Logger) { - for _, b := range batch { - logSpansInResourceWithAllAttributes(b, filterByStatusError, logger) - } -} +func logSpans(batches []*v1.ResourceSpans, cfg *LogReceivedSpansConfig, logger log.Logger) { + for _, b := range batches { + loggerWithAtts := logger + + if cfg.IncludeAllAttributes { + for _, a := range b.Resource.GetAttributes() { + loggerWithAtts = log.With( + loggerWithAtts, + "span_"+strutil.SanitizeLabelName(a.GetKey()), + tempo_util.StringifyAnyValue(a.GetValue())) + } + } -func logSpansInResourceWithAllAttributes(rs *v1.ResourceSpans, filterByStatusError bool, logger log.Logger) { - for _, a := range rs.Resource.GetAttributes() { - logger = log.With( - logger, - "span_"+strutil.SanitizeLabelName(a.GetKey()), - tempo_util.StringifyAnyValue(a.GetValue())) - } + for _, ils := range b.ScopeSpans { + for _, s := range ils.Spans { + if cfg.FilterByStatusError && s.Status.Code != v1.Status_STATUS_CODE_ERROR { + continue + } - for _, ils := range rs.ScopeSpans { - for _, s := range ils.Spans { - if filterByStatusError && s.Status.Code != v1.Status_STATUS_CODE_ERROR { - continue + logSpan(s, cfg.IncludeAllAttributes, loggerWithAtts) } - - logSpanWithAllAttributes(s, logger) } } } -func logSpanWithAllAttributes(s *v1.Span, logger log.Logger) { - for _, a := range s.GetAttributes() { +func logSpan(s *v1.Span, allAttributes 
bool, logger log.Logger) { + if allAttributes { + for _, a := range s.GetAttributes() { + logger = log.With( + logger, + "span_"+strutil.SanitizeLabelName(a.GetKey()), + tempo_util.StringifyAnyValue(a.GetValue())) + } + + latencySeconds := float64(s.GetEndTimeUnixNano()-s.GetStartTimeUnixNano()) / float64(time.Second.Nanoseconds()) logger = log.With( logger, - "span_"+strutil.SanitizeLabelName(a.GetKey()), - tempo_util.StringifyAnyValue(a.GetValue())) + "span_name", s.Name, + "span_duration_seconds", latencySeconds, + "span_kind", s.GetKind().String(), + "span_status", s.GetStatus().GetCode().String()) } - latencySeconds := float64(s.GetEndTimeUnixNano()-s.GetStartTimeUnixNano()) / float64(time.Second.Nanoseconds()) - logger = log.With( - logger, - "span_name", s.Name, - "span_duration_seconds", latencySeconds, - "span_kind", s.GetKind().String(), - "span_status", s.GetStatus().GetCode().String()) - level.Info(logger).Log("msg", "received", "spanid", hex.EncodeToString(s.SpanId), "traceid", hex.EncodeToString(s.TraceId)) } diff --git a/modules/distributor/distributor_test.go b/modules/distributor/distributor_test.go index e0ff58cdb24..e4a04f15df7 100644 --- a/modules/distributor/distributor_test.go +++ b/modules/distributor/distributor_test.go @@ -784,12 +784,11 @@ func TestDistributor(t *testing.T) { func TestLogSpans(t *testing.T) { for i, tc := range []struct { - LogReceivedTraces bool // Backwards compatibility with old config LogReceivedSpansEnabled bool filterByStatusError bool includeAllAttributes bool batches []*v1.ResourceSpans - expectedLogsSpan []logSpan + expectedLogsSpan []testLogSpan }{ { LogReceivedSpansEnabled: false, @@ -799,24 +798,7 @@ func TestLogSpans(t *testing.T) { makeSpan("0a0102030405060708090a0b0c0d0e0f", "dad44adc9a83b370", "Test Span", nil)), }), }, - expectedLogsSpan: []logSpan{}, - }, - { - LogReceivedTraces: true, - batches: []*v1.ResourceSpans{ - makeResourceSpans("test", []*v1.ScopeSpans{ - makeScope( - 
makeSpan("0a0102030405060708090a0b0c0d0e0f", "dad44adc9a83b370", "Test Span", nil)), - }), - }, - expectedLogsSpan: []logSpan{ - { - Msg: "received", - Level: "info", - TraceID: "0a0102030405060708090a0b0c0d0e0f", - SpanID: "dad44adc9a83b370", - }, - }, + expectedLogsSpan: []testLogSpan{}, }, { LogReceivedSpansEnabled: true, @@ -834,7 +816,7 @@ func TestLogSpans(t *testing.T) { makeSpan("b1c792dea27d511c145df8402bdd793a", "56afb9fe18b6c2d6", "Test Span", nil)), }), }, - expectedLogsSpan: []logSpan{ + expectedLogsSpan: []testLogSpan{ { Msg: "received", Level: "info", @@ -877,7 +859,7 @@ func TestLogSpans(t *testing.T) { makeSpan("b1c792dea27d511c145df8402bdd793a", "56afb9fe18b6c2d6", "Test Span", &v1.Status{Code: v1.Status_STATUS_CODE_ERROR})), }), }, - expectedLogsSpan: []logSpan{ + expectedLogsSpan: []testLogSpan{ { Msg: "received", Level: "info", @@ -912,7 +894,7 @@ func TestLogSpans(t *testing.T) { makeSpan("b1c792dea27d511c145df8402bdd793a", "56afb9fe18b6c2d6", "Test Span", &v1.Status{Code: v1.Status_STATUS_CODE_ERROR})), }, makeAttribute("resource_attribute2", "value2")), }, - expectedLogsSpan: []logSpan{ + expectedLogsSpan: []testLogSpan{ { Name: "Test Span2", Msg: "received", @@ -949,7 +931,7 @@ func TestLogSpans(t *testing.T) { makeSpan("0a0102030405060708090a0b0c0d0e0f", "dad44adc9a83b370", "Test Span", nil, makeAttribute("tag1", "value1"))), }), }, - expectedLogsSpan: []logSpan{ + expectedLogsSpan: []testLogSpan{ { Name: "Test Span", Msg: "received", @@ -964,7 +946,7 @@ func TestLogSpans(t *testing.T) { }, }, } { - t.Run(fmt.Sprintf("[%d] TestLogSpans LogReceivedTraces=%v LogReceivedSpansEnabled=%v filterByStatusError=%v includeAllAttributes=%v", i, tc.LogReceivedTraces, tc.LogReceivedSpansEnabled, tc.filterByStatusError, tc.includeAllAttributes), func(t *testing.T) { + t.Run(fmt.Sprintf("[%d] TestLogSpans LogReceivedSpansEnabled=%v filterByStatusError=%v includeAllAttributes=%v", i, tc.LogReceivedSpansEnabled, tc.filterByStatusError, 
tc.includeAllAttributes), func(t *testing.T) { limits := overrides.Config{} limits.RegisterFlagsAndApplyDefaults(&flag.FlagSet{}) @@ -972,7 +954,6 @@ func TestLogSpans(t *testing.T) { logger := kitlog.NewJSONLogger(kitlog.NewSyncWriter(buf)) d := prepare(t, limits, nil, logger) - d.cfg.LogReceivedTraces = tc.LogReceivedTraces d.cfg.LogReceivedSpans = LogReceivedSpansConfig{ Enabled: tc.LogReceivedSpansEnabled, FilterByStatusError: tc.filterByStatusError, @@ -986,7 +967,7 @@ func TestLogSpans(t *testing.T) { } bufJSON := "[" + strings.TrimRight(strings.ReplaceAll(buf.String(), "\n", ","), ",") + "]" - var actualLogsSpan []logSpan + var actualLogsSpan []testLogSpan err = json.Unmarshal([]byte(bufJSON), &actualLogsSpan) if err != nil { t.Fatal(err) @@ -1044,7 +1025,7 @@ func TestRateLimitRespected(t *testing.T) { assert.True(t, status.Code() == codes.ResourceExhausted, "Wrong status code") } -type logSpan struct { +type testLogSpan struct { Msg string `json:"msg"` Level string `json:"level"` TraceID string `json:"traceid"`