Skip to content

Commit

Permalink
Separate incoming and outgoing carrier
Browse files Browse the repository at this point in the history
  • Loading branch information
purnesh42H committed Nov 20, 2024
1 parent 4f77441 commit d708a8b
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 94 deletions.
110 changes: 65 additions & 45 deletions stats/opentelemetry/internal/tracing/carrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,39 +23,23 @@ package tracing
import (
"context"

"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
)

// propagationDirection specifies whether the propagation is incoming or
// outgoing.
type propagationDirection int
var logger = grpclog.Component("otel-plugin")

const (
incoming propagationDirection = iota // incoming propagation direction
outgoing // outgoing propagation direction
)

// Carrier is a TextMapCarrier that uses `context.Context` to store and
// retrieve any propagated key-value pairs in text format. The propagation
// direction (incoming or outgoing) determines whether the `Keys()` method
// should return keys from the incoming or outgoing context metadata.
type Carrier struct {
ctx context.Context
direction propagationDirection
}

// NewIncomingCarrier creates a new Carrier with the given context and incoming
// propagation direction. The incoming carrier should be used with propagator's
// `Extract()` method.
func NewIncomingCarrier(ctx context.Context) *Carrier {
return &Carrier{ctx: ctx, direction: incoming}
// IncomingCarrier is a TextMapCarrier that uses incoming `context.Context` to
// retrieve any propagated key-value pairs in text format.
type IncomingCarrier struct {
ctx context.Context
}

// NewOutgoingCarrier creates a new Carrier with the given context and outgoing
// propagation direction. The outgoing carrier should be used with propagator's
// `Inject()` method.
func NewOutgoingCarrier(ctx context.Context) *Carrier {
return &Carrier{ctx: ctx, direction: outgoing}
// NewIncomingCarrier creates a new `IncomingCarrier` with the given context.
// The incoming carrier should be used with propagator's `Extract()` method in
// the incoming rpc path.
func NewIncomingCarrier(ctx context.Context) *IncomingCarrier {
return &IncomingCarrier{ctx: ctx}
}

// Get returns the string value associated with the passed key from the
Expand All @@ -65,36 +49,71 @@ func NewOutgoingCarrier(ctx context.Context) *Carrier {
// context or if the value associated with the key is empty.
//
// If multiple values are present for a key, it returns the last one.
func (c *Carrier) Get(key string) string {
func (c *IncomingCarrier) Get(key string) string {
values := metadata.ValueFromIncomingContext(c.ctx, key)
if len(values) == 0 {
return ""
}
return values[len(values)-1]
}

// Set just logs an error. It implements the `TextMapCarrier` interface but
// should not be used with `IncomingCarrier`.
func (c *IncomingCarrier) Set(string, string) {
logger.Error("Set() should not be used with IncomingCarrier.")

Check warning on line 63 in stats/opentelemetry/internal/tracing/carrier.go

View check run for this annotation

Codecov / codecov/patch

stats/opentelemetry/internal/tracing/carrier.go#L62-L63

Added lines #L62 - L63 were not covered by tests
}

// Keys returns the keys stored in the carrier's context metadata. It returns
// keys from incoming context metadata.
func (c *IncomingCarrier) Keys() []string {
md, ok := metadata.FromIncomingContext(c.ctx)
if !ok {
return nil
}

Check warning on line 72 in stats/opentelemetry/internal/tracing/carrier.go

View check run for this annotation

Codecov / codecov/patch

stats/opentelemetry/internal/tracing/carrier.go#L71-L72

Added lines #L71 - L72 were not covered by tests
keys := make([]string, 0, len(md))
for key := range md {
keys = append(keys, key)
}
return keys
}

// Context returns the underlying context associated with the
// `IncomingCarrier“.
func (c *IncomingCarrier) Context() context.Context {
return c.ctx

Check warning on line 83 in stats/opentelemetry/internal/tracing/carrier.go

View check run for this annotation

Codecov / codecov/patch

stats/opentelemetry/internal/tracing/carrier.go#L82-L83

Added lines #L82 - L83 were not covered by tests
}

// OutgoingCarrier is a TextMapCarrier that uses outgoing `context.Context` to
// store any propagated key-value pairs in text format.
type OutgoingCarrier struct {
ctx context.Context
}

// NewOutgoingCarrier creates a new Carrier with the given context. The
// outgoing carrier should be used with propagator's `Inject()` method in the
// outgoing rpc path.
func NewOutgoingCarrier(ctx context.Context) *OutgoingCarrier {
return &OutgoingCarrier{ctx: ctx}
}

// Get just logs an error and returns an empty string. It implements the
// `TextMapCarrier` interface but should not be used with `OutgoingCarrier`.
func (c *OutgoingCarrier) Get(string) string {
logger.Error("Get() should not be used with `IncomingCarrier`")
return ""

Check warning on line 103 in stats/opentelemetry/internal/tracing/carrier.go

View check run for this annotation

Codecov / codecov/patch

stats/opentelemetry/internal/tracing/carrier.go#L101-L103

Added lines #L101 - L103 were not covered by tests
}

// Set stores the key-value pair in the carrier's outgoing context metadata.
//
// If the key already exists, given value is appended to the last.
func (c *Carrier) Set(key, value string) {
func (c *OutgoingCarrier) Set(key, value string) {
c.ctx = metadata.AppendToOutgoingContext(c.ctx, key, value)
}

// Keys returns the keys stored in the carrier's context metadata. It returns
// keys from outgoing context metadata if propagation direction is outgoing and
// if propagation direction is incoming, it returns keys from incoming context
// metadata. In all other cases, it returns nil.
func (c *Carrier) Keys() []string {
var md metadata.MD
var ok bool

switch c.direction {
case outgoing:
md, ok = metadata.FromOutgoingContext(c.ctx)
case incoming:
md, ok = metadata.FromIncomingContext(c.ctx)
}

// keys from outgoing context metadata.
func (c *OutgoingCarrier) Keys() []string {
md, ok := metadata.FromOutgoingContext(c.ctx)
if !ok {
return nil
}

Check warning on line 119 in stats/opentelemetry/internal/tracing/carrier.go

View check run for this annotation

Codecov / codecov/patch

stats/opentelemetry/internal/tracing/carrier.go#L118-L119

Added lines #L118 - L119 were not covered by tests
Expand All @@ -105,7 +124,8 @@ func (c *Carrier) Keys() []string {
return keys
}

// Context returns the underlying context associated with the Carrier.
func (c *Carrier) Context() context.Context {
// Context returns the underlying context associated with the
// `OutgoingCarrier“.
func (c *OutgoingCarrier) Context() context.Context {
return c.ctx
}
58 changes: 9 additions & 49 deletions stats/opentelemetry/internal/tracing/carrier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,16 @@ func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}

// TestGet verifies that `Carrier.Get()` returns correct value for the
// corresponding key in the carrier's context metadata, if key is present. If
// key is not present, it verifies that empty string is returned.
// TestIncomingCarrier verifies that `IncomingCarrier.Get()` returns correct
// value for the corresponding key in the carrier's context metadata, if key is
// present. If key is not present, it verifies that empty string is returned.
//
// If multiple values are present for a key, it verifies that last value is
// returned.
//
// If key ends with `-bin`, it verifies that a correct binary value is returned
// in the string format for the binary header.
func (s) TestGet(t *testing.T) {
func (s) TestIncomingCarrier(t *testing.T) {
tests := []struct {
name string
md metadata.MD
Expand Down Expand Up @@ -120,17 +120,17 @@ func (s) TestGet(t *testing.T) {
}
}

// TestSet verifies that a key-value pair is set in carrier's context metadata
// using `Carrier.Set()`. If key is not present, it verifies that
// key-value pair is insterted. If key is already present, it verifies that new
// value is appended at the end of list for the existing key.
// TestOutgoingCarrier verifies that a key-value pair is set in carrier's
// context metadata using `OutgoingCarrier.Set()`. If key is not present, it
// verifies that key-value pair is insterted. If key is already present, it
// verifies that new value is appended at the end of list for the existing key.
//
// If key ends with `-bin`, it verifies that a binary value is set for
// `-bin` header in string format.
//
// It also verifies that both existing and newly inserted keys are present in
// the carrier's context using `Carrier.Keys()`.
func (s) TestSet(t *testing.T) {
func (s) TestOutgoingCarrier(t *testing.T) {
tests := []struct {
name string
initialMD metadata.MD
Expand Down Expand Up @@ -188,43 +188,3 @@ func (s) TestSet(t *testing.T) {
})
}
}

func TestKeys(t *testing.T) {
tests := []struct {
name string
direction propagationDirection
md metadata.MD
want []string
}{
{
name: "outgoing ignores incoming",
direction: outgoing,
md: metadata.MD{"incoming-key": []string{"incoming-value"}, "outgoing-key": []string{"outgoing-value"}},
want: []string{"outgoing-key"},
},
{
name: "incoming ignores outgoing",
direction: incoming,
md: metadata.MD{"incoming-key": []string{"incoming-value"}, "outgoing-key": []string{"outgoing-value"}},
want: []string{"incoming-key"},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx = metadata.NewIncomingContext(ctx, metadata.MD{"incoming-key": test.md["incoming-key"]})
ctx = metadata.NewOutgoingContext(ctx, metadata.MD{"outgoing-key": test.md["outgoing-key"]})
var c *Carrier
if test.direction == incoming {
c = NewIncomingCarrier(ctx)
} else {
c = NewOutgoingCarrier(ctx)
}
if got := c.Keys(); !cmp.Equal(test.want, got, cmpopts.SortSlices(func(a, b string) bool { return a < b })) {
t.Fatalf("c.Keys() = %v, want %v", got, test.want)
}
})
}
}

0 comments on commit d708a8b

Please sign in to comment.