From 14848fd5c56fc1cdd62ba6d0f1c2a1e2d27340fc Mon Sep 17 00:00:00 2001 From: Zach Leslie Date: Tue, 2 May 2023 20:36:49 +0000 Subject: [PATCH] [metrics-generator] filter out spans based on policy (#2274) * First pass at span filtering Signed-off-by: Zach Leslie * Validate the spanmetrics filteirng config on startup Signed-off-by: Zach Leslie * Give some hope that we return a true match Signed-off-by: Zach Leslie * Drop unused argument service name and rely on attributes Signed-off-by: Zach Leslie * Handling a few intrinsics Signed-off-by: Zach Leslie * Include documentation for spanmetrics filtering policies Signed-off-by: Zach Leslie * Update docs/sources/tempo/metrics-generator/span_metrics.md Co-authored-by: Kim Nylander <104772500+knylander-grafana@users.noreply.github.com> * Update docs/sources/tempo/metrics-generator/span_metrics.md Co-authored-by: Kim Nylander <104772500+knylander-grafana@users.noreply.github.com> * Update docs/sources/tempo/metrics-generator/span_metrics.md Co-authored-by: Kim Nylander <104772500+knylander-grafana@users.noreply.github.com> * Update docs/sources/tempo/metrics-generator/span_metrics.md Co-authored-by: Kim Nylander <104772500+knylander-grafana@users.noreply.github.com> * Update docs/sources/tempo/metrics-generator/span_metrics.md Co-authored-by: Kim Nylander <104772500+knylander-grafana@users.noreply.github.com> * Update docs/sources/tempo/metrics-generator/span_metrics.md Co-authored-by: Kim Nylander <104772500+knylander-grafana@users.noreply.github.com> * Adjust filter policy to split policies during New() Signed-off-by: Zach Leslie * Update test for intrinsic Signed-off-by: Zach Leslie * Include benchmark and supporting span generator Signed-off-by: Zach Leslie * Include metric for counting spans that have been filtered out Signed-off-by: Zach Leslie * Include config warning when unsupported intrinic is used Signed-off-by: Zach Leslie * Relocate spanmetrics.FilterPolicy to sharedconfig package and implement overrides 
Signed-off-by: Zach Leslie * Include sharedconfig pacakge Signed-off-by: Zach Leslie * Update modules/generator/processor/spanmetrics/spanmetrics.go Co-authored-by: Joe Elliott * Refactor spanfilter into its own package Signed-off-by: Zach Leslie * Include tests for spanfilter.New() Signed-off-by: Zach Leslie * Update spanmetrics processor to return an error for spanfilter error Signed-off-by: Zach Leslie * Relocate config validation to spanfilter during New Signed-off-by: Zach Leslie * Update tests for spanmetrics error return Signed-off-by: Zach Leslie * Drop unused Signed-off-by: Zach Leslie * Update docs to include nesting of filtering config Signed-off-by: Zach Leslie * Exit early when attributes are unmatched Signed-off-by: Zach Leslie * Exit early when intrinsics are not matched Signed-off-by: Zach Leslie * Preallocate a couple variables Signed-off-by: Zach Leslie * Add note about use of RandomBatcher Signed-off-by: Zach Leslie * Update changelog * Drop TODO comment Signed-off-by: Zach Leslie * Add back the lost metric during rebase Signed-off-by: Zach Leslie * Fix policy override configuration Signed-off-by: Zach Leslie * Include generator config test Signed-off-by: Zach Leslie * Migrate the metric and expand reasons Signed-off-by: Zach Leslie * Update tests for discardCounter Signed-off-by: Zach Leslie * Include doc about which kinds are available for filtering Signed-off-by: Zach Leslie * Spellcheck * Perform number matching for kind and status Signed-off-by: Zach Leslie * Rename discardCounter to filteredSpansCounter Signed-off-by: Zach Leslie * Improve error quality Signed-off-by: Zach Leslie * Update error message in test Signed-off-by: Zach Leslie --------- Signed-off-by: Zach Leslie Co-authored-by: Kim Nylander <104772500+knylander-grafana@users.noreply.github.com> Co-authored-by: Joe Elliott --- CHANGELOG.md | 1 + .../tempo/metrics-generator/span_metrics.md | 93 +- modules/generator/config.go | 3 + modules/generator/config_test.go | 62 + 
modules/generator/instance.go | 12 +- modules/generator/overrides.go | 2 + modules/generator/overrides_test.go | 11 +- .../generator/processor/spanmetrics/config.go | 4 + .../processor/spanmetrics/spanmetrics.go | 23 +- .../processor/spanmetrics/spanmetrics_test.go | 275 +++- modules/overrides/limits.go | 26 +- modules/overrides/overrides.go | 6 + pkg/spanfilter/config/config.go | 84 ++ pkg/spanfilter/spanfilter.go | 268 ++++ pkg/spanfilter/spanfilter_test.go | 1260 +++++++++++++++++ pkg/util/test/random.go | 212 +++ 16 files changed, 2314 insertions(+), 28 deletions(-) create mode 100644 pkg/spanfilter/config/config.go create mode 100644 pkg/spanfilter/spanfilter.go create mode 100644 pkg/spanfilter/spanfilter_test.go create mode 100644 pkg/util/test/random.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 9b691ffe498..d3a61850a42 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ * [ENHANCEMENT] Add synchronous read mode to vParquet and vParquet2 optionally enabled by env vars [#2165](https://github.com/grafana/tempo/pull/2165) (@mdisibio) * [ENHANCEMENT] Add option to override metrics-generator ring port [#2399](https://github.com/grafana/tempo/pull/2399) (@mdisibio) * [ENHANCEMENT] Add support for IPv6 [#1555](https://github.com/grafana/tempo/pull/1555) (@zalegrala) +* [ENHANCEMENT] Add span filtering to spanmetrics processor [#2274](https://github.com/grafana/tempo/pull/2274) (@zalegrala) * [BUGFIX] tempodb integer divide by zero error [#2167](https://github.com/grafana/tempo/issues/2167) (@kroksys) * [CHANGE] **Breaking Change** Rename s3.insecure_skip_verify [#???](https://github.com/grafana/tempo/pull/???) 
(@zalegrala) ```yaml diff --git a/docs/sources/tempo/metrics-generator/span_metrics.md b/docs/sources/tempo/metrics-generator/span_metrics.md index 66a0351a59b..c2729c050b3 100644 --- a/docs/sources/tempo/metrics-generator/span_metrics.md +++ b/docs/sources/tempo/metrics-generator/span_metrics.md @@ -1,7 +1,7 @@ --- aliases: -- /docs/tempo/latest/server_side_metrics/span_metrics/ -- /docs/tempo/latest/metrics-generator/span_metrics/ + - /docs/tempo/latest/server_side_metrics/span_metrics/ + - /docs/tempo/latest/metrics-generator/span_metrics/ title: Generate metrics from spans weight: 400 --- @@ -11,8 +11,9 @@ weight: 400 The span metrics processor generates metrics from ingested tracing data, including request, error, and duration (RED) metrics. Span metrics generate two metrics: -* A counter that computes requests -* A histogram that tracks the distribution of durations of all requests + +- A counter that computes requests +- A histogram that tracks the distribution of durations of all requests Span metrics are of particular interest if your system is not monitored with metrics, but it has distributed tracing implemented. @@ -43,7 +44,7 @@ This processor is designed with the goal to mirror the implementation from the O The following metrics are exported: | Metric | Type | Labels | Description | -|--------------------------------|-----------|------------|------------------------------| +| ------------------------------ | --------- | ---------- | ---------------------------- | | traces_spanmetrics_latency | Histogram | Dimensions | Duration of the span | | traces_spanmetrics_calls_total | Counter | Dimensions | Total count of the span | | traces_spanmetrics_size_total | Counter | Dimensions | Total size of spans ingested | @@ -56,7 +57,6 @@ When a configured dimension collides with one of the default labels (e.g. 
`statu If you use ratio based sampler you can use custom sampler below to not lose metric information, you also need to set `metrics_generator.processor.span_metrics.span_multiplier_key` to `"X-SampleRatio"` - ```go package tracer import ( @@ -91,6 +91,85 @@ func (ds RatioBasedSampler) Description() string { } ``` +### Filtering + +In some cases, you may want to reduce the number of metrics produced by the `spanmetrics` processor. You can configure the processor to use an `include` filter to match criteria that must be present in the span in order to be included. Following the include filter, an `exclude` filter may be used to reject portions of what was previously included by the filter policy. + +Currently, only filtering by resource and span attributes with the following value types is supported. + +- `bool` +- `double` +- `int` +- `string` + +Additionally, these intrinsic span attributes may be filtered upon: + +- `name` +- `status` (code) +- `kind` + +The following intrinsic kinds are available for filtering. + +- `SPAN_KIND_SERVER` +- `SPAN_KIND_INTERNAL` +- `SPAN_KIND_CLIENT` +- `SPAN_KIND_PRODUCER` +- `SPAN_KIND_CONSUMER` + +Intrinsic keys can be acted on directly when implementing a filter policy. For example: + +```yaml +--- +metrics_generator: + processor: + span_metrics: + filter_policies: + - include: + match_type: strict + attributes: + - key: kind + value: SPAN_KIND_SERVER +``` + +In this example, spans which are of `kind` "server" are included for metrics export. + +When selecting spans based on non-intrinsic attributes, it is required to specify the scope of the attribute, similar to how it is specified in TraceQL. For example, if the `resource` contains a `location` attribute which is to be used in a filter policy, then the reference needs to be specified as `resource.location`. This requires users to know and specify the scope in which an attribute is to be found, and avoids the ambiguity of conflicting values at differing scopes.
The following may help illustrate. + +```yaml +--- +metrics_generator: + processor: + span_metrics: + filter_policies: + - include: + match_type: strict + attributes: + - key: resource.location + value: earth +``` + +In the above examples, we are using `match_type` of `strict`, which is a direct comparison of values. An additional option for `match_type` is `regex`. This allows users to build a regular expression to match against. + +```yaml +--- +metrics_generator: + processor: + span_metrics: + filter_policies: + - include: + match_type: regex + attributes: + - key: resource.location + value: eu-.* + - exclude: + match_type: regex + attributes: + - key: resource.tier + value: dev-.* +``` + +In the above, we first include all spans which have a `resource.location` that begins with `eu-` with the `include` statement, and then exclude those with a `resource.tier` that begins with `dev-`. In this way, a flexible approach to filtering can be achieved to ensure that only metrics which are important are generated. + ## Example -

Span metrics overview

\ No newline at end of file +

Span metrics overview

diff --git a/modules/generator/config.go b/modules/generator/config.go index 95b19c8f8f3..d705ada2108 100644 --- a/modules/generator/config.go +++ b/modules/generator/config.go @@ -73,6 +73,9 @@ func (cfg *ProcessorConfig) copyWithOverrides(o metricsGeneratorOverrides, userI return ProcessorConfig{}, errors.Wrap(err, "fail to apply overrides") } } + if filterPolicies := o.MetricsGeneratorProcessorSpanMetricsFilterPolicies(userID); filterPolicies != nil { + copyCfg.SpanMetrics.FilterPolicies = filterPolicies + } return copyCfg, nil } diff --git a/modules/generator/config_test.go b/modules/generator/config_test.go index 6addfc4cc93..d6cbc03224b 100644 --- a/modules/generator/config_test.go +++ b/modules/generator/config_test.go @@ -8,6 +8,7 @@ import ( "github.com/grafana/tempo/modules/generator/processor/servicegraphs" "github.com/grafana/tempo/modules/generator/processor/spanmetrics" + "github.com/grafana/tempo/pkg/spanfilter/config" ) func TestProcessorConfig_copyWithOverrides(t *testing.T) { @@ -69,4 +70,65 @@ func TestProcessorConfig_copyWithOverrides(t *testing.T) { _, err := original.copyWithOverrides(o, "tenant") require.Error(t, err) }) + + t.Run("nil policy overrides", func(t *testing.T) { + o := &mockOverrides{ + spanMetricsFilterPolicies: nil, + } + + copied, err := original.copyWithOverrides(o, "tenant") + require.NoError(t, err) + + assert.Equal(t, *original, copied) + }) + + t.Run("empty policy overrides", func(t *testing.T) { + o := &mockOverrides{ + spanMetricsFilterPolicies: []config.FilterPolicy{}, + } + + copied, err := original.copyWithOverrides(o, "tenant") + require.NoError(t, err) + + assert.NotEqual(t, *original, copied) + + assert.Equal(t, []config.FilterPolicy{}, copied.SpanMetrics.FilterPolicies) + }) + + t.Run("policy overrides", func(t *testing.T) { + o := &mockOverrides{ + spanMetricsFilterPolicies: []config.FilterPolicy{ + { + Include: &config.PolicyMatch{ + MatchType: config.Strict, + Attributes: []config.MatchPolicyAttribute{ + { + 
Key: "key", + Value: "value", + }, + }, + }, + }, + }, + } + + copied, err := original.copyWithOverrides(o, "tenant") + require.NoError(t, err) + + assert.NotEqual(t, *original, copied) + + assert.Equal(t, []config.FilterPolicy{ + { + Include: &config.PolicyMatch{ + MatchType: config.Strict, + Attributes: []config.MatchPolicyAttribute{ + { + Key: "key", + Value: "value", + }, + }, + }, + }, + }, copied.SpanMetrics.FilterPolicies) + }) } diff --git a/modules/generator/instance.go b/modules/generator/instance.go index 88938e0ce88..9f0e7efb8ba 100644 --- a/modules/generator/instance.go +++ b/modules/generator/instance.go @@ -52,7 +52,10 @@ var ( }, []string{"tenant", "reason"}) ) -const reasonOutsideTimeRangeSlack = "outside_metrics_ingestion_slack" +const ( + reasonOutsideTimeRangeSlack = "outside_metrics_ingestion_slack" + reasonSpanMetricsFiltered = "span_metrics_filtered" +) type instance struct { cfg *Config @@ -256,9 +259,14 @@ func (i *instance) addProcessor(processorName string, cfg ProcessorConfig) error level.Debug(i.logger).Log("msg", "adding processor", "processorName", processorName) var newProcessor processor.Processor + var err error switch processorName { case spanmetrics.Name: - newProcessor = spanmetrics.New(cfg.SpanMetrics, i.registry) + filteredSpansCounter := metricSpansDiscarded.WithLabelValues(i.instanceID, reasonSpanMetricsFiltered) + newProcessor, err = spanmetrics.New(cfg.SpanMetrics, i.registry, filteredSpansCounter) + if err != nil { + return err + } case servicegraphs.Name: newProcessor = servicegraphs.New(cfg.ServiceGraphs, i.instanceID, i.registry, i.logger) default: diff --git a/modules/generator/overrides.go b/modules/generator/overrides.go index f564ec8040b..857b2950ecc 100644 --- a/modules/generator/overrides.go +++ b/modules/generator/overrides.go @@ -3,6 +3,7 @@ package generator import ( "github.com/grafana/tempo/modules/generator/registry" "github.com/grafana/tempo/modules/overrides" + filterconfig 
"github.com/grafana/tempo/pkg/spanfilter/config" ) type metricsGeneratorOverrides interface { @@ -14,6 +15,7 @@ type metricsGeneratorOverrides interface { MetricsGeneratorProcessorSpanMetricsHistogramBuckets(userID string) []float64 MetricsGeneratorProcessorSpanMetricsDimensions(userID string) []string MetricsGeneratorProcessorSpanMetricsIntrinsicDimensions(userID string) map[string]bool + MetricsGeneratorProcessorSpanMetricsFilterPolicies(userID string) []filterconfig.FilterPolicy } var _ metricsGeneratorOverrides = (*overrides.Overrides)(nil) diff --git a/modules/generator/overrides_test.go b/modules/generator/overrides_test.go index bf7fe9bf1d4..300d388c395 100644 --- a/modules/generator/overrides_test.go +++ b/modules/generator/overrides_test.go @@ -1,6 +1,10 @@ package generator -import "time" +import ( + "time" + + filterconfig "github.com/grafana/tempo/pkg/spanfilter/config" +) type mockOverrides struct { processors map[string]struct{} @@ -9,6 +13,7 @@ type mockOverrides struct { spanMetricsHistogramBuckets []float64 spanMetricsDimensions []string spanMetricsIntrinsicDimensions map[string]bool + spanMetricsFilterPolicies []filterconfig.FilterPolicy } var _ metricsGeneratorOverrides = (*mockOverrides)(nil) @@ -48,3 +53,7 @@ func (m *mockOverrides) MetricsGeneratorProcessorSpanMetricsDimensions(userID st func (m *mockOverrides) MetricsGeneratorProcessorSpanMetricsIntrinsicDimensions(userID string) map[string]bool { return m.spanMetricsIntrinsicDimensions } + +func (m *mockOverrides) MetricsGeneratorProcessorSpanMetricsFilterPolicies(userID string) []filterconfig.FilterPolicy { + return m.spanMetricsFilterPolicies +} diff --git a/modules/generator/processor/spanmetrics/config.go b/modules/generator/processor/spanmetrics/config.go index a5ee780e6e5..b6ceac96c7f 100644 --- a/modules/generator/processor/spanmetrics/config.go +++ b/modules/generator/processor/spanmetrics/config.go @@ -3,6 +3,7 @@ package spanmetrics import ( "flag" + filterconfig 
"github.com/grafana/tempo/pkg/spanfilter/config" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" ) @@ -34,6 +35,9 @@ type Config struct { // Subprocessor options for this Processor include Latency, Count, Size // These are metrics categories that exist under the umbrella of Span Metrics Subprocessors map[Subprocessor]bool + + // FilterPolicies is a list of policies that will be applied to spans for inclusion or exlusion. + FilterPolicies []filterconfig.FilterPolicy `yaml:"filter_policies"` } func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) { diff --git a/modules/generator/processor/spanmetrics/spanmetrics.go b/modules/generator/processor/spanmetrics/spanmetrics.go index 1d294527d73..7895aed8024 100644 --- a/modules/generator/processor/spanmetrics/spanmetrics.go +++ b/modules/generator/processor/spanmetrics/spanmetrics.go @@ -10,10 +10,12 @@ import ( gen "github.com/grafana/tempo/modules/generator/processor" processor_util "github.com/grafana/tempo/modules/generator/processor/util" "github.com/grafana/tempo/modules/generator/registry" + "github.com/grafana/tempo/pkg/spanfilter" "github.com/grafana/tempo/pkg/tempopb" v1 "github.com/grafana/tempo/pkg/tempopb/resource/v1" v1_trace "github.com/grafana/tempo/pkg/tempopb/trace/v1" tempo_util "github.com/grafana/tempo/pkg/util" + "github.com/prometheus/client_golang/prometheus" ) const ( @@ -31,11 +33,14 @@ type Processor struct { spanMetricsDurationSeconds registry.Histogram spanMetricsSizeTotal registry.Counter + filter *spanfilter.SpanFilter + filteredSpansCounter prometheus.Counter + // for testing now func() time.Time } -func New(cfg Config, registry registry.Registry) gen.Processor { +func New(cfg Config, registry registry.Registry, spanDiscardCounter prometheus.Counter) (gen.Processor, error) { labels := make([]string, 0, 4+len(cfg.Dimensions)) if cfg.IntrinsicDimensions.Service { @@ -68,10 +73,18 @@ func New(cfg Config, registry registry.Registry) 
gen.Processor { if cfg.Subprocessors[Size] { p.spanMetricsSizeTotal = registry.NewCounter(metricSizeTotal, labels) } + + filter, err := spanfilter.NewSpanFilter(cfg.FilterPolicies) + if err != nil { + return nil, err + } + p.Cfg = cfg p.registry = registry p.now = time.Now - return p + p.filteredSpansCounter = spanDiscardCounter + p.filter = filter + return p, nil } func (p *Processor) Name() string { @@ -95,7 +108,11 @@ func (p *Processor) aggregateMetrics(resourceSpans []*v1_trace.ResourceSpans) { for _, ils := range rs.ScopeSpans { for _, span := range ils.Spans { - p.aggregateMetricsForSpan(svcName, rs.Resource, span) + if p.filter.ApplyFilterPolicy(rs.Resource, span) { + p.aggregateMetricsForSpan(svcName, rs.Resource, span) + continue + } + p.filteredSpansCounter.Inc() } } } diff --git a/modules/generator/processor/spanmetrics/spanmetrics_test.go b/modules/generator/processor/spanmetrics/spanmetrics_test.go index 39001a0bb0e..c7a0dc371b6 100644 --- a/modules/generator/processor/spanmetrics/spanmetrics_test.go +++ b/modules/generator/processor/spanmetrics/spanmetrics_test.go @@ -4,29 +4,47 @@ import ( "context" "fmt" "math" + "os" "strconv" "testing" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/grafana/tempo/modules/generator/registry" + filterconfig "github.com/grafana/tempo/pkg/spanfilter/config" "github.com/grafana/tempo/pkg/tempopb" common_v1 "github.com/grafana/tempo/pkg/tempopb/common/v1" trace_v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1" "github.com/grafana/tempo/pkg/util/test" ) +var ( + metricSpansDiscarded = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "tempo", + Name: "metrics_generator_spans_discarded_total", + Help: "The total number of discarded spans received per tenant", + }, []string{"tenant", "reason"}) +) + func 
TestSpanMetrics(t *testing.T) { testRegistry := registry.NewTestRegistry() + filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) cfg.HistogramBuckets = []float64{0.5, 1} - p := New(cfg, testRegistry) + p, err := New(cfg, testRegistry, filteredSpansCounter) + require.NoError(t, err) defer p.Shutdown(context.Background()) + require.Equal(t, p.Name(), "span-metrics") + // TODO give these spans some duration so we can verify latencies are recorded correctly, in fact we should also test with various span names etc. batch := test.MakeBatch(10, nil) @@ -53,6 +71,8 @@ func TestSpanMetrics(t *testing.T) { func TestSpanMetrics_dimensions(t *testing.T) { testRegistry := registry.NewTestRegistry() + filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) cfg.HistogramBuckets = []float64{0.5, 1} @@ -60,7 +80,8 @@ func TestSpanMetrics_dimensions(t *testing.T) { cfg.IntrinsicDimensions.StatusMessage = true cfg.Dimensions = []string{"foo", "bar", "does-not-exist"} - p := New(cfg, testRegistry) + p, err := New(cfg, testRegistry, filteredSpansCounter) + require.NoError(t, err) defer p.Shutdown(context.Background()) // TODO create some spans that are missing the custom dimensions/tags @@ -106,13 +127,16 @@ func TestSpanMetrics_dimensions(t *testing.T) { func TestSpanMetrics_collisions(t *testing.T) { testRegistry := registry.NewTestRegistry() + filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) cfg.HistogramBuckets = []float64{0.5, 1} cfg.Dimensions = []string{"span.kind", "span_name"} cfg.IntrinsicDimensions.SpanKind = false - p := New(cfg, testRegistry) + p, err := New(cfg, testRegistry, filteredSpansCounter) + require.NoError(t, err) defer p.Shutdown(context.Background()) batch := 
test.MakeBatch(10, nil) @@ -150,8 +174,253 @@ func TestSpanMetrics_collisions(t *testing.T) { assert.Equal(t, 10.0, testRegistry.Query("traces_spanmetrics_latency_sum", lbls)) } +func TestSpanMetrics_applyFilterPolicy(t *testing.T) { + filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + + cases := []struct { + filterPolicies []filterconfig.FilterPolicy + expectedMatches float64 + expectedRejections float64 + }{ + { + expectedMatches: 10.0, + expectedRejections: 0.0, + filterPolicies: []filterconfig.FilterPolicy{ + { + + Include: &filterconfig.PolicyMatch{ + MatchType: filterconfig.Strict, + Attributes: []filterconfig.MatchPolicyAttribute{ + { + Key: "span.foo", + Value: "foo-value", + }, + }, + }, + }, + }, + }, + { + expectedMatches: 0.0, + expectedRejections: 10.0, + filterPolicies: []filterconfig.FilterPolicy{ + { + + Include: &filterconfig.PolicyMatch{ + MatchType: filterconfig.Strict, + Attributes: []filterconfig.MatchPolicyAttribute{ + { + Key: "span.nope", + Value: "nothere", + }, + }, + }, + }, + }, + }, + { + expectedMatches: 0.0, + expectedRejections: 10.0, + filterPolicies: []filterconfig.FilterPolicy{ + { + Exclude: &filterconfig.PolicyMatch{ + MatchType: filterconfig.Strict, + Attributes: []filterconfig.MatchPolicyAttribute{ + { + Key: "status", + Value: "STATUS_CODE_OK", + }, + }, + }, + }, + }, + }, + { + expectedMatches: 10.0, + expectedRejections: 0.0, + filterPolicies: []filterconfig.FilterPolicy{ + { + Include: &filterconfig.PolicyMatch{ + MatchType: filterconfig.Regex, + Attributes: []filterconfig.MatchPolicyAttribute{ + { + Key: "kind", + Value: "SPAN_KIND_.*", + }, + }, + }, + }, + }, + }, + } + + for i, tc := range cases { + testName := fmt.Sprintf("filter_policy_%d", i) + t.Run(testName, func(t *testing.T) { + t.Logf("test case: %s", testName) + + cfg := Config{} + cfg.RegisterFlagsAndApplyDefaults("", nil) + cfg.HistogramBuckets = []float64{0.5, 1} + cfg.IntrinsicDimensions.SpanKind = false + 
cfg.IntrinsicDimensions.StatusMessage = true + cfg.Dimensions = []string{"foo", "bar", "does-not-exist"} + cfg.FilterPolicies = tc.filterPolicies + + testRegistry := registry.NewTestRegistry() + p, err := New(cfg, testRegistry, filteredSpansCounter) + require.NoError(t, err) + defer p.Shutdown(context.Background()) + + // TODO create some spans that are missing the custom dimensions/tags + batch := test.MakeBatch(10, nil) + + // Add some attributes + for _, rs := range batch.ScopeSpans { + for _, s := range rs.Spans { + s.Attributes = append(s.Attributes, &common_v1.KeyValue{ + Key: "foo", + Value: &common_v1.AnyValue{Value: &common_v1.AnyValue_StringValue{StringValue: "foo-value"}}, + }) + + s.Attributes = append(s.Attributes, &common_v1.KeyValue{ + Key: "bar", + Value: &common_v1.AnyValue{Value: &common_v1.AnyValue_StringValue{StringValue: "bar-value"}}, + }) + } + } + + t.Logf("batch: %v", batch) + + p.PushSpans(context.Background(), &tempopb.PushSpansRequest{Batches: []*trace_v1.ResourceSpans{batch}}) + + t.Logf("%s", testRegistry) + + lbls := labels.FromMap(map[string]string{ + "service": "test-service", + "span_name": "test", + "status_code": "STATUS_CODE_OK", + "status_message": "OK", + "foo": "foo-value", + "bar": "bar-value", + "does_not_exist": "", + }) + + assert.Equal(t, tc.expectedMatches, testRegistry.Query("traces_spanmetrics_calls_total", lbls)) + + assert.Equal(t, 0.0, testRegistry.Query("traces_spanmetrics_latency_bucket", withLe(lbls, 0.5))) + assert.Equal(t, tc.expectedMatches, testRegistry.Query("traces_spanmetrics_latency_bucket", withLe(lbls, 1))) + assert.Equal(t, tc.expectedMatches, testRegistry.Query("traces_spanmetrics_latency_bucket", withLe(lbls, math.Inf(1)))) + assert.Equal(t, tc.expectedMatches, testRegistry.Query("traces_spanmetrics_latency_count", lbls)) + assert.Equal(t, tc.expectedMatches, testRegistry.Query("traces_spanmetrics_latency_sum", lbls)) + }) + } +} + func withLe(lbls labels.Labels, le float64) labels.Labels { lb := 
labels.NewBuilder(lbls) lb = lb.Set(labels.BucketLabel, strconv.FormatFloat(le, 'f', -1, 64)) return lb.Labels(nil) } + +func BenchmarkSpanMetrics_applyFilterPolicyNone(b *testing.B) { + // Generate a batch of 100k spans + // r, done := test.NewRandomBatcher() + // defer done() + // batch := r.GenerateBatch(1e6) + // data, _ := batch.Marshal() + // _ = ioutil.WriteFile("testbatch100k", data, 0600) + + // Read the file generated above + data, err := os.ReadFile("testbatch100k") + require.NoError(b, err) + batch := &trace_v1.ResourceSpans{} + err = batch.Unmarshal(data) + require.NoError(b, err) + + // b.Logf("size: %s", humanize.Bytes(uint64(batch.Size()))) + // b.Logf("span count: %d", len(batch.ScopeSpans)) + + policies := []filterconfig.FilterPolicy{} + + benchmarkFilterPolicy(b, policies, batch) +} + +func BenchmarkSpanMetrics_applyFilterPolicySmall(b *testing.B) { + // Read the file generated above + data, err := os.ReadFile("testbatch100k") + require.NoError(b, err) + batch := &trace_v1.ResourceSpans{} + err = batch.Unmarshal(data) + require.NoError(b, err) + + policies := []filterconfig.FilterPolicy{ + { + Include: &filterconfig.PolicyMatch{ + MatchType: filterconfig.Strict, + Attributes: []filterconfig.MatchPolicyAttribute{ + { + Key: "span.foo", + Value: "foo-value", + }, + }, + }, + }, + } + + benchmarkFilterPolicy(b, policies, batch) +} + +func BenchmarkSpanMetrics_applyFilterPolicyMedium(b *testing.B) { + // Read the file generated above + data, err := os.ReadFile("testbatch100k") + require.NoError(b, err) + batch := &trace_v1.ResourceSpans{} + err = batch.Unmarshal(data) + require.NoError(b, err) + + policies := []filterconfig.FilterPolicy{ + { + Include: &filterconfig.PolicyMatch{ + MatchType: filterconfig.Strict, + Attributes: []filterconfig.MatchPolicyAttribute{ + { + Key: "span.foo", + Value: "foo-value", + }, + { + Key: "span.x", + Value: "foo-value", + }, + { + Key: "span.y", + Value: "foo-value", + }, + { + Key: "span.z", + Value: "foo-value", + 
}, + }, + }, + }, + } + + benchmarkFilterPolicy(b, policies, batch) +} + +func benchmarkFilterPolicy(b *testing.B, policies []filterconfig.FilterPolicy, batch *trace_v1.ResourceSpans) { + filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + + testRegistry := registry.NewTestRegistry() + cfg := Config{} + cfg.RegisterFlagsAndApplyDefaults("", nil) + + cfg.FilterPolicies = policies + p, err := New(cfg, testRegistry, filteredSpansCounter) + require.NoError(b, err) + defer p.Shutdown(context.Background()) + b.ResetTimer() + for n := 0; n < b.N; n++ { + p.PushSpans(context.Background(), &tempopb.PushSpansRequest{Batches: []*trace_v1.ResourceSpans{batch}}) + } +} diff --git a/modules/overrides/limits.go b/modules/overrides/limits.go index 98dd30122b5..4ee99ab8085 100644 --- a/modules/overrides/limits.go +++ b/modules/overrides/limits.go @@ -4,6 +4,7 @@ import ( "flag" "time" + filterconfig "github.com/grafana/tempo/pkg/spanfilter/config" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" ) @@ -58,18 +59,19 @@ type Limits struct { Forwarders []string `yaml:"forwarders" json:"forwarders"` // Metrics-generator config - MetricsGeneratorRingSize int `yaml:"metrics_generator_ring_size" json:"metrics_generator_ring_size"` - MetricsGeneratorProcessors ListToMap `yaml:"metrics_generator_processors" json:"metrics_generator_processors"` - MetricsGeneratorMaxActiveSeries uint32 `yaml:"metrics_generator_max_active_series" json:"metrics_generator_max_active_series"` - MetricsGeneratorCollectionInterval time.Duration `yaml:"metrics_generator_collection_interval" json:"metrics_generator_collection_interval"` - MetricsGeneratorDisableCollection bool `yaml:"metrics_generator_disable_collection" json:"metrics_generator_disable_collection"` - MetricsGeneratorForwarderQueueSize int `yaml:"metrics_generator_forwarder_queue_size" json:"metrics_generator_forwarder_queue_size"` - MetricsGeneratorForwarderWorkers int 
`yaml:"metrics_generator_forwarder_workers" json:"metrics_generator_forwarder_workers"` - MetricsGeneratorProcessorServiceGraphsHistogramBuckets []float64 `yaml:"metrics_generator_processor_service_graphs_histogram_buckets" json:"metrics_generator_processor_service_graphs_histogram_buckets"` - MetricsGeneratorProcessorServiceGraphsDimensions []string `yaml:"metrics_generator_processor_service_graphs_dimensions" json:"metrics_generator_processor_service_graphs_dimensions"` - MetricsGeneratorProcessorSpanMetricsHistogramBuckets []float64 `yaml:"metrics_generator_processor_span_metrics_histogram_buckets" json:"metrics_generator_processor_span_metrics_histogram_buckets"` - MetricsGeneratorProcessorSpanMetricsDimensions []string `yaml:"metrics_generator_processor_span_metrics_dimensions" json:"metrics_generator_processor_span_metrics_dimensions"` - MetricsGeneratorProcessorSpanMetricsIntrinsicDimensions map[string]bool `yaml:"metrics_generator_processor_span_metrics_intrinsic_dimensions" json:"metrics_generator_processor_span_metrics_intrinsic_dimensions"` + MetricsGeneratorRingSize int `yaml:"metrics_generator_ring_size" json:"metrics_generator_ring_size"` + MetricsGeneratorProcessors ListToMap `yaml:"metrics_generator_processors" json:"metrics_generator_processors"` + MetricsGeneratorMaxActiveSeries uint32 `yaml:"metrics_generator_max_active_series" json:"metrics_generator_max_active_series"` + MetricsGeneratorCollectionInterval time.Duration `yaml:"metrics_generator_collection_interval" json:"metrics_generator_collection_interval"` + MetricsGeneratorDisableCollection bool `yaml:"metrics_generator_disable_collection" json:"metrics_generator_disable_collection"` + MetricsGeneratorForwarderQueueSize int `yaml:"metrics_generator_forwarder_queue_size" json:"metrics_generator_forwarder_queue_size"` + MetricsGeneratorForwarderWorkers int `yaml:"metrics_generator_forwarder_workers" json:"metrics_generator_forwarder_workers"` + 
MetricsGeneratorProcessorServiceGraphsHistogramBuckets []float64 `yaml:"metrics_generator_processor_service_graphs_histogram_buckets" json:"metrics_generator_processor_service_graphs_histogram_buckets"` + MetricsGeneratorProcessorServiceGraphsDimensions []string `yaml:"metrics_generator_processor_service_graphs_dimensions" json:"metrics_generator_processor_service_graphs_dimensions"` + MetricsGeneratorProcessorSpanMetricsHistogramBuckets []float64 `yaml:"metrics_generator_processor_span_metrics_histogram_buckets" json:"metrics_generator_processor_span_metrics_histogram_buckets"` + MetricsGeneratorProcessorSpanMetricsDimensions []string `yaml:"metrics_generator_processor_span_metrics_dimensions" json:"metrics_generator_processor_span_metrics_dimensions"` + MetricsGeneratorProcessorSpanMetricsIntrinsicDimensions map[string]bool `yaml:"metrics_generator_processor_span_metrics_intrinsic_dimensions" json:"metrics_generator_processor_span_metrics_intrinsic_dimensions"` + MetricsGeneratorProcessorSpanMetricsFilterPolicies []filterconfig.FilterPolicy `yaml:"metrics_generator_processor_span_metrics_filter_policies" json:"metrics_generator_processor_span_metrics_filter_policies"` // Compactor enforced limits. 
BlockRetention model.Duration `yaml:"block_retention" json:"block_retention"` diff --git a/modules/overrides/overrides.go b/modules/overrides/overrides.go index 7ea17e441b8..442c2790400 100644 --- a/modules/overrides/overrides.go +++ b/modules/overrides/overrides.go @@ -12,6 +12,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "gopkg.in/yaml.v2" + filterconfig "github.com/grafana/tempo/pkg/spanfilter/config" "github.com/grafana/tempo/pkg/util" "github.com/grafana/tempo/pkg/util/log" ) @@ -346,6 +347,11 @@ func (o *Overrides) MetricsGeneratorProcessorSpanMetricsIntrinsicDimensions(user return o.getOverridesForUser(userID).MetricsGeneratorProcessorSpanMetricsIntrinsicDimensions } +// MetricsGeneratorProcessorSpanMetricsFilterPolicies controls the filter policies that are added to the spanmetrics processor. +func (o *Overrides) MetricsGeneratorProcessorSpanMetricsFilterPolicies(userID string) []filterconfig.FilterPolicy { + return o.getOverridesForUser(userID).MetricsGeneratorProcessorSpanMetricsFilterPolicies +} + // BlockRetention is the duration of the block retention for this tenant. 
func (o *Overrides) BlockRetention(userID string) time.Duration { return time.Duration(o.getOverridesForUser(userID).BlockRetention) diff --git a/pkg/spanfilter/config/config.go b/pkg/spanfilter/config/config.go new file mode 100644 index 00000000000..ada529b2291 --- /dev/null +++ b/pkg/spanfilter/config/config.go @@ -0,0 +1,84 @@ +package config + +import ( + "fmt" + + "github.com/grafana/tempo/pkg/traceql" + "github.com/pkg/errors" +) + +type FilterPolicy struct { + Include *PolicyMatch `yaml:"include"` + Exclude *PolicyMatch `yaml:"exclude"` +} + +type MatchType string + +const ( + Strict MatchType = "strict" + Regex MatchType = "regex" +) + +var ( + supportedIntrinsics = []traceql.Intrinsic{ + traceql.IntrinsicKind, + traceql.IntrinsicName, + traceql.IntrinsicStatus, + } +) + +type PolicyMatch struct { + MatchType MatchType `yaml:"match_type"` + Attributes []MatchPolicyAttribute `yaml:"attributes"` +} + +type MatchPolicyAttribute struct { + Key string `yaml:"key"` + Value interface{} `yaml:"value"` +} + +func ValidateFilterPolicy(policy FilterPolicy) error { + if policy.Include == nil && policy.Exclude == nil { + return fmt.Errorf("invalid filter policy; policies must have at least an `include` or `exclude`: %v", policy) + } + + if policy.Include != nil { + if err := ValidatePolicyMatch(policy.Include); err != nil { + return errors.Wrap(err, "invalid include policy") + } + } + + if policy.Exclude != nil { + if err := ValidatePolicyMatch(policy.Exclude); err != nil { + return errors.Wrap(err, "invalid exclude policy") + } + } + + return nil +} + +func ValidatePolicyMatch(match *PolicyMatch) error { + if match.MatchType != Strict && match.MatchType != Regex { + return fmt.Errorf("invalid match type: %v", match.MatchType) + } + + for _, attr := range match.Attributes { + if attr.Key == "" { + return fmt.Errorf("invalid attribute: %v", attr) + } + + a, err := traceql.ParseIdentifier(attr.Key) + if err != nil { + return err + } + if a.Scope == 
traceql.AttributeScopeNone { + switch a.Intrinsic { + case traceql.IntrinsicKind, traceql.IntrinsicName, traceql.IntrinsicStatus: // currently supported + default: + return fmt.Errorf("currently unsupported intrinsic: %s; supported intrinsics: %q", a.Intrinsic, supportedIntrinsics) + } + } + } + + return nil +} diff --git a/pkg/spanfilter/spanfilter.go b/pkg/spanfilter/spanfilter.go new file mode 100644 index 00000000000..30b53676a16 --- /dev/null +++ b/pkg/spanfilter/spanfilter.go @@ -0,0 +1,268 @@ +package spanfilter + +import ( + "reflect" + "regexp" + + "github.com/grafana/tempo/pkg/spanfilter/config" + v1_common "github.com/grafana/tempo/pkg/tempopb/common/v1" + v1 "github.com/grafana/tempo/pkg/tempopb/resource/v1" + v1_trace "github.com/grafana/tempo/pkg/tempopb/trace/v1" + "github.com/grafana/tempo/pkg/traceql" +) + +type SpanFilter struct { + filterPolicies []*filterPolicy +} + +type filterPolicy struct { + Include *splitPolicy + Exclude *splitPolicy +} + +// SplitPolicy is the result of parsing a policy from the config file to be +// specific about the area the given policy is applied to. +type splitPolicy struct { + ResourceMatch *config.PolicyMatch + SpanMatch *config.PolicyMatch + IntrinsicMatch *config.PolicyMatch +} + +func NewSpanFilter(filterPolicies []config.FilterPolicy) (*SpanFilter, error) { + var policies []*filterPolicy + + var err error + for _, policy := range filterPolicies { + err = config.ValidateFilterPolicy(policy) + if err != nil { + return nil, err + } + + p := &filterPolicy{ + Include: getSplitPolicy(policy.Include), + Exclude: getSplitPolicy(policy.Exclude), + } + + if p.Include != nil || p.Exclude != nil { + policies = append(policies, p) + } + } + + return &SpanFilter{ + filterPolicies: policies, + }, nil +} + +// applyFilterPolicy returns true if the span should be included in the metrics. 
+func (f *SpanFilter) ApplyFilterPolicy(rs *v1.Resource, span *v1_trace.Span) bool {
+	// With no filter policies specified, all spans are included.
+	if len(f.filterPolicies) == 0 {
+		return true
+	}
+
+	// Policies are evaluated in order and every one of them has to accept the
+	// span: an include that does not match, or an exclude that does match,
+	// rejects the span immediately.
+	for _, policy := range f.filterPolicies {
+		if policy.Include != nil && !policyMatch(policy.Include, rs, span) {
+			return false
+		}
+
+		if policy.Exclude != nil && policyMatch(policy.Exclude, rs, span) {
+			return false
+		}
+	}
+
+	return true
+}
+
+// stringMatch reports whether s satisfies pattern for the given match type.
+// Strict compares the two strings for equality, Regex treats pattern as a
+// regular expression, and any other match type never matches.
+//
+// A pattern that fails to compile is treated as a non-match instead of
+// panicking: this function runs once per span, and policy validation does not
+// compile the pattern, so a typo in the configuration must not crash the
+// caller.
+func stringMatch(matchType config.MatchType, s, pattern string) bool {
+	switch matchType {
+	case config.Strict:
+		return s == pattern
+	case config.Regex:
+		// NOTE(review): the pattern is still compiled on every call; compiling
+		// it once when the filter is built would be cheaper on this hot path.
+		matched, err := regexp.MatchString(pattern, s)
+		return err == nil && matched
+	default:
+		return false
+	}
+}
+
+// policyMatch returns true when the resource attributes, the span attributes
+// and the span intrinsics all match the policy.
+//
+// A nil resource or span is treated as having no attributes, so a batch that
+// carries no resource is evaluated instead of causing a nil dereference.
+func policyMatch(policy *splitPolicy, rs *v1.Resource, span *v1_trace.Span) bool {
+	var resourceAttrs, spanAttrs []*v1_common.KeyValue
+	if rs != nil {
+		resourceAttrs = rs.Attributes
+	}
+	if span != nil {
+		spanAttrs = span.Attributes
+	}
+
+	return policyMatchAttrs(policy.ResourceMatch, resourceAttrs) &&
+		policyMatchAttrs(policy.SpanMatch, spanAttrs) &&
+		policyMatchIntrinsicAttrs(policy.IntrinsicMatch, span)
+}
+
+// policyMatchIntrinsicAttrs returns true when all intrinsic values in the policy match the span.
+func policyMatchIntrinsicAttrs(policy *config.PolicyMatch, span *v1_trace.Span) bool {
+	// Every policy attribute must be a recognised intrinsic that matches the
+	// span; matches counts the ones that did so the final comparison rejects
+	// policies containing an intrinsic this function does not handle.
+	matches := 0
+
+	for _, pa := range policy.Attributes {
+		attr := traceql.MustParseIdentifier(pa.Key)
+		switch attr.Intrinsic {
+		// case traceql.IntrinsicDuration:
+		// case traceql.IntrinsicChildCount:
+		// case traceql.IntrinsicParent:
+		case traceql.IntrinsicName:
+			// The policy value comes from configuration as an interface{};
+			// anything other than a string cannot match a span name, so it is
+			// a non-match rather than a failed type assertion.
+			pattern, ok := pa.Value.(string)
+			if !ok || !stringMatch(policy.MatchType, span.GetName(), pattern) {
+				return false
+			}
+			matches++
+		case traceql.IntrinsicStatus:
+			switch want := pa.Value.(type) {
+			case v1_trace.Status_StatusCode:
+				// Enum values are compared numerically, and only for strict
+				// policies (getSplitPolicy converts the configured string to
+				// the enum only when the match type is strict).
+				if policy.MatchType == config.Strict && span.GetStatus().GetCode() != want {
+					return false
+				}
+			case string:
+				if !stringMatch(policy.MatchType, span.GetStatus().GetCode().String(), want) {
+					return false
+				}
+			default:
+				// Unsupported value type: cannot match, and must not panic.
+				return false
+			}
+			matches++
+		case traceql.IntrinsicKind:
+			switch want := pa.Value.(type) {
+			case v1_trace.Span_SpanKind:
+				// Same rule as for status: numeric comparison for strict policies.
+				if policy.MatchType == config.Strict && span.GetKind() != want {
+					return false
+				}
+			case string:
+				if !stringMatch(policy.MatchType, span.GetKind().String(), want) {
+					return false
+				}
+			default:
+				// Unsupported value type: cannot match, and must not panic.
+				return false
+			}
+			matches++
+		}
+	}
+
+	return len(policy.Attributes) == matches
+}
+
+// policyMatchAttrs returns true if all attributes in the policy match the attributes in the span. String, bool, int, and floats are supported. Regex MatchType may be applied to string span attributes.
+func policyMatchAttrs(policy *config.PolicyMatch, attrs []*v1_common.KeyValue) bool { + + matches := 0 + var v *v1_common.AnyValue + var pAttrValueType string + + for _, pa := range policy.Attributes { + pAttrValueType = reflect.TypeOf(pa.Value).String() + + for _, attr := range attrs { + if attr.GetKey() == pa.Key { + v = attr.GetValue() + + // For each type of value, check if the policy attribute value matches the span attribute value. + switch v.Value.(type) { + case *v1_common.AnyValue_StringValue: + if pAttrValueType != "string" { + return false + } + + if !stringMatch(policy.MatchType, v.GetStringValue(), pa.Value.(string)) { + return false + } + matches++ + case *v1_common.AnyValue_IntValue: + if pAttrValueType != "int" { + return false + } + + if v.GetIntValue() != int64(pa.Value.(int)) { + return false + } + matches++ + case *v1_common.AnyValue_DoubleValue: + if pAttrValueType != "float64" { + return false + } + + if v.GetDoubleValue() != pa.Value.(float64) { + return false + } + matches++ + case *v1_common.AnyValue_BoolValue: + if pAttrValueType != "bool" { + return false + } + + if v.GetBoolValue() != pa.Value.(bool) { + return false + } + matches++ + } + } + } + } + + return len(policy.Attributes) == matches +} + +func getSplitPolicy(policy *config.PolicyMatch) *splitPolicy { + if policy == nil { + return nil + } + + // A policy to match against the resource attributes + resourcePolicy := &config.PolicyMatch{ + MatchType: policy.MatchType, + Attributes: make([]config.MatchPolicyAttribute, 0), + } + + // A policy to match against the span attributes + spanPolicy := &config.PolicyMatch{ + MatchType: policy.MatchType, + Attributes: make([]config.MatchPolicyAttribute, 0), + } + + intrinsicPolicy := &config.PolicyMatch{ + MatchType: policy.MatchType, + Attributes: make([]config.MatchPolicyAttribute, 0), + } + + for _, pa := range policy.Attributes { + attr := traceql.MustParseIdentifier(pa.Key) + + attribute := config.MatchPolicyAttribute{ + Key: attr.Name, 
+ Value: pa.Value, + } + + if attr.Intrinsic > 0 { + if policy.MatchType == config.Strict { + switch attr.Intrinsic { + case traceql.IntrinsicStatus: + attribute.Value = v1_trace.Status_StatusCode(v1_trace.Status_StatusCode_value[pa.Value.(string)]) + case traceql.IntrinsicKind: + attribute.Value = v1_trace.Span_SpanKind(v1_trace.Span_SpanKind_value[pa.Value.(string)]) + } + } + intrinsicPolicy.Attributes = append(intrinsicPolicy.Attributes, attribute) + } else { + switch attr.Scope { + case traceql.AttributeScopeSpan: + spanPolicy.Attributes = append(spanPolicy.Attributes, attribute) + case traceql.AttributeScopeResource: + resourcePolicy.Attributes = append(resourcePolicy.Attributes, attribute) + } + } + } + + return &splitPolicy{ + ResourceMatch: resourcePolicy, + SpanMatch: spanPolicy, + IntrinsicMatch: intrinsicPolicy, + } +} diff --git a/pkg/spanfilter/spanfilter_test.go b/pkg/spanfilter/spanfilter_test.go new file mode 100644 index 00000000000..fab3c1753d3 --- /dev/null +++ b/pkg/spanfilter/spanfilter_test.go @@ -0,0 +1,1260 @@ +package spanfilter + +import ( + "fmt" + "os" + "testing" + + "github.com/grafana/tempo/pkg/spanfilter/config" + "github.com/grafana/tempo/pkg/tempopb" + common_v1 "github.com/grafana/tempo/pkg/tempopb/common/v1" + v1 "github.com/grafana/tempo/pkg/tempopb/resource/v1" + trace_v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1" + "github.com/stretchr/testify/require" +) + +func TestSpanFilter_NewSpanFilter(t *testing.T) { + + cases := []struct { + name string + cfg []config.FilterPolicy + filter *SpanFilter + err error + }{ + { + name: "empty config", + cfg: []config.FilterPolicy{}, + filter: &SpanFilter{}, + err: nil, + }, + { + name: "nil config", + cfg: nil, + filter: &SpanFilter{}, + err: nil, + }, + { + name: "nil config", + cfg: nil, + filter: &SpanFilter{}, + err: nil, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + _, err := NewSpanFilter(tc.cfg) + require.NoError(t, err) + }) + } + +} + +func 
TestSpanFilter_policyMatch(t *testing.T) { + cases := []struct { + policy *config.PolicyMatch + resource *v1.Resource + span *trace_v1.Span + expect bool + testName string + }{ + { + testName: "most basic span kind matching", + expect: true, + policy: &config.PolicyMatch{ + MatchType: config.Strict, + Attributes: []config.MatchPolicyAttribute{ + { + Key: "span.kind", + Value: "SPAN_KIND_CLIENT", + }, + }, + }, + resource: &v1.Resource{ + Attributes: []*common_v1.KeyValue{}, + }, + span: &trace_v1.Span{ + Attributes: []*common_v1.KeyValue{ + { + Key: "kind", + Value: &common_v1.AnyValue{ + Value: &common_v1.AnyValue_StringValue{ + StringValue: "SPAN_KIND_CLIENT", + }, + }, + }, + }, + }, + }, + { + testName: "most basic intrinsic kind matching", + expect: true, + policy: &config.PolicyMatch{ + MatchType: config.Strict, + Attributes: []config.MatchPolicyAttribute{ + { + Key: "kind", + Value: "SPAN_KIND_CLIENT", + }, + }, + }, + resource: &v1.Resource{ + Attributes: []*common_v1.KeyValue{}, + }, + span: &trace_v1.Span{ + Kind: trace_v1.Span_SPAN_KIND_CLIENT, + }, + }, + { + testName: "simple matching", + expect: true, + policy: &config.PolicyMatch{ + MatchType: config.Strict, + Attributes: []config.MatchPolicyAttribute{ + { + Key: "kind", + Value: "SPAN_KIND_CLIENT", + }, + { + Key: "span.status.code", + Value: "STATUS_CODE_OK", + }, + { + Key: "resource.location", + Value: "earth", + }, + { + Key: "resource.name", + Value: "test", + }, + { + Key: "resource.othervalue", + Value: "somethinginteresting", + }, + }, + }, + resource: &v1.Resource{ + Attributes: []*common_v1.KeyValue{ + { + Key: "name", + Value: &common_v1.AnyValue{ + Value: &common_v1.AnyValue_StringValue{ + StringValue: "test", + }, + }, + }, + { + Key: "location", + Value: &common_v1.AnyValue{ + Value: &common_v1.AnyValue_StringValue{ + StringValue: "earth", + }, + }, + }, + { + Key: "othervalue", + Value: &common_v1.AnyValue{ + Value: &common_v1.AnyValue_StringValue{ + StringValue: 
"somethinginteresting", + }, + }, + }, + }, + }, + span: &trace_v1.Span{ + Kind: trace_v1.Span_SPAN_KIND_CLIENT, + Attributes: []*common_v1.KeyValue{ + { + Key: "status.code", + Value: &common_v1.AnyValue{ + Value: &common_v1.AnyValue_StringValue{ + StringValue: "STATUS_CODE_OK", + }, + }, + }, + }, + }, + }, + { + expect: true, + policy: &config.PolicyMatch{ + MatchType: config.Strict, + Attributes: []config.MatchPolicyAttribute{ + { + Key: "kind", + Value: "SPAN_KIND_CLIENT", + }, + { + Key: "status", + Value: "STATUS_CODE_OK", + }, + }, + }, + resource: &v1.Resource{ + Attributes: []*common_v1.KeyValue{}, + }, + span: &trace_v1.Span{ + Kind: trace_v1.Span_SPAN_KIND_CLIENT, + Status: &trace_v1.Status{Message: "OK", Code: trace_v1.Status_STATUS_CODE_OK}, + Attributes: []*common_v1.KeyValue{}, + }, + }, + { + testName: "resource matching", + expect: true, + policy: &config.PolicyMatch{ + MatchType: config.Strict, + Attributes: []config.MatchPolicyAttribute{ + { + Key: "resource.location", + Value: "earth", + }, + { + Key: "resource.othervalue", + Value: "somethinginteresting", + }, + }, + }, + resource: &v1.Resource{ + Attributes: []*common_v1.KeyValue{ + { + Key: "location", + Value: &common_v1.AnyValue{ + Value: &common_v1.AnyValue_StringValue{ + StringValue: "earth", + }, + }, + }, + { + Key: "othervalue", + Value: &common_v1.AnyValue{ + Value: &common_v1.AnyValue_StringValue{ + StringValue: "somethinginteresting", + }, + }, + }, + }, + }, + span: &trace_v1.Span{ + Attributes: []*common_v1.KeyValue{}, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.testName, func(t *testing.T) { + r := policyMatch(getSplitPolicy(tc.policy), tc.resource, tc.span) + require.Equal(t, tc.expect, r) + }) + } +} + +func TestSpanFilter_policyMatchIntrinsicAttrs(t *testing.T) { + cases := []struct { + policy *config.PolicyMatch + span *trace_v1.Span + expect bool + name string + }{ + { + name: "match on name, kind and status", + expect: true, + policy: &config.PolicyMatch{ + 
MatchType: config.Strict, + Attributes: []config.MatchPolicyAttribute{ + { + Key: "kind", + Value: trace_v1.Span_SPAN_KIND_SERVER, + }, + { + Key: "status", + Value: trace_v1.Status_STATUS_CODE_OK, + }, + { + Key: "name", + Value: "test", + }, + }, + }, + span: &trace_v1.Span{ + Kind: trace_v1.Span_SPAN_KIND_SERVER, + Status: &trace_v1.Status{ + Code: trace_v1.Status_STATUS_CODE_OK, + }, + Name: "test", + }, + }, + { + name: "unmatched name", + expect: false, + policy: &config.PolicyMatch{ + MatchType: config.Strict, + Attributes: []config.MatchPolicyAttribute{ + { + Key: "kind", + Value: trace_v1.Span_SPAN_KIND_SERVER, + }, + { + Key: "status", + Value: trace_v1.Status_STATUS_CODE_OK, + }, + { + Key: "name", + Value: "test", + }, + }, + }, + span: &trace_v1.Span{ + Kind: trace_v1.Span_SPAN_KIND_SERVER, + Status: &trace_v1.Status{ + Code: trace_v1.Status_STATUS_CODE_OK, + }, + Name: "test2", + }, + }, + { + name: "unmatched status", + expect: false, + policy: &config.PolicyMatch{ + MatchType: config.Strict, + Attributes: []config.MatchPolicyAttribute{ + { + Key: "kind", + Value: trace_v1.Span_SPAN_KIND_CLIENT, + }, + { + Key: "status", + Value: trace_v1.Status_STATUS_CODE_OK, + }, + { + Key: "name", + Value: "test", + }, + }, + }, + span: &trace_v1.Span{ + Kind: trace_v1.Span_SPAN_KIND_CLIENT, + Status: &trace_v1.Status{ + Code: trace_v1.Status_STATUS_CODE_ERROR, + }, + Name: "test", + }, + }, + { + name: "unmatched kind", + expect: false, + policy: &config.PolicyMatch{ + MatchType: config.Strict, + Attributes: []config.MatchPolicyAttribute{ + { + Key: "kind", + Value: trace_v1.Span_SPAN_KIND_SERVER, + }, + { + Key: "status", + Value: trace_v1.Status_STATUS_CODE_OK, + }, + { + Key: "name", + Value: "test", + }, + }, + }, + span: &trace_v1.Span{ + Kind: trace_v1.Span_SPAN_KIND_CLIENT, + Status: &trace_v1.Status{ + Code: trace_v1.Status_STATUS_CODE_OK, + }, + Name: "test", + }, + }, + { + name: "matched regex kind and status", + expect: true, + policy: 
&config.PolicyMatch{ + MatchType: config.Regex, + Attributes: []config.MatchPolicyAttribute{ + { + Key: "kind", + Value: ".*_KIND_.*", + }, + { + Key: "status", + Value: ".*_CODE_.*", + }, + }, + }, + span: &trace_v1.Span{ + Kind: trace_v1.Span_SPAN_KIND_SERVER, + Status: &trace_v1.Status{ + Code: trace_v1.Status_STATUS_CODE_OK, + }, + Name: "test", + }, + }, + { + name: "unmatched regex kind", + expect: false, + policy: &config.PolicyMatch{ + MatchType: config.Regex, + Attributes: []config.MatchPolicyAttribute{ + { + Key: "kind", + Value: ".*_CLIENT", + }, + { + Key: "status", + Value: ".*_OK", + }, + }, + }, + span: &trace_v1.Span{ + Kind: trace_v1.Span_SPAN_KIND_SERVER, + Status: &trace_v1.Status{ + Code: trace_v1.Status_STATUS_CODE_OK, + }, + Name: "test", + }, + }, + { + name: "unmatched regex status", + expect: false, + policy: &config.PolicyMatch{ + MatchType: config.Regex, + Attributes: []config.MatchPolicyAttribute{ + { + Key: "kind", + Value: ".*_SERVER", + }, + { + Key: "status", + Value: ".*_ERROR", + }, + }, + }, + span: &trace_v1.Span{ + Kind: trace_v1.Span_SPAN_KIND_SERVER, + Status: &trace_v1.Status{ + Code: trace_v1.Status_STATUS_CODE_OK, + }, + Name: "test", + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + r := policyMatchIntrinsicAttrs(tc.policy, tc.span) + require.Equal(t, tc.expect, r) + }) + } + +} + +func TestSpanFilter_policyMatchAttrs(t *testing.T) { + cases := []struct { + policy *config.PolicyMatch + attrs []*common_v1.KeyValue + expect bool + }{ + // Single string match + { + expect: true, + policy: &config.PolicyMatch{ + MatchType: config.Strict, + Attributes: []config.MatchPolicyAttribute{ + { + Key: "foo", + Value: "bar", + }, + }, + }, + attrs: []*common_v1.KeyValue{ + { + Key: "foo", + Value: &common_v1.AnyValue{ + Value: &common_v1.AnyValue_StringValue{ + StringValue: "bar", + }, + }, + }, + }, + }, + // Multiple string match + { + expect: true, + policy: &config.PolicyMatch{ + MatchType: 
config.Strict, + Attributes: []config.MatchPolicyAttribute{ + { + Key: "foo", + Value: "bar", + }, + { + Key: "otherfoo", + Value: "notbar", + }, + }, + }, + attrs: []*common_v1.KeyValue{ + { + Key: "foo", + Value: &common_v1.AnyValue{ + Value: &common_v1.AnyValue_StringValue{ + StringValue: "bar", + }, + }, + }, + { + Key: "otherfoo", + Value: &common_v1.AnyValue{ + Value: &common_v1.AnyValue_StringValue{ + StringValue: "notbar", + }, + }, + }, + }, + }, + // Multiple string non match + { + expect: false, + policy: &config.PolicyMatch{ + MatchType: config.Strict, + Attributes: []config.MatchPolicyAttribute{ + { + Key: "foo", + Value: "bar", + }, + { + Key: "otherfoo", + Value: "nope", + }, + }, + }, + attrs: []*common_v1.KeyValue{ + { + Key: "foo", + Value: &common_v1.AnyValue{ + Value: &common_v1.AnyValue_StringValue{ + StringValue: "bar", + }, + }, + }, + { + Key: "otherfoo", + Value: &common_v1.AnyValue{ + Value: &common_v1.AnyValue_StringValue{ + StringValue: "notbar", + }, + }, + }, + }, + }, + // Combination match + { + expect: true, + policy: &config.PolicyMatch{ + MatchType: config.Strict, + Attributes: []config.MatchPolicyAttribute{ + { + Key: "one", + Value: "1", + }, + { + Key: "oneone", + Value: 11, + }, + { + Key: "oneonepointone", + Value: 11.1, + }, + { + Key: "matching", + Value: true, + }, + }, + }, + attrs: []*common_v1.KeyValue{ + { + Key: "one", + Value: &common_v1.AnyValue{ + Value: &common_v1.AnyValue_StringValue{ + StringValue: "1", + }, + }, + }, + { + Key: "oneone", + Value: &common_v1.AnyValue{ + Value: &common_v1.AnyValue_IntValue{ + IntValue: 11, + }, + }, + }, + { + Key: "oneonepointone", + Value: &common_v1.AnyValue{ + Value: &common_v1.AnyValue_DoubleValue{ + DoubleValue: 11.1, + }, + }, + }, + { + Key: "matching", + Value: &common_v1.AnyValue{ + Value: &common_v1.AnyValue_BoolValue{ + BoolValue: true, + }, + }, + }, + }, + }, + // Regex basic match + { + expect: true, + policy: &config.PolicyMatch{ + MatchType: config.Regex, + 
Attributes: []config.MatchPolicyAttribute{ + { + Key: "dd", + Value: `\d\d\w{5}`, + }, + }, + }, + attrs: []*common_v1.KeyValue{ + { + Key: "dd", + Value: &common_v1.AnyValue{ + Value: &common_v1.AnyValue_StringValue{ + StringValue: "11xxxxx", + }, + }, + }, + }, + }, + // Value type mismatch string + { + expect: false, + policy: &config.PolicyMatch{ + MatchType: config.Strict, + Attributes: []config.MatchPolicyAttribute{ + { + Key: "dd", + Value: true, + }, + }, + }, + attrs: []*common_v1.KeyValue{ + { + Key: "dd", + Value: &common_v1.AnyValue{ + Value: &common_v1.AnyValue_StringValue{ + StringValue: "11xxxxx", + }, + }, + }, + }, + }, + // Value type mismatch string/int + { + expect: false, + policy: &config.PolicyMatch{ + MatchType: config.Strict, + Attributes: []config.MatchPolicyAttribute{ + { + Key: "dd", + Value: "value", + }, + }, + }, + attrs: []*common_v1.KeyValue{ + { + Key: "dd", + Value: &common_v1.AnyValue{ + Value: &common_v1.AnyValue_IntValue{ + IntValue: 11, + }, + }, + }, + }, + }, + // Value type mismatch string/float + { + expect: false, + policy: &config.PolicyMatch{ + MatchType: config.Strict, + Attributes: []config.MatchPolicyAttribute{ + { + Key: "11", + Value: "eleven", + }, + }, + }, + attrs: []*common_v1.KeyValue{ + { + Key: "11", + Value: &common_v1.AnyValue{ + Value: &common_v1.AnyValue_DoubleValue{ + DoubleValue: 11.1, + }, + }, + }, + }, + }, + // Value type mismatch string/bool + { + expect: false, + policy: &config.PolicyMatch{ + MatchType: config.Strict, + Attributes: []config.MatchPolicyAttribute{ + { + Key: "11", + Value: "eleven", + }, + }, + }, + attrs: []*common_v1.KeyValue{ + { + Key: "11", + Value: &common_v1.AnyValue{ + Value: &common_v1.AnyValue_BoolValue{ + BoolValue: false, + }, + }, + }, + }, + }, + // Value type mismatch int/string + { + expect: false, + policy: &config.PolicyMatch{ + MatchType: config.Strict, + Attributes: []config.MatchPolicyAttribute{ + { + Key: "11", + Value: 11, + }, + }, + }, + attrs: 
[]*common_v1.KeyValue{ + { + Key: "11", + Value: &common_v1.AnyValue{ + Value: &common_v1.AnyValue_StringValue{ + StringValue: "11", + }, + }, + }, + }, + }, + // Value mismatch int + { + expect: false, + policy: &config.PolicyMatch{ + MatchType: config.Strict, + Attributes: []config.MatchPolicyAttribute{ + { + Key: "11", + Value: 11, + }, + }, + }, + attrs: []*common_v1.KeyValue{ + { + Key: "11", + Value: &common_v1.AnyValue{ + Value: &common_v1.AnyValue_IntValue{ + IntValue: 12, + }, + }, + }, + }, + }, + // Value mismatch bool + { + expect: false, + policy: &config.PolicyMatch{ + MatchType: config.Strict, + Attributes: []config.MatchPolicyAttribute{ + { + Key: "11", + Value: true, + }, + }, + }, + attrs: []*common_v1.KeyValue{ + { + Key: "11", + Value: &common_v1.AnyValue{ + Value: &common_v1.AnyValue_BoolValue{ + BoolValue: false, + }, + }, + }, + }, + }, + // Value mismatch bool + { + expect: false, + policy: &config.PolicyMatch{ + MatchType: config.Strict, + Attributes: []config.MatchPolicyAttribute{ + { + Key: "11", + Value: 11.0, + }, + }, + }, + attrs: []*common_v1.KeyValue{ + { + Key: "11", + Value: &common_v1.AnyValue{ + Value: &common_v1.AnyValue_DoubleValue{ + DoubleValue: 11.1, + }, + }, + }, + }, + }, + } + + for _, tc := range cases { + r := policyMatchAttrs(tc.policy, tc.attrs) + require.Equal(t, tc.expect, r) + } +} + +func TestSpanMetrics_applyFilterPolicy(t *testing.T) { + cases := []struct { + name string + err error + filterPolicies []config.FilterPolicy + expect bool + resource *v1.Resource + span *trace_v1.Span + }{ + { + name: "no policies matches", + err: nil, + expect: true, + filterPolicies: []config.FilterPolicy{}, + }, + { + name: "nil policies matches", + err: nil, + expect: true, + filterPolicies: nil, + }, + { + name: "non nil policy with nil include/exclude fails", + err: fmt.Errorf("invalid filter policy; policies must have at least an `include` or `exclude`: { }"), + expect: false, + filterPolicies: []config.FilterPolicy{{ + 
Include: nil, + Exclude: nil, + }}, + }, + { + name: "a matching policy", + err: nil, + expect: true, + filterPolicies: []config.FilterPolicy{ + { + Include: &config.PolicyMatch{ + MatchType: config.Strict, + Attributes: []config.MatchPolicyAttribute{ + { + Key: "kind", + Value: "SPAN_KIND_SERVER", + }, + { + Key: "resource.location", + Value: "earth", + }, + }, + }, + }, + }, + resource: &v1.Resource{ + Attributes: []*common_v1.KeyValue{ + { + Key: "name", + Value: &common_v1.AnyValue{ + Value: &common_v1.AnyValue_StringValue{ + StringValue: "test", + }, + }, + }, + { + Key: "location", + Value: &common_v1.AnyValue{ + Value: &common_v1.AnyValue_StringValue{ + StringValue: "earth", + }, + }, + }, + { + Key: "othervalue", + Value: &common_v1.AnyValue{ + Value: &common_v1.AnyValue_StringValue{ + StringValue: "somethinginteresting", + }, + }, + }, + }, + }, + span: &trace_v1.Span{ + Kind: trace_v1.Span_SPAN_KIND_SERVER, + Status: &trace_v1.Status{ + Code: trace_v1.Status_STATUS_CODE_OK, + }, + Name: "test", + }, + }, + { + name: "a non-matching include policy", + err: nil, + expect: false, + filterPolicies: []config.FilterPolicy{ + { + Include: &config.PolicyMatch{ + MatchType: config.Strict, + Attributes: []config.MatchPolicyAttribute{ + { + Key: "kind", + Value: "SPAN_KIND_CLIENT", + }, + { + Key: "resource.location", + Value: "earth", + }, + }, + }, + }, + }, + resource: &v1.Resource{ + Attributes: []*common_v1.KeyValue{ + { + Key: "name", + Value: &common_v1.AnyValue{ + Value: &common_v1.AnyValue_StringValue{ + StringValue: "test", + }, + }, + }, + { + Key: "location", + Value: &common_v1.AnyValue{ + Value: &common_v1.AnyValue_StringValue{ + StringValue: "earth", + }, + }, + }, + { + Key: "othervalue", + Value: &common_v1.AnyValue{ + Value: &common_v1.AnyValue_StringValue{ + StringValue: "somethinginteresting", + }, + }, + }, + }, + }, + span: &trace_v1.Span{ + Kind: trace_v1.Span_SPAN_KIND_SERVER, + Status: &trace_v1.Status{ + Code: 
trace_v1.Status_STATUS_CODE_OK, + }, + Name: "test", + }, + }, + { + name: "a matching include with rejecting exclude policy", + err: nil, + expect: false, + filterPolicies: []config.FilterPolicy{ + { + Include: &config.PolicyMatch{ + MatchType: config.Strict, + Attributes: []config.MatchPolicyAttribute{ + { + Key: "kind", + Value: "SPAN_KIND_SERVER", + }, + { + Key: "resource.location", + Value: "earth", + }, + }, + }, + Exclude: &config.PolicyMatch{ + MatchType: config.Regex, + Attributes: []config.MatchPolicyAttribute{ + { + Key: "resource.othervalue", + Value: "something.*", + }, + }, + }, + }, + }, + resource: &v1.Resource{ + Attributes: []*common_v1.KeyValue{ + { + Key: "name", + Value: &common_v1.AnyValue{ + Value: &common_v1.AnyValue_StringValue{ + StringValue: "test", + }, + }, + }, + { + Key: "location", + Value: &common_v1.AnyValue{ + Value: &common_v1.AnyValue_StringValue{ + StringValue: "earth", + }, + }, + }, + { + Key: "othervalue", + Value: &common_v1.AnyValue{ + Value: &common_v1.AnyValue_StringValue{ + StringValue: "somethinginteresting", + }, + }, + }, + }, + }, + span: &trace_v1.Span{ + Kind: trace_v1.Span_SPAN_KIND_SERVER, + Status: &trace_v1.Status{ + Code: trace_v1.Status_STATUS_CODE_OK, + }, + Name: "test", + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + sf, err := NewSpanFilter(tc.filterPolicies) + require.Equal(t, tc.err, err) + if err != nil { + return + } + x := sf.ApplyFilterPolicy(tc.resource, tc.span) + require.Equal(t, tc.expect, x) + }) + } + +} + +func TestSpanFilter_stringMatch(t *testing.T) { + cases := []struct { + matchType config.MatchType + s string + pattern string + expect bool + }{ + { + matchType: config.Strict, + s: "foo", + pattern: "foo", + expect: true, + }, + { + matchType: config.Strict, + s: "foo", + pattern: "bar", + expect: false, + }, + } + + for _, tc := range cases { + r := stringMatch(tc.matchType, tc.s, tc.pattern) + require.Equal(t, tc.expect, r) + } +} + +func 
TestSpanFilter_getSplitPolicy(t *testing.T) { + cases := []struct { + policy *config.PolicyMatch + split *splitPolicy + name string + }{ + { + name: "basic kind matching", + policy: &config.PolicyMatch{ + MatchType: config.Strict, + Attributes: []config.MatchPolicyAttribute{ + { + Key: "kind", + Value: "SPAN_KIND_CLIENT", + }, + }, + }, + split: &splitPolicy{ + IntrinsicMatch: &config.PolicyMatch{ + MatchType: config.Strict, + Attributes: []config.MatchPolicyAttribute{ + { + Key: "kind", + Value: trace_v1.Span_SPAN_KIND_CLIENT, + }, + }, + }, + }, + }, + { + name: "basic status matching", + policy: &config.PolicyMatch{ + MatchType: config.Strict, + Attributes: []config.MatchPolicyAttribute{ + { + Key: "status", + Value: "STATUS_CODE_OK", + }, + }, + }, + split: &splitPolicy{ + IntrinsicMatch: &config.PolicyMatch{ + MatchType: config.Strict, + Attributes: []config.MatchPolicyAttribute{ + { + Key: "status", + Value: trace_v1.Status_STATUS_CODE_OK, + }, + }, + }, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + s := getSplitPolicy(tc.policy) + + require.NotNil(t, s) + require.NotNil(t, s.IntrinsicMatch) + require.NotNil(t, s.SpanMatch) + require.NotNil(t, s.ResourceMatch) + + if tc.split.IntrinsicMatch != nil { + require.Equal(t, tc.split.IntrinsicMatch, s.IntrinsicMatch) + } + if tc.split.SpanMatch != nil { + require.Equal(t, tc.split.SpanMatch, s.SpanMatch) + } + if tc.split.ResourceMatch != nil { + require.Equal(t, tc.split.ResourceMatch, s.ResourceMatch) + } + + }) + } +} + +func BenchmarkSpanFilter_applyFilterPolicyNone(b *testing.B) { + // Generate a batch of 100k spans + // r, done := test.NewRandomBatcher() + // defer done() + // batch := r.GenerateBatch(1e6) + // data, _ := batch.Marshal() + // _ = ioutil.WriteFile("testbatch100k", data, 0600) + + // Read the file generated above + data, err := os.ReadFile("testbatch100k") + require.NoError(b, err) + batch := &trace_v1.ResourceSpans{} + err = batch.Unmarshal(data) + 
require.NoError(b, err) + + // b.Logf("size: %s", humanize.Bytes(uint64(batch.Size()))) + // b.Logf("span count: %d", len(batch.ScopeSpans)) + + policies := []config.FilterPolicy{} + + benchmarkFilterPolicy(b, policies, batch) +} + +func BenchmarkSpanFilter_applyFilterPolicySmall(b *testing.B) { + // Read the file generated above + data, err := os.ReadFile("testbatch100k") + require.NoError(b, err) + batch := &trace_v1.ResourceSpans{} + err = batch.Unmarshal(data) + require.NoError(b, err) + + policies := []config.FilterPolicy{ + { + Include: &config.PolicyMatch{ + MatchType: config.Strict, + Attributes: []config.MatchPolicyAttribute{ + { + Key: "span.foo", + Value: "foo-value", + }, + }, + }, + }, + } + + benchmarkFilterPolicy(b, policies, batch) +} + +func BenchmarkSpanFilter_applyFilterPolicyMedium(b *testing.B) { + // Read the file generated above + data, err := os.ReadFile("testbatch100k") + require.NoError(b, err) + batch := &trace_v1.ResourceSpans{} + err = batch.Unmarshal(data) + require.NoError(b, err) + + policies := []config.FilterPolicy{ + { + Include: &config.PolicyMatch{ + MatchType: config.Strict, + Attributes: []config.MatchPolicyAttribute{ + { + Key: "span.foo", + Value: "foo-value", + }, + { + Key: "span.x", + Value: "foo-value", + }, + { + Key: "span.y", + Value: "foo-value", + }, + { + Key: "span.z", + Value: "foo-value", + }, + }, + }, + }, + } + + benchmarkFilterPolicy(b, policies, batch) +} + +func benchmarkFilterPolicy(b *testing.B, policies []config.FilterPolicy, batch *trace_v1.ResourceSpans) { + filter, err := NewSpanFilter(policies) + require.NoError(b, err) + + b.ResetTimer() + for n := 0; n < b.N; n++ { + pushspans(&tempopb.PushSpansRequest{Batches: []*trace_v1.ResourceSpans{batch}}, filter) + } +} + +func pushspans(req *tempopb.PushSpansRequest, filter *SpanFilter) { + for _, rs := range req.Batches { + for _, ils := range rs.ScopeSpans { + for _, span := range ils.Spans { + filter.ApplyFilterPolicy(rs.Resource, span) + } + } + } +} 
diff --git a/pkg/util/test/random.go b/pkg/util/test/random.go new file mode 100644 index 00000000000..739276e7648 --- /dev/null +++ b/pkg/util/test/random.go @@ -0,0 +1,212 @@ +package test + +import ( + "context" + "math/rand" + "time" + + v1_common "github.com/grafana/tempo/pkg/tempopb/common/v1" + v1_resource "github.com/grafana/tempo/pkg/tempopb/resource/v1" + v1_trace "github.com/grafana/tempo/pkg/tempopb/trace/v1" +) + +// RandomBatcher is a helper for generating random batches of spans. +type RandomBatcher struct { + stringReceiverChan chan string + attributeReceiverChan chan *v1_common.KeyValue + anyvalueReceiverChan chan *v1_common.AnyValue + spanReceiverChan chan *v1_trace.Span +} + +func NewRandomBatcher() (*RandomBatcher, context.CancelFunc) { + r := &RandomBatcher{ + stringReceiverChan: make(chan string, 100), + attributeReceiverChan: make(chan *v1_common.KeyValue, 100), + anyvalueReceiverChan: make(chan *v1_common.AnyValue, 100), + spanReceiverChan: make(chan *v1_trace.Span, 100), + } + + ctx, cancel := context.WithCancel(context.Background()) + + go r.randomStringGenerator(ctx, 1, 64) + go r.randomSpanAttributeGenerator(ctx) + go r.randomAnyValueGenerator(ctx) + go r.randomSpanGenerator(ctx) + + return r, cancel +} + +func (r *RandomBatcher) GenerateBatch(spanCount int64) *v1_trace.ResourceSpans { + + batch := &v1_trace.ResourceSpans{ + Resource: &v1_resource.Resource{ + Attributes: []*v1_common.KeyValue{ + { + Key: "service.name", + Value: &v1_common.AnyValue{ + Value: &v1_common.AnyValue_StringValue{ + StringValue: "test-service", + }, + }, + }, + }, + }, + } + + for i := int64(0); i < spanCount; i++ { + s := <-r.spanReceiverChan + batch.ScopeSpans = append(batch.ScopeSpans, &v1_trace.ScopeSpans{ + Scope: &v1_common.InstrumentationScope{ + Name: "super library", + Version: "0.0.1", + }, + Spans: []*v1_trace.Span{ + s, + }, + }) + } + + return batch +} + +func (r *RandomBatcher) randomSpanGenerator(ctx context.Context) { + min := 0 + max := 30 + + 
rising := true + length := min + + rr := rand.New(rand.NewSource(time.Now().UnixNano())) + + for { + select { + case <-ctx.Done(): + return + default: + + attributes := []*v1_common.KeyValue{} + + for i := 0; i < length; i++ { + attributes = append(attributes, <-r.attributeReceiverChan) + } + + now := time.Now() + + span := &v1_trace.Span{ + TraceId: []byte("12345678901234567890123456789012"), + SpanId: []byte("1234567890123456"), + ParentSpanId: []byte("1234567890123456"), + Name: <-r.stringReceiverChan, + StartTimeUnixNano: uint64(now.UnixNano()), + EndTimeUnixNano: uint64(now.Add(time.Duration(rr.Intn(1000000000)) * time.Nanosecond).UnixNano()), + Attributes: attributes, + } + r.spanReceiverChan <- span + + if rising { + length++ + } else { + length-- + } + + switch length { + case max: + rising = false + case min: + rising = true + } + + } + } +} + +func (r *RandomBatcher) randomAnyValueGenerator(ctx context.Context) { + rr := rand.New(rand.NewSource(time.Now().UnixNano())) + + var anyValue *v1_common.AnyValue + + for { + select { + case <-ctx.Done(): + return + default: + + switch rr.Intn(4) { + case 0: + anyValue = &v1_common.AnyValue{ + Value: &v1_common.AnyValue_StringValue{ + StringValue: <-r.stringReceiverChan, + }, + } + case 1: + anyValue = &v1_common.AnyValue{ + Value: &v1_common.AnyValue_BoolValue{ + BoolValue: bool(rr.Intn(2) == 1), + }, + } + case 2: + anyValue = &v1_common.AnyValue{ + Value: &v1_common.AnyValue_IntValue{ + IntValue: int64(rr.Intn(1000000000)), + }, + } + case 3: + anyValue = &v1_common.AnyValue{ + Value: &v1_common.AnyValue_DoubleValue{ + DoubleValue: rr.Float64(), + }, + } + } + + r.anyvalueReceiverChan <- anyValue + } + } +} + +func (r *RandomBatcher) randomSpanAttributeGenerator(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + default: + attr := &v1_common.KeyValue{ + Key: <-r.stringReceiverChan, + Value: <-r.anyvalueReceiverChan, + } + r.attributeReceiverChan <- attr + } + } +} + +func (r 
*RandomBatcher) randomStringGenerator(ctx context.Context, min, max int) { + var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890") + + rising := true + length := min + for { + select { + case <-ctx.Done(): + return + default: + s := make([]rune, length) + for i := range s { + s[i] = letters[rand.Intn(len(letters))] + } + + r.stringReceiverChan <- string(s) + + if rising { + length++ + } else { + length-- + } + + switch length { + case max: + rising = false + case min: + rising = true + } + } + } +}