Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propogate Baggage in OTEL tracing interceptor #1260

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 47 additions & 10 deletions contrib/opentelemetry/tracing_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ package opentelemetry
import (
"context"
"fmt"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/baggage"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -56,6 +56,9 @@ type TracerOptions struct {
// DisableQueryTracing can be set to disable query tracing.
DisableQueryTracing bool

// DisableBaggage can be set to disable baggage propagation.
DisableBaggage bool

// AllowInvalidParentSpans will swallow errors interpreting parent
// spans from headers. Useful when migrating from one tracing library
// to another, while workflows/activities may be in progress.
Expand Down Expand Up @@ -140,46 +143,69 @@ func (t *tracer) Options() interceptor.TracerOptions {
}

func (t *tracer) UnmarshalSpan(m map[string]string) (interceptor.TracerSpanRef, error) {
ctx := trace.SpanContextFromContext(t.options.TextMapPropagator.Extract(context.Background(), textMapCarrier(m)))
if !ctx.IsValid() {
ctx := t.options.TextMapPropagator.Extract(context.Background(), textMapCarrier(m))
spanCtx := trace.SpanContextFromContext(ctx)
if !spanCtx.IsValid() {
return nil, fmt.Errorf("failed extracting OpenTelemetry span from map")
}
return &tracerSpanRef{SpanContext: ctx}, nil
spanRef := &tracerSpanRef{SpanContext: spanCtx}
if !t.options.DisableBaggage {
spanRef.Baggage = baggage.FromContext(ctx)
}
return spanRef, nil
}

func (t *tracer) MarshalSpan(span interceptor.TracerSpan) (map[string]string, error) {
data := textMapCarrier{}
t.options.TextMapPropagator.Inject(trace.ContextWithSpan(context.Background(), span.(*tracerSpan).Span), data)
return map[string]string(data), nil
tSpan := span.(*tracerSpan)
ctx := context.Background()
if !t.options.DisableBaggage {
ctx = baggage.ContextWithBaggage(ctx, tSpan.Baggage)
}
t.options.TextMapPropagator.Inject(trace.ContextWithSpan(ctx, tSpan.Span), data)
return data, nil
}

func (t *tracer) SpanFromContext(ctx context.Context) interceptor.TracerSpan {
span := trace.SpanFromContext(ctx)
if !span.SpanContext().IsValid() {
return nil
}
return &tracerSpan{Span: span}
tSpan := &tracerSpan{Span: span}
if !t.options.DisableBaggage {
tSpan.Baggage = baggage.FromContext(ctx)
}
return tSpan
}

func (t *tracer) ContextWithSpan(ctx context.Context, span interceptor.TracerSpan) context.Context {
if !t.options.DisableBaggage {
ctx = baggage.ContextWithBaggage(ctx, span.(*tracerSpan).Baggage)
}
return trace.ContextWithSpan(ctx, span.(*tracerSpan).Span)
}

func (t *tracer) StartSpan(opts *interceptor.TracerStartSpanOptions) (interceptor.TracerSpan, error) {
// Create context with parent
var parent trace.SpanContext
var bag baggage.Baggage
switch optParent := opts.Parent.(type) {
case nil:
case *tracerSpan:
parent = optParent.SpanContext()
bag = optParent.Baggage
case *tracerSpanRef:
parent = optParent.SpanContext
bag = optParent.Baggage
default:
return nil, fmt.Errorf("unrecognized parent type %T", optParent)
}
ctx := context.Background()
if parent.IsValid() {
ctx = trace.ContextWithSpanContext(ctx, parent)
if !t.options.DisableBaggage {
ctx = baggage.ContextWithBaggage(ctx, bag)
}
}

// Create span
Expand All @@ -194,7 +220,12 @@ func (t *tracer) StartSpan(opts *interceptor.TracerStartSpanOptions) (intercepto
span.SetAttributes(attrs...)
}

return &tracerSpan{Span: span}, nil
tSpan := &tracerSpan{Span: span}
if !t.options.DisableBaggage {
tSpan.Baggage = bag
}

return tSpan, nil
}

func (t *tracer) GetLogger(logger log.Logger, ref interceptor.TracerSpanRef) log.Logger {
Expand All @@ -211,9 +242,15 @@ func (t *tracer) GetLogger(logger log.Logger, ref interceptor.TracerSpanRef) log
return logger
}

type tracerSpanRef struct{ trace.SpanContext }
type tracerSpanRef struct {
trace.SpanContext
baggage.Baggage
}

type tracerSpan struct{ trace.Span }
type tracerSpan struct {
trace.Span
baggage.Baggage
}

func (t *tracerSpan) Finish(opts *interceptor.TracerFinishSpanOptions) {
if opts.Error != nil {
Expand Down
5 changes: 5 additions & 0 deletions test/activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"context"
"errors"
"fmt"
"go.opentelemetry.io/otel/baggage"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -378,6 +379,10 @@ func (a *Activities) ExternalSignalsAndQueries(ctx context.Context) error {
return run.Get(ctx, nil)
}

func (a *Activities) CheckBaggage(ctx context.Context, key string) (string, error) {
return baggage.FromContext(ctx).Member(key).Value(), nil
}

func (*Activities) TooFewParams(
ctx context.Context,
param1 string,
Expand Down
41 changes: 40 additions & 1 deletion test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"context"
"errors"
"fmt"
"go.opentelemetry.io/otel/baggage"
"os"
"strings"
"sync"
Expand Down Expand Up @@ -156,14 +157,16 @@ func (ts *IntegrationTestSuite) SetupTest() {
}

// Record spans for tracing test
if strings.HasPrefix(ts.T().Name(), "TestIntegrationSuite/TestOpenTelemetryTracing") {
if strings.HasPrefix(ts.T().Name(), "TestIntegrationSuite/TestOpenTelemetryTracing") ||
strings.HasPrefix(ts.T().Name(), "TestIntegrationSuite/TestOpenTelemetryBaggageHandling") {
ts.openTelemetrySpanRecorder = tracetest.NewSpanRecorder()
ts.openTelemetryTracer = sdktrace.NewTracerProvider(
sdktrace.WithSpanProcessor(ts.openTelemetrySpanRecorder)).Tracer("")
interceptor, err := opentelemetry.NewTracingInterceptor(opentelemetry.TracerOptions{
Tracer: ts.openTelemetryTracer,
DisableSignalTracing: strings.HasSuffix(ts.T().Name(), "WithoutSignalsAndQueries"),
DisableQueryTracing: strings.HasSuffix(ts.T().Name(), "WithoutSignalsAndQueries"),
DisableBaggage: strings.HasSuffix(ts.T().Name(), "WithDisableBaggageOption"),
})
ts.NoError(err)
clientInterceptors = append(clientInterceptors, interceptor)
Expand Down Expand Up @@ -1988,6 +1991,42 @@ func (ts *IntegrationTestSuite) addOpenTelemetryChildren(
}
}

func (ts *IntegrationTestSuite) TestOpenTelemetryBaggageHandling() {
ts.testOpenTelemetryBaggageHandling(false)
}

func (ts *IntegrationTestSuite) TestOpenTelemetryBaggageHandlingWithDisableBaggageOption() {
ts.testOpenTelemetryBaggageHandling(true)
}

func (ts *IntegrationTestSuite) testOpenTelemetryBaggageHandling(disableBaggage bool) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Start a top-level span
ctx, rootSpan := ts.openTelemetryTracer.Start(ctx, "root-span")
defer rootSpan.End()

// Add baggage to context
var expectedBaggage string
if disableBaggage {
expectedBaggage = ""
} else {
expectedBaggage = "baggage-value"
}
bag := baggage.FromContext(ctx)
member, _ := baggage.NewMember("baggage-key", "baggage-value")
bag, _ = bag.SetMember(member)
ctx = baggage.ContextWithBaggage(ctx, bag)

// Start workflow
var actualBaggage string
opts := ts.startWorkflowOptions("test-interceptor-open-telemetry-baggage")
err := ts.executeWorkflowWithContextAndOption(ctx, opts, ts.workflows.CheckOpenTelemetryBaggage, &actualBaggage, "baggage-key")
ts.NoError(err)

ts.Equal(expectedBaggage, actualBaggage)
}

func (ts *IntegrationTestSuite) TestOpenTracingNoopTracer() {
// The setup of the test already puts a noop tracer on the client. In past
// versions, this would break due to tracer.Extract returning an error every
Expand Down
11 changes: 11 additions & 0 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1920,6 +1920,16 @@ func (w *Workflows) SignalsAndQueries(ctx workflow.Context, execChild, execActiv
return nil
}

func (w *Workflows) CheckOpenTelemetryBaggage(ctx workflow.Context, key string) (string, error) {
var baggage string
var a Activities
ctx = workflow.WithActivityOptions(ctx, w.defaultActivityOptions())
if err := workflow.ExecuteActivity(ctx, a.CheckBaggage, key).Get(ctx, &baggage); err != nil {
return "", fmt.Errorf("failed checking baggage: %w", err)
}
return baggage, nil
}

type AdvancedPostCancellationInput struct {
PreCancelActivity bool
PostCancelActivity bool
Expand Down Expand Up @@ -2434,6 +2444,7 @@ func (w *Workflows) register(worker worker.Worker) {
worker.RegisterWorkflow(w.InterceptorCalls)
worker.RegisterWorkflow(w.WaitSignalToStart)
worker.RegisterWorkflow(w.SignalsAndQueries)
worker.RegisterWorkflow(w.CheckOpenTelemetryBaggage)
worker.RegisterWorkflow(w.AdvancedPostCancellation)
worker.RegisterWorkflow(w.AdvancedPostCancellationChildWithDone)
worker.RegisterWorkflow(w.TooFewParams)
Expand Down
Loading