From 6bd711dcad395ba2ba6c109ed1d075a355fa8808 Mon Sep 17 00:00:00 2001 From: jvonfricken Date: Tue, 10 Oct 2023 21:22:32 -0400 Subject: [PATCH 1/3] Propogate Baggage in OTEL tracing interceptor --- contrib/opentelemetry/tracing_interceptor.go | 34 ++++++++++++++------ 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/contrib/opentelemetry/tracing_interceptor.go b/contrib/opentelemetry/tracing_interceptor.go index 4d35c376d..b2b3d97ce 100644 --- a/contrib/opentelemetry/tracing_interceptor.go +++ b/contrib/opentelemetry/tracing_interceptor.go @@ -26,9 +26,9 @@ package opentelemetry import ( "context" "fmt" - "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/baggage" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" @@ -140,17 +140,20 @@ func (t *tracer) Options() interceptor.TracerOptions { } func (t *tracer) UnmarshalSpan(m map[string]string) (interceptor.TracerSpanRef, error) { - ctx := trace.SpanContextFromContext(t.options.TextMapPropagator.Extract(context.Background(), textMapCarrier(m))) - if !ctx.IsValid() { + ctx := t.options.TextMapPropagator.Extract(context.Background(), textMapCarrier(m)) + spanCtx := trace.SpanContextFromContext(ctx) + if !spanCtx.IsValid() { return nil, fmt.Errorf("failed extracting OpenTelemetry span from map") } - return &tracerSpanRef{SpanContext: ctx}, nil + return &tracerSpanRef{SpanContext: spanCtx, Baggage: baggage.FromContext(ctx)}, nil } func (t *tracer) MarshalSpan(span interceptor.TracerSpan) (map[string]string, error) { data := textMapCarrier{} - t.options.TextMapPropagator.Inject(trace.ContextWithSpan(context.Background(), span.(*tracerSpan).Span), data) - return map[string]string(data), nil + tSpan := span.(*tracerSpan) + ctx := baggage.ContextWithBaggage(context.Background(), tSpan.Baggage) + t.options.TextMapPropagator.Inject(trace.ContextWithSpan(ctx, tSpan.Span), data) + return data, nil } func (t *tracer) SpanFromContext(ctx context.Context) interceptor.TracerSpan { @@ -158,28 +161,33 @@ func (t *tracer) SpanFromContext(ctx context.Context) interceptor.TracerSpan { if !span.SpanContext().IsValid() { return nil } - return &tracerSpan{Span: span} + return &tracerSpan{Span: span, Baggage: baggage.FromContext(ctx)} } func (t *tracer) ContextWithSpan(ctx context.Context, span interceptor.TracerSpan) context.Context { + ctx = baggage.ContextWithBaggage(ctx, span.(*tracerSpan).Baggage) return trace.ContextWithSpan(ctx, span.(*tracerSpan).Span) } func (t *tracer) StartSpan(opts *interceptor.TracerStartSpanOptions) (interceptor.TracerSpan, error) { // Create context with parent var parent trace.SpanContext + var bag baggage.Baggage switch optParent := opts.Parent.(type) { case nil: case *tracerSpan: parent = optParent.SpanContext() + bag = optParent.Baggage case *tracerSpanRef: parent = optParent.SpanContext + bag = optParent.Baggage default: return nil, fmt.Errorf("unrecognized parent type %T", optParent) } ctx := context.Background() if parent.IsValid() { ctx = trace.ContextWithSpanContext(ctx, parent) + ctx = baggage.ContextWithBaggage(ctx, bag) } // Create span @@ -194,7 +202,7 @@ func (t *tracer) StartSpan(opts *interceptor.TracerStartSpanOptions) (intercepto span.SetAttributes(attrs...) } - return &tracerSpan{Span: span}, nil + return &tracerSpan{Span: span, Baggage: bag}, nil } func (t *tracer) GetLogger(logger log.Logger, ref interceptor.TracerSpanRef) log.Logger { @@ -211,9 +219,15 @@ func (t *tracer) GetLogger(logger log.Logger, ref interceptor.TracerSpanRef) log return logger } -type tracerSpanRef struct{ trace.SpanContext } +type tracerSpanRef struct { + trace.SpanContext + baggage.Baggage +} -type tracerSpan struct{ trace.Span } +type tracerSpan struct { + trace.Span + baggage.Baggage +} func (t *tracerSpan) Finish(opts *interceptor.TracerFinishSpanOptions) { if opts.Error != nil { From dfc9f9618f313f6c8585a504747744a3947f7368 Mon Sep 17 00:00:00 2001 From: jvonfricken Date: Wed, 11 Oct 2023 17:44:44 -0400 Subject: [PATCH 2/3] Add integration test for OTEL baggage propagation --- test/activity_test.go | 5 +++++ test/integration_test.go | 27 ++++++++++++++++++++++++++- test/workflow_test.go | 11 +++++++++++ 3 files changed, 42 insertions(+), 1 deletion(-) diff --git a/test/activity_test.go b/test/activity_test.go index eba9ef25b..ca1d89017 100644 --- a/test/activity_test.go +++ b/test/activity_test.go @@ -28,6 +28,7 @@ import ( "context" "errors" "fmt" + "go.opentelemetry.io/otel/baggage" "strconv" "strings" "sync" @@ -378,6 +379,10 @@ func (a *Activities) ExternalSignalsAndQueries(ctx context.Context) error { return run.Get(ctx, nil) } +func (a *Activities) CheckBaggage(ctx context.Context, key string) (string, error) { + return baggage.FromContext(ctx).Member(key).Value(), nil +} + func (*Activities) TooFewParams( ctx context.Context, param1 string, diff --git a/test/integration_test.go b/test/integration_test.go index 4e33dfa45..3bb6cb684 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -28,6 +28,7 @@ import ( "context" "errors" "fmt" + "go.opentelemetry.io/otel/baggage" "os" "strings" "sync" @@ -156,7 +157,8 @@ func (ts *IntegrationTestSuite) SetupTest() { } // Record spans for tracing test - if strings.HasPrefix(ts.T().Name(), "TestIntegrationSuite/TestOpenTelemetryTracing") { + if strings.HasPrefix(ts.T().Name(), "TestIntegrationSuite/TestOpenTelemetryTracing") || + strings.HasPrefix(ts.T().Name(), "TestIntegrationSuite/TestOpenTelemetryBaggageHandling") { ts.openTelemetrySpanRecorder = tracetest.NewSpanRecorder() ts.openTelemetryTracer = sdktrace.NewTracerProvider( sdktrace.WithSpanProcessor(ts.openTelemetrySpanRecorder)).Tracer("") @@ -1988,6 +1990,29 @@ func (ts *IntegrationTestSuite) addOpenTelemetryChildren( } } +func (ts *IntegrationTestSuite) TestOpenTelemetryBaggageHandling() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // Start a top-level span + ctx, rootSpan := ts.openTelemetryTracer.Start(ctx, "root-span") + defer rootSpan.End() + + // Add baggage to context + expectedBaggage := "baggage-value" + bag := baggage.FromContext(ctx) + member, _ := baggage.NewMember("baggage-key", expectedBaggage) + bag, _ = bag.SetMember(member) + ctx = baggage.ContextWithBaggage(ctx, bag) + + // Start workflow + var actualBaggage string + opts := ts.startWorkflowOptions("test-interceptor-open-telemetry-baggage") + err := ts.executeWorkflowWithContextAndOption(ctx, opts, ts.workflows.CheckOpenTelemetryBaggage, &actualBaggage, "baggage-key") + ts.NoError(err) + + ts.Equal(expectedBaggage, actualBaggage) +} + func (ts *IntegrationTestSuite) TestOpenTracingNoopTracer() { // The setup of the test already puts a noop tracer on the client. In past // versions, this would break due to tracer.Extract returning an error every diff --git a/test/workflow_test.go b/test/workflow_test.go index b0889212c..2056940a1 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -1920,6 +1920,16 @@ func (w *Workflows) SignalsAndQueries(ctx workflow.Context, execChild, execActiv return nil } +func (w *Workflows) CheckOpenTelemetryBaggage(ctx workflow.Context, key string) (string, error) { + var baggage string + var a Activities + ctx = workflow.WithActivityOptions(ctx, w.defaultActivityOptions()) + if err := workflow.ExecuteActivity(ctx, a.CheckBaggage, key).Get(ctx, &baggage); err != nil { + return "", fmt.Errorf("failed checking baggage: %w", err) + } + return baggage, nil +} + type AdvancedPostCancellationInput struct { PreCancelActivity bool PostCancelActivity bool @@ -2434,6 +2444,7 @@ func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.InterceptorCalls) worker.RegisterWorkflow(w.WaitSignalToStart) worker.RegisterWorkflow(w.SignalsAndQueries) + worker.RegisterWorkflow(w.CheckOpenTelemetryBaggage) worker.RegisterWorkflow(w.AdvancedPostCancellation) worker.RegisterWorkflow(w.AdvancedPostCancellationChildWithDone) worker.RegisterWorkflow(w.TooFewParams) From 4ff3ab1a55a7b69d9d793c8f4798eed8e227528b Mon Sep 17 00:00:00 2001 From: jvonfricken Date: Thu, 12 Oct 2023 10:51:04 -0400 Subject: [PATCH 3/3] Added DisableBaggage to OTEL TracerOptions --- contrib/opentelemetry/tracing_interceptor.go | 35 ++++++++++++++++---- test/integration_test.go | 18 ++++++++-- 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/contrib/opentelemetry/tracing_interceptor.go b/contrib/opentelemetry/tracing_interceptor.go index b2b3d97ce..bebd701fc 100644 --- a/contrib/opentelemetry/tracing_interceptor.go +++ b/contrib/opentelemetry/tracing_interceptor.go @@ -56,6 +56,9 @@ type TracerOptions struct { // DisableQueryTracing can be set to disable query tracing. DisableQueryTracing bool + // DisableBaggage can be set to disable baggage propagation. + DisableBaggage bool + // AllowInvalidParentSpans will swallow errors interpreting parent // spans from headers. Useful when migrating from one tracing library // to another, while workflows/activities may be in progress. @@ -145,13 +148,20 @@ func (t *tracer) UnmarshalSpan(m map[string]string) (interceptor.TracerSpanRef, if !spanCtx.IsValid() { return nil, fmt.Errorf("failed extracting OpenTelemetry span from map") } - return &tracerSpanRef{SpanContext: spanCtx, Baggage: baggage.FromContext(ctx)}, nil + spanRef := &tracerSpanRef{SpanContext: spanCtx} + if !t.options.DisableBaggage { + spanRef.Baggage = baggage.FromContext(ctx) + } + return spanRef, nil } func (t *tracer) MarshalSpan(span interceptor.TracerSpan) (map[string]string, error) { data := textMapCarrier{} tSpan := span.(*tracerSpan) - ctx := baggage.ContextWithBaggage(context.Background(), tSpan.Baggage) + ctx := context.Background() + if !t.options.DisableBaggage { + ctx = baggage.ContextWithBaggage(ctx, tSpan.Baggage) + } t.options.TextMapPropagator.Inject(trace.ContextWithSpan(ctx, tSpan.Span), data) return data, nil } @@ -161,11 +171,17 @@ func (t *tracer) SpanFromContext(ctx context.Context) interceptor.TracerSpan { if !span.SpanContext().IsValid() { return nil } - return &tracerSpan{Span: span, Baggage: baggage.FromContext(ctx)} + tSpan := &tracerSpan{Span: span} + if !t.options.DisableBaggage { + tSpan.Baggage = baggage.FromContext(ctx) + } + return tSpan } func (t *tracer) ContextWithSpan(ctx context.Context, span interceptor.TracerSpan) context.Context { - ctx = baggage.ContextWithBaggage(ctx, span.(*tracerSpan).Baggage) + if !t.options.DisableBaggage { + ctx = baggage.ContextWithBaggage(ctx, span.(*tracerSpan).Baggage) + } return trace.ContextWithSpan(ctx, span.(*tracerSpan).Span) } @@ -187,7 +203,9 @@ func (t *tracer) StartSpan(opts *interceptor.TracerStartSpanOptions) (intercepto ctx := context.Background() if parent.IsValid() { ctx = trace.ContextWithSpanContext(ctx, parent) - ctx = baggage.ContextWithBaggage(ctx, bag) + if !t.options.DisableBaggage { + ctx = baggage.ContextWithBaggage(ctx, bag) + } } // Create span @@ -202,7 +220,12 @@ func (t *tracer) StartSpan(opts *interceptor.TracerStartSpanOptions) (intercepto span.SetAttributes(attrs...) } - return &tracerSpan{Span: span, Baggage: bag}, nil + tSpan := &tracerSpan{Span: span} + if !t.options.DisableBaggage { + tSpan.Baggage = bag + } + + return tSpan, nil } func (t *tracer) GetLogger(logger log.Logger, ref interceptor.TracerSpanRef) log.Logger { diff --git a/test/integration_test.go b/test/integration_test.go index 3bb6cb684..b30c9e677 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -166,6 +166,7 @@ func (ts *IntegrationTestSuite) SetupTest() { Tracer: ts.openTelemetryTracer, DisableSignalTracing: strings.HasSuffix(ts.T().Name(), "WithoutSignalsAndQueries"), DisableQueryTracing: strings.HasSuffix(ts.T().Name(), "WithoutSignalsAndQueries"), + DisableBaggage: strings.HasSuffix(ts.T().Name(), "WithDisableBaggageOption"), }) ts.NoError(err) clientInterceptors = append(clientInterceptors, interceptor) @@ -1991,6 +1992,14 @@ func (ts *IntegrationTestSuite) addOpenTelemetryChildren( } func (ts *IntegrationTestSuite) TestOpenTelemetryBaggageHandling() { + ts.testOpenTelemetryBaggageHandling(false) +} + +func (ts *IntegrationTestSuite) TestOpenTelemetryBaggageHandlingWithDisableBaggageOption() { + ts.testOpenTelemetryBaggageHandling(true) +} + +func (ts *IntegrationTestSuite) testOpenTelemetryBaggageHandling(disableBaggage bool) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Start a top-level span @@ -1998,9 +2007,14 @@ func (ts *IntegrationTestSuite) TestOpenTelemetryBaggageHandling() { defer rootSpan.End() // Add baggage to context - expectedBaggage := "baggage-value" + var expectedBaggage string + if disableBaggage { + expectedBaggage = "" + } else { + expectedBaggage = "baggage-value" + } bag := baggage.FromContext(ctx) - member, _ := baggage.NewMember("baggage-key", expectedBaggage) + member, _ := baggage.NewMember("baggage-key", "baggage-value") bag, _ = bag.SetMember(member) ctx = baggage.ContextWithBaggage(ctx, bag)