diff --git a/integration/e2e/config-limits.yaml b/integration/e2e/config-limits.yaml new file mode 100644 index 00000000000..7753cfc1dfb --- /dev/null +++ b/integration/e2e/config-limits.yaml @@ -0,0 +1,37 @@ +auth_enabled: false + +target: all + +server: + http_listen_port: 3100 + +distributor: + receivers: + jaeger: + protocols: + grpc: + +overrides: + max_spans_per_trace: 1 + max_traces_per_user: 1 + ingestion_rate_limit: 5 + ingestion_burst_size: 5 + +ingester: + lifecycler: + address: 127.0.0.1 + ring: + kvstore: + store: inmemory + replication_factor: 1 + final_sleep: 0s + trace_idle_period: 3600s + +storage: + trace: + backend: local + local: + path: /var/tempo + pool: + max_workers: 10 + queue_depth: 100 diff --git a/integration/e2e/e2e_test.go b/integration/e2e/e2e_test.go index 08c4764cb1e..39ee1bb0083 100644 --- a/integration/e2e/e2e_test.go +++ b/integration/e2e/e2e_test.go @@ -234,20 +234,29 @@ func TestMicroservices(t *testing.T) { } func makeThriftBatch() *thrift.Batch { + return makeThriftBatchWithSpanCount(1) +} + +func makeThriftBatchWithSpanCount(n int) *thrift.Batch { var spans []*thrift.Span - spans = append(spans, &thrift.Span{ - TraceIdLow: rand.Int63(), - TraceIdHigh: 0, - SpanId: rand.Int63(), - ParentSpanId: 0, - OperationName: "my operation", - References: nil, - Flags: 0, - StartTime: time.Now().Unix(), - Duration: 1, - Tags: nil, - Logs: nil, - }) + + traceIDLow := rand.Int63() + traceIDHigh := rand.Int63() + for i := 0; i < n; i++ { + spans = append(spans, &thrift.Span{ + TraceIdLow: traceIDLow, + TraceIdHigh: traceIDHigh, + SpanId: rand.Int63(), + ParentSpanId: 0, + OperationName: "my operation", + References: nil, + Flags: 0, + StartTime: time.Now().Unix(), + Duration: 1, + Tags: nil, + Logs: nil, + }) + } return &thrift.Batch{Spans: spans} } diff --git a/integration/e2e/limits_test.go b/integration/e2e/limits_test.go new file mode 100644 index 00000000000..676920e1af1 --- /dev/null +++ b/integration/e2e/limits_test.go @@ -0,0 +1,58 @@ +package e2e + +import ( + "context" + "testing" + + util "github.com/grafana/tempo/integration" + "github.com/prometheus/prometheus/pkg/labels" + + cortex_e2e "github.com/cortexproject/cortex/integration/e2e" + "github.com/stretchr/testify/require" +) + +const ( + configLimits = "config-limits.yaml" +) + +func TestLimits(t *testing.T) { + s, err := cortex_e2e.NewScenario("tempo_e2e") + require.NoError(t, err) + defer s.Close() + + require.NoError(t, util.CopyFileToSharedDir(s, configLimits, "config.yaml")) + tempo := util.NewTempoAllInOne() + require.NoError(t, s.StartAndWaitReady(tempo)) + + // Get port for the otlp receiver endpoint + c, err := newJaegerGRPCClient(tempo.Endpoint(14250)) + require.NoError(t, err) + require.NotNil(t, c) + + // should fail b/c the trace is too large + batch := makeThriftBatchWithSpanCount(2) + require.Error(t, c.EmitBatch(context.Background(), batch)) + // should fail b/c this will be too many traces + batch = makeThriftBatch() + require.Error(t, c.EmitBatch(context.Background(), batch)) + // should fail b/c due to ingestion rate limit + batch = makeThriftBatchWithSpanCount(10) + require.Error(t, c.EmitBatch(context.Background(), batch)) + + // test limit metrics + err = tempo.WaitSumMetricsWithOptions(cortex_e2e.Equals(2), + []string{"tempo_discarded_spans_total"}, + cortex_e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "reason", "trace_too_large")), + ) + require.NoError(t, err) + err = tempo.WaitSumMetricsWithOptions(cortex_e2e.Equals(1), + []string{"tempo_discarded_spans_total"}, + cortex_e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "reason", "live_traces_exceeded")), + ) + require.NoError(t, err) + err = tempo.WaitSumMetricsWithOptions(cortex_e2e.Equals(10), + []string{"tempo_discarded_spans_total"}, + cortex_e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "reason", "rate_limited")), + ) + require.NoError(t, err) +} diff --git a/modules/distributor/distributor.go b/modules/distributor/distributor.go index 3a79fd7d93f..79f7f5dc47a 100644 --- a/modules/distributor/distributor.go +++ b/modules/distributor/distributor.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strconv" + "strings" "time" "github.com/cortexproject/cortex/pkg/ring" @@ -32,9 +33,12 @@ import ( const ( discardReasonLabel = "reason" - // RateLimited is one of the values for the reason to discard samples. - // Declared here to avoid duplication in ingester and distributor. - rateLimited = "rate_limited" + // reasonRateLimited indicates that the tenants spans/second exceeded their limits + reasonRateLimited = "rate_limited" + // reasonTraceTooLarge indicates that a single trace has too many spans + reasonTraceTooLarge = "trace_too_large" + // reasonLiveTracesExceeded indicates that tempo is already tracking too many live traces in the ingesters for this user + reasonLiveTracesExceeded = "live_traces_exceeded" ) var ( @@ -196,14 +200,15 @@ func (d *Distributor) stopping(_ error) error { func (d *Distributor) Push(ctx context.Context, req *tempopb.PushRequest) (*tempopb.PushResponse, error) { userID, err := user.ExtractOrgID(ctx) if err != nil { + // can't record discarded spans here b/c there's no tenant return nil, err } - // calculate and metric size... + // metric size size := req.Size() metricBytesIngested.WithLabelValues(userID).Add(float64(size)) - // ... and spans + // metric spans if req.Batch == nil { return &tempopb.PushResponse{}, nil } @@ -219,11 +224,11 @@ func (d *Distributor) Push(ctx context.Context, req *tempopb.PushRequest) (*temp // check limits now := time.Now() if !d.ingestionRateLimiter.AllowN(now, userID, spanCount) { - // Return a 4xx here to have the client discard the data and not retry. If a client - // is sending too much data consistently we will unlikely ever catch up otherwise. - metricDiscardedSpans.WithLabelValues(rateLimited, userID).Add(float64(spanCount)) - - return nil, status.Errorf(codes.ResourceExhausted, "ingestion rate limit (%d spans) exceeded while adding %d spans", int(d.ingestionRateLimiter.Limit(now, userID)), spanCount) + metricDiscardedSpans.WithLabelValues(reasonRateLimited, userID).Add(float64(spanCount)) + return nil, status.Errorf(codes.ResourceExhausted, + "%s ingestion rate limit (%d spans) exceeded while adding %d spans", + overrides.ErrorPrefixRateLimited, + int(d.ingestionRateLimiter.Limit(now, userID)), spanCount) } keys, traces, err := requestsByTraceID(req, userID, spanCount) @@ -232,6 +237,9 @@ func (d *Distributor) Push(ctx context.Context, req *tempopb.PushRequest) (*temp } err = d.sendToIngestersViaBytes(ctx, userID, traces, keys) + if err != nil { + recordDiscaredSpans(err, userID, spanCount) + } return nil, err // PushRequest is ignored, so no reason to create one } @@ -347,3 +355,17 @@ func requestsByTraceID(req *tempopb.PushRequest, userID string, spanCount int) ( return keys, pushRequests, nil } + +func recordDiscaredSpans(err error, userID string, spanCount int) { + s := status.Convert(err) + if s == nil { + return + } + desc := s.Message() + + if strings.HasPrefix(desc, overrides.ErrorPrefixLiveTracesExceeded) { + metricDiscardedSpans.WithLabelValues(reasonLiveTracesExceeded, userID).Add(float64(spanCount)) + } else if strings.HasPrefix(desc, overrides.ErrorPrefixTraceTooLarge) { + metricDiscardedSpans.WithLabelValues(reasonTraceTooLarge, userID).Add(float64(spanCount)) + } +} diff --git a/modules/ingester/instance.go b/modules/ingester/instance.go index 2a271e631c4..99fa2306719 100644 --- a/modules/ingester/instance.go +++ b/modules/ingester/instance.go @@ -18,6 +18,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "google.golang.org/grpc/codes" + "github.com/grafana/tempo/modules/overrides" "github.com/grafana/tempo/pkg/tempopb" "github.com/grafana/tempo/pkg/util" "github.com/grafana/tempo/tempodb" @@ -287,7 +288,7 @@ func (i *instance) getOrCreateTrace(req *tempopb.PushRequest) (*trace, error) { err = i.limiter.AssertMaxTracesPerUser(i.instanceID, len(i.traces)) if err != nil { - return nil, status.Errorf(codes.FailedPrecondition, "max live traces per tenant exceeded: %v", err) + return nil, status.Errorf(codes.FailedPrecondition, "%s max live traces per tenant exceeded: %v", overrides.ErrorPrefixLiveTracesExceeded, err) } maxSpans := i.limiter.limits.MaxSpansPerTrace(i.instanceID) diff --git a/modules/ingester/trace.go b/modules/ingester/trace.go index 9397a5565d0..328b77d7bba 100644 --- a/modules/ingester/trace.go +++ b/modules/ingester/trace.go @@ -5,6 +5,7 @@ import ( "time" "github.com/gogo/status" + "github.com/grafana/tempo/modules/overrides" "github.com/grafana/tempo/pkg/tempopb" "google.golang.org/grpc/codes" ) @@ -37,7 +38,7 @@ func (t *trace) Push(_ context.Context, req *tempopb.PushRequest) error { } if t.currentSpans+spanCount > t.maxSpans { - return status.Errorf(codes.FailedPrecondition, "totalSpans (%d) exceeded while adding %d spans", t.maxSpans, spanCount) + return status.Errorf(codes.FailedPrecondition, "%s totalSpans (%d) exceeded while adding %d spans", overrides.ErrorPrefixTraceTooLarge, t.maxSpans, spanCount) } t.currentSpans += spanCount diff --git a/modules/overrides/limits.go b/modules/overrides/limits.go index 44fe212f532..a150bc3a13f 100644 --- a/modules/overrides/limits.go +++ b/modules/overrides/limits.go @@ -6,11 +6,17 @@ import ( ) const ( - // Local ingestion rate strategy + // LocalIngestionRateStrategy indicates that this limit can be evaluated in local terms only LocalIngestionRateStrategy = "local" - - // Global ingestion rate strategy + // GlobalIngestionRateStrategy indicates that an attempt should be made to consider this limit across the entire Tempo cluster GlobalIngestionRateStrategy = "global" + + // ErrorPrefixLiveTracesExceeded is used to flag batches from the ingester that were rejected b/c they had too many traces + ErrorPrefixLiveTracesExceeded = "LIVE_TRACES_EXCEEDED:" + // ErrorPrefixTraceTooLarge is used to flag batches from the ingester that were rejected b/c they exceeded the single trace limit + ErrorPrefixTraceTooLarge = "TRACE_TOO_LARGE:" + // ErrorPrefixRateLimited is used to flag batches that have exceeded the spans/second of the tenant + ErrorPrefixRateLimited = "RATE_LIMITED:" ) // Limits describe all the limits for users; can be used to describe global default