Skip to content

Commit

Permalink
Implement trace injection for SQS and SNS
Browse files Browse the repository at this point in the history
  • Loading branch information
nhulston committed Oct 7, 2024
1 parent 616cb4a commit be89749
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 6 deletions.
63 changes: 60 additions & 3 deletions contrib/aws/aws-sdk-go-v2/aws/sns/sns.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@ package sns

import (
"context"
"encoding/json"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/sns"
"github.com/aws/aws-sdk-go-v2/service/sns/types"
"github.com/aws/smithy-go/middleware"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
)

const (
Expand All @@ -27,13 +32,65 @@ func EnrichOperation(ctx context.Context, in middleware.InitializeInput, operati
}

func handlePublish(ctx context.Context, in middleware.InitializeInput) {
// TODO
params, ok := in.Parameters.(*sns.PublishInput)
if !ok {
log.Debug("Unable to read PublishInput params")
return
}

if params.MessageAttributes == nil {
params.MessageAttributes = make(map[string]types.MessageAttributeValue)
}

injectTraceContext(ctx, params.MessageAttributes)
}

func handlePublishBatch(ctx context.Context, in middleware.InitializeInput) {
// TODO
params, ok := in.Parameters.(*sns.PublishBatchInput)
if !ok {
log.Debug("Unable to read PublishBatch params")
return
}

for i := range params.PublishBatchRequestEntries {
entryPtr := &params.PublishBatchRequestEntries[i]
if entryPtr.MessageAttributes == nil {
entryPtr.MessageAttributes = make(map[string]types.MessageAttributeValue)
}
injectTraceContext(ctx, entryPtr.MessageAttributes)
}
}

func injectTraceContext(ctx context.Context, messageAttributes map[string]types.MessageAttributeValue) {
// TODO
span, ok := tracer.SpanFromContext(ctx)
if !ok || span == nil {
log.Debug("Unable to find span from context")
return
}

// SNS only allows a maximum of 10 message attributes.
// https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html
// Only inject if there's room.
if len(messageAttributes) >= maxMessageAttributes {
log.Debug("Cannot inject trace context: message already has maximum allowed attributes")
return
}

carrier := make(messageCarrier)
err := tracer.Inject(span.Context(), carrier)
if err != nil {
log.Debug("Unable to inject trace context: %s", err.Error())
return
}

jsonBytes, err := json.Marshal(carrier)
if err != nil {
log.Debug("Unable to marshal trace context: %s", err.Error())
return
}

messageAttributes[datadogKey] = types.MessageAttributeValue{
DataType: aws.String("String"),
StringValue: aws.String(string(jsonBytes)),
}
}
63 changes: 60 additions & 3 deletions contrib/aws/aws-sdk-go-v2/aws/sqs/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@ package sqs

import (
"context"
"encoding/json"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/aws/smithy-go/middleware"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
)

const (
Expand All @@ -27,13 +32,65 @@ func EnrichOperation(ctx context.Context, in middleware.InitializeInput, operati
}

func handleSendMessage(ctx context.Context, in middleware.InitializeInput) {
// TODO
params, ok := in.Parameters.(*sqs.SendMessageInput)
if !ok {
log.Debug("Unable to read SendMessage params")
return
}

if params.MessageAttributes == nil {
params.MessageAttributes = make(map[string]types.MessageAttributeValue)
}

injectTraceContext(ctx, params.MessageAttributes)
}

func handleSendMessageBatch(ctx context.Context, in middleware.InitializeInput) {
// TODO
params, ok := in.Parameters.(*sqs.SendMessageBatchInput)
if !ok {
log.Debug("Unable to read SendMessageBatch params")
return
}

for i := range params.Entries {
entryPtr := &params.Entries[i]
if entryPtr.MessageAttributes == nil {
entryPtr.MessageAttributes = make(map[string]types.MessageAttributeValue)
}
injectTraceContext(ctx, entryPtr.MessageAttributes)
}
}

func injectTraceContext(ctx context.Context, messageAttributes map[string]types.MessageAttributeValue) {
// TODO
span, ok := tracer.SpanFromContext(ctx)
if !ok || span == nil {
log.Debug("Unable to find span from context")
return
}

// SQS only allows a maximum of 10 message attributes.
// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html#sqs-message-attributes
// Only inject if there's room.
if len(messageAttributes) >= maxMessageAttributes {
log.Debug("Cannot inject trace context: message already has maximum allowed attributes")
return
}

carrier := make(messageCarrier)
err := tracer.Inject(span.Context(), carrier)
if err != nil {
log.Debug("Unable to inject trace context: %s", err.Error())
return
}

jsonBytes, err := json.Marshal(carrier)
if err != nil {
log.Debug("Unable to marshal trace context: %s", err.Error())
return
}

messageAttributes[datadogKey] = types.MessageAttributeValue{
DataType: aws.String("String"),
StringValue: aws.String(string(jsonBytes)),
}
}

0 comments on commit be89749

Please sign in to comment.