Skip to content

Commit

Permalink
Detect messaging.{system,operation} in TranslateTransaction
Browse files Browse the repository at this point in the history
  • Loading branch information
carsonip committed Jan 26, 2024
1 parent 80b6af8 commit bd314dd
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 19 deletions.
15 changes: 10 additions & 5 deletions input/otlp/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,6 @@ func TranslateTransaction(
)

var isHTTP, isRPC, isMessaging bool
var message modelpb.Message

var samplerType, samplerParam pcommon.Value
attributes.Range(func(kDots string, v pcommon.Value) bool {
Expand Down Expand Up @@ -411,8 +410,17 @@ func TranslateTransaction(

// messaging.*
case "message_bus.destination", semconv.AttributeMessagingDestination:
message.QueueName = stringval
if event.Transaction.Message == nil {
event.Transaction.Message = modelpb.MessageFromVTPool()
}
event.Transaction.Message.QueueName = stringval
isMessaging = true
case semconv.AttributeMessagingSystem:
isMessaging = true
modelpb.Labels(event.Labels).Set(k, stringval)
case semconv.AttributeMessagingOperation:
isMessaging = true
modelpb.Labels(event.Labels).Set(k, stringval)

// rpc.*
//
Expand Down Expand Up @@ -494,9 +502,6 @@ func TranslateTransaction(
}
event.Url = modelpb.ParseURL(httpURL, httpHost, httpScheme)
}
if isMessaging {
event.Transaction.Message = &message
}

if event.Client == nil && event.Source != nil {
event.Client = modelpb.ClientFromVTPool()
Expand Down
64 changes: 50 additions & 14 deletions input/otlp/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,20 +609,56 @@ func TestRPCSpan(t *testing.T) {
}

func TestMessagingTransaction(t *testing.T) {
event := transformTransactionWithAttributes(t, map[string]interface{}{
"messaging.destination": "myQueue",
}, func(s ptrace.Span) {
s.SetKind(ptrace.SpanKindConsumer)
// Set parentID to imply this isn't the root, but
// kind==Consumer should still force the span to be translated
// as a transaction.
s.SetParentSpanID(pcommon.SpanID{3})
})
assert.Equal(t, "messaging", event.Transaction.Type)
assert.Empty(t, event.Labels)
assert.Equal(t, &modelpb.Message{
QueueName: "myQueue",
}, event.Transaction.Message)
for _, tc := range []struct {
attrs map[string]interface{}

expectedLabels map[string]*modelpb.LabelValue
expectedTxnMessage *modelpb.Message
}{
{
attrs: map[string]interface{}{
"messaging.destination": "myQueue",
},
expectedLabels: nil,
expectedTxnMessage: &modelpb.Message{
QueueName: "myQueue",
},
},
{
attrs: map[string]interface{}{
"messaging.system": "kafka",
},
expectedLabels: map[string]*modelpb.LabelValue{
"messaging_system": {Value: "kafka"},
},
expectedTxnMessage: nil,
},
{
attrs: map[string]interface{}{
"messaging.operation": "publish",
},
expectedLabels: map[string]*modelpb.LabelValue{
"messaging_operation": {Value: "publish"},
},
expectedTxnMessage: nil,
},
} {
tcName, err := json.Marshal(tc.attrs)
require.NoError(t, err)
t.Run(string(tcName), func(t *testing.T) {
event := transformTransactionWithAttributes(t, tc.attrs, func(s ptrace.Span) {
s.SetKind(ptrace.SpanKindConsumer)
// Set parentID to imply this isn't the root, but
// kind==Consumer should still force the span to be translated
// as a transaction.
s.SetParentSpanID(pcommon.SpanID{3})
})
assert.Equal(t, "messaging", event.Transaction.Type)
assert.Equal(t, tc.expectedLabels, event.Labels)
assert.Equal(t, tc.expectedTxnMessage, event.Transaction.Message)
})
}

}

func TestMessagingSpan(t *testing.T) {
Expand Down

0 comments on commit bd314dd

Please sign in to comment.