Add metrics debugging options (grafana#3008)
* first pass

Signed-off-by: Joe Elliott <[email protected]>

* stuff

Signed-off-by: Joe Elliott <[email protected]>

* fix tests. replace warning

Signed-off-by: Joe Elliott <[email protected]>

* manifest

Signed-off-by: Joe Elliott <[email protected]>

* docs

Signed-off-by: Joe Elliott <[email protected]>

* changelog

Signed-off-by: Joe Elliott <[email protected]>

* lint

Signed-off-by: Joe Elliott <[email protected]>

---------

Signed-off-by: Joe Elliott <[email protected]>
joe-elliott authored Oct 12, 2023
1 parent 2467e1b commit 67fd171
Showing 9 changed files with 94 additions and 82 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -32,6 +32,8 @@
* [ENHANCEMENT] Add histogram buckets to metrics-generator config in user-configurable overrides [#2928](https://github.com/grafana/tempo/pull/2928) (@mar4uk)
* [ENHANCEMENT] Adds websocket support for search streaming. [#2971](https://github.com/grafana/tempo/pull/2840) (@joe-elliott)
**Breaking Change** Deprecated GRPC streaming
* [ENHANCEMENT] Add new config block to distributors to produce debug metrics. [#3008](https://github.com/grafana/tempo/pull/3008) (@joe-elliott)
**Breaking Change** Removed deprecated config option: distributor.log_received_traces
* [ENHANCEMENT] added a metrics generator config option to enable/disable X-Scope-OrgID headers on remote write. [#2974](https://github.com/grafana/tempo/pull/2974) (@vineetjp)
* [BUGFIX] Fix panic in metrics summary api [#2738](https://github.com/grafana/tempo/pull/2738) (@mdisibio)
* [BUGFIX] Fix rare deadlock when uploading blocks to Azure Blob Storage [#2129](https://github.com/grafana/tempo/issues/2129) (@LasseHels)
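For configs that still set the removed option, a minimal migration sketch implied by this change (illustrative only; the old key is simply replaced by the new block):

```yaml
# Before: the deprecated option removed in this change.
distributor:
  log_received_traces: true

# After: the replacement block that remains supported.
distributor:
  log_received_spans:
    enabled: true
```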
4 changes: 2 additions & 2 deletions cmd/tempo/app/config.go
@@ -154,7 +154,7 @@ func (c *Config) CheckConfig() []ConfigWarning {
warnings = append(warnings, warnBlocklistPollConcurrency)
}

if c.Distributor.LogReceivedTraces {
if c.Distributor.LogReceivedSpans.Enabled {
warnings = append(warnings, warnLogReceivedTraces)
}

@@ -250,7 +250,7 @@ var (
Explain: fmt.Sprintf("default=%d", tempodb.DefaultBlocklistPollConcurrency),
}
warnLogReceivedTraces = ConfigWarning{
Message: "c.Distributor.LogReceivedTraces is deprecated. The new flag is c.Distributor.log_received_spans.enabled",
Message: "Span logging is enabled. This is for debuging only and not recommended for production deployments.",
}
warnStorageTraceBackendLocal = ConfigWarning{
Message: "Local backend will not correctly retrieve traces with a distributed deployment unless all components have access to the same disk. You should probably be using object storage as a backend.",
4 changes: 3 additions & 1 deletion cmd/tempo/app/config_test.go
@@ -40,7 +40,9 @@ func TestConfig_CheckConfig(t *testing.T) {
},
},
Distributor: distributor.Config{
LogReceivedTraces: true,
LogReceivedSpans: distributor.LogReceivedSpansConfig{
Enabled: true,
},
},
},
expect: []ConfigWarning{
13 changes: 8 additions & 5 deletions docs/sources/tempo/configuration/_index.md
@@ -185,18 +185,21 @@ distributor:
- (repetition of above...)


# Optional.
# Enable to log every received trace id to help debug ingestion
# WARNING: Deprecated. Use log_received_spans instead.
[log_received_traces: <boolean> | default = false]

# Optional.
# Enable to log every received span to help debug ingestion or calculate span error distributions using the logs
# This is not recommended for production environments
log_received_spans:
[enabled: <boolean> | default = false]
[include_all_attributes: <boolean> | default = false]
[filter_by_status_error: <boolean> | default = false]

# Optional.
# Enable to emit a metric for every received span to help debug ingestion
# This is not recommended for production environments
metric_received_spans:
[enabled: <boolean> | default = false]
[root_only: <boolean> | default = false]

# Optional.
# Disables write extension with inactive ingesters. Use this along with ingester.lifecycler.unregister_on_shutdown = true
# note that setting these two config values reduces tolerance to failures on rollout b/c there is always one guaranteed to be failing replica
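For reference, a minimal sketch of a distributor block that enables both debug options documented above (values are illustrative, not defaults):

```yaml
distributor:
  # Log every received span; for debugging ingestion only, not recommended in production.
  log_received_spans:
    enabled: true
    include_all_attributes: true
    filter_by_status_error: false
  # Count every received span per tenant, span name, and service; also for debugging only.
  metric_received_spans:
    enabled: true
    root_only: false
```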
6 changes: 5 additions & 1 deletion docs/sources/tempo/configuration/manifest.md
@@ -169,7 +169,6 @@ distributor:
instance_addr: ""
receivers: {}
override_ring_key: distributor
log_received_traces: false
forwarders: []
extend_writes: true
ingester_client:
@@ -550,6 +549,7 @@ metrics_generator:
max_wal_time: 14400000
no_lockfile: false
remote_write_flush_deadline: 1m0s
remote_write_add_org_id_header: true
traces_storage:
path: ""
completedfilepath: ""
@@ -639,6 +639,7 @@ storage:
tags: {}
storage_class: ""
metadata: {}
native_aws_auth_enabled: false
azure:
storage_account_name: ""
storage_account_key: ""
@@ -652,6 +653,7 @@
buffer_size: 3145728
hedge_requests_at: 0s
hedge_requests_up_to: 2
use_v2_sdk: false
cache: ""
cache_min_compaction_level: 0
cache_max_block_age: 0s
@@ -716,6 +718,7 @@ overrides:
tags: {}
storage_class: ""
metadata: {}
native_aws_auth_enabled: false
azure:
storage_account_name: ""
storage_account_key: ""
@@ -729,6 +732,7 @@
buffer_size: 3145728
hedge_requests_at: 0s
hedge_requests_up_to: 2
use_v2_sdk: false
memberlist:
node_name: ""
randomize_node_name: true
1 change: 0 additions & 1 deletion integration/e2e/config-limits-query.yaml
@@ -4,7 +4,6 @@ server:
http_listen_port: 3200

distributor:
log_received_traces: true
receivers:
jaeger:
protocols:
14 changes: 9 additions & 5 deletions modules/distributor/config.go
@@ -32,10 +32,10 @@ type Config struct {
// receivers map for shim.
// This receivers node is equivalent in format to the receiver node in the
// otel collector: https://github.com/open-telemetry/opentelemetry-collector/tree/main/receiver
Receivers map[string]interface{} `yaml:"receivers"`
OverrideRingKey string `yaml:"override_ring_key"`
LogReceivedTraces bool `yaml:"log_received_traces"` // Deprecated
LogReceivedSpans LogReceivedSpansConfig `yaml:"log_received_spans,omitempty"`
Receivers map[string]interface{} `yaml:"receivers"`
OverrideRingKey string `yaml:"override_ring_key"`
LogReceivedSpans LogReceivedSpansConfig `yaml:"log_received_spans,omitempty"`
MetricReceivedSpans MetricReceivedSpansConfig `yaml:"metric_received_spans,omitempty"`

Forwarders forwarder.ConfigList `yaml:"forwarders"`

@@ -53,6 +53,11 @@ type LogReceivedSpansConfig struct {
FilterByStatusError bool `yaml:"filter_by_status_error"`
}

type MetricReceivedSpansConfig struct {
Enabled bool `yaml:"enabled"`
RootOnly bool `yaml:"root_only"`
}

// RegisterFlagsAndApplyDefaults registers flags and applies defaults
func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
flagext.DefaultValues(&cfg.DistributorRing)
@@ -62,7 +67,6 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
cfg.OverrideRingKey = distributorRingKey
cfg.ExtendWrites = true

f.BoolVar(&cfg.LogReceivedTraces, util.PrefixConfig(prefix, "log-received-traces"), false, "Enable to log every received trace id to help debug ingestion.")
f.BoolVar(&cfg.LogReceivedSpans.Enabled, util.PrefixConfig(prefix, "log-received-spans.enabled"), false, "Enable to log every received span to help debug ingestion or calculate span error distributions using the logs.")
f.BoolVar(&cfg.LogReceivedSpans.IncludeAllAttributes, util.PrefixConfig(prefix, "log-received-spans.include-attributes"), false, "Enable to include span attributes in the logs.")
f.BoolVar(&cfg.LogReceivedSpans.FilterByStatusError, util.PrefixConfig(prefix, "log-received-spans.filter-by-status-error"), false, "Enable to filter out spans without status error.")
95 changes: 56 additions & 39 deletions modules/distributor/distributor.go
@@ -77,6 +77,11 @@ var (
Name: "distributor_spans_received_total",
Help: "The total number of spans received per tenant",
}, []string{"tenant"})
metricDebugSpansIngested = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "tempo",
Name: "distributor_debug_spans_received_total",
Help: "Debug counters for spans received per tenant",
}, []string{"tenant", "name", "service"})
metricBytesIngested = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "tempo",
Name: "distributor_bytes_received_total",
@@ -326,12 +331,11 @@ func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*te

batches := trace.Batches

if d.cfg.LogReceivedSpans.Enabled || d.cfg.LogReceivedTraces {
if d.cfg.LogReceivedSpans.IncludeAllAttributes {
logSpansWithAllAttributes(batches, d.cfg.LogReceivedSpans.FilterByStatusError, d.logger)
} else {
logSpans(batches, d.cfg.LogReceivedSpans.FilterByStatusError, d.logger)
}
if d.cfg.LogReceivedSpans.Enabled {
logSpans(batches, &d.cfg.LogReceivedSpans, d.logger)
}
if d.cfg.MetricReceivedSpans.Enabled {
metricSpans(batches, userID, &d.cfg.MetricReceivedSpans)
}

metricBytesIngested.WithLabelValues(userID).Add(float64(size))
@@ -542,60 +546,73 @@ func recordDiscaredSpans(err error, userID string, spanCount int) {
}
}

func logSpans(batches []*v1.ResourceSpans, filterByStatusError bool, logger log.Logger) {
func metricSpans(batches []*v1.ResourceSpans, tenantID string, cfg *MetricReceivedSpansConfig) {
for _, b := range batches {
serviceName := ""
if b.Resource != nil {
for _, a := range b.Resource.GetAttributes() {
if a.GetKey() == "service.name" {
serviceName = a.Value.GetStringValue()
break
}
}
}

for _, ils := range b.ScopeSpans {
for _, s := range ils.Spans {
if filterByStatusError && s.Status.Code != v1.Status_STATUS_CODE_ERROR {
if cfg.RootOnly && len(s.ParentSpanId) != 0 {
continue
}
level.Info(logger).Log("msg", "received", "spanid", hex.EncodeToString(s.SpanId), "traceid", hex.EncodeToString(s.TraceId))

metricDebugSpansIngested.WithLabelValues(tenantID, s.Name, serviceName).Inc()
}
}
}
}

func logSpansWithAllAttributes(batch []*v1.ResourceSpans, filterByStatusError bool, logger log.Logger) {
for _, b := range batch {
logSpansInResourceWithAllAttributes(b, filterByStatusError, logger)
}
}
func logSpans(batches []*v1.ResourceSpans, cfg *LogReceivedSpansConfig, logger log.Logger) {
for _, b := range batches {
loggerWithAtts := logger

if cfg.IncludeAllAttributes {
for _, a := range b.Resource.GetAttributes() {
loggerWithAtts = log.With(
loggerWithAtts,
"span_"+strutil.SanitizeLabelName(a.GetKey()),
tempo_util.StringifyAnyValue(a.GetValue()))
}
}

func logSpansInResourceWithAllAttributes(rs *v1.ResourceSpans, filterByStatusError bool, logger log.Logger) {
for _, a := range rs.Resource.GetAttributes() {
logger = log.With(
logger,
"span_"+strutil.SanitizeLabelName(a.GetKey()),
tempo_util.StringifyAnyValue(a.GetValue()))
}
for _, ils := range b.ScopeSpans {
for _, s := range ils.Spans {
if cfg.FilterByStatusError && s.Status.Code != v1.Status_STATUS_CODE_ERROR {
continue
}

for _, ils := range rs.ScopeSpans {
for _, s := range ils.Spans {
if filterByStatusError && s.Status.Code != v1.Status_STATUS_CODE_ERROR {
continue
logSpan(s, cfg.IncludeAllAttributes, loggerWithAtts)
}

logSpanWithAllAttributes(s, logger)
}
}
}

func logSpanWithAllAttributes(s *v1.Span, logger log.Logger) {
for _, a := range s.GetAttributes() {
func logSpan(s *v1.Span, allAttributes bool, logger log.Logger) {
if allAttributes {
for _, a := range s.GetAttributes() {
logger = log.With(
logger,
"span_"+strutil.SanitizeLabelName(a.GetKey()),
tempo_util.StringifyAnyValue(a.GetValue()))
}

latencySeconds := float64(s.GetEndTimeUnixNano()-s.GetStartTimeUnixNano()) / float64(time.Second.Nanoseconds())
logger = log.With(
logger,
"span_"+strutil.SanitizeLabelName(a.GetKey()),
tempo_util.StringifyAnyValue(a.GetValue()))
"span_name", s.Name,
"span_duration_seconds", latencySeconds,
"span_kind", s.GetKind().String(),
"span_status", s.GetStatus().GetCode().String())
}

latencySeconds := float64(s.GetEndTimeUnixNano()-s.GetStartTimeUnixNano()) / float64(time.Second.Nanoseconds())
logger = log.With(
logger,
"span_name", s.Name,
"span_duration_seconds", latencySeconds,
"span_kind", s.GetKind().String(),
"span_status", s.GetStatus().GetCode().String())

level.Info(logger).Log("msg", "received", "spanid", hex.EncodeToString(s.SpanId), "traceid", hex.EncodeToString(s.TraceId))
}

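The new counter is registered with namespace `tempo`, so it is exposed as `tempo_distributor_debug_spans_received_total` with `tenant`, `name`, and `service` labels. A hedged sketch of a Prometheus recording rule that aggregates it per tenant and service (the group and record names below are illustrative):

```yaml
groups:
  - name: tempo-debug-spans
    rules:
      # Per-tenant, per-service ingest rate derived from the debug counter added in this change.
      - record: tempo:debug_spans_received:rate5m
        expr: sum by (tenant, service) (rate(tempo_distributor_debug_spans_received_total[5m]))
```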