diff --git a/input/otlp/traces.go b/input/otlp/traces.go index 94072c71..816a315c 100644 --- a/input/otlp/traces.go +++ b/input/otlp/traces.go @@ -252,7 +252,6 @@ func TranslateTransaction( ) var isHTTP, isRPC, isMessaging bool - var message modelpb.Message var samplerType, samplerParam pcommon.Value attributes.Range(func(kDots string, v pcommon.Value) bool { @@ -411,8 +410,17 @@ func TranslateTransaction( // messaging.* case "message_bus.destination", semconv.AttributeMessagingDestination: - message.QueueName = stringval + if event.Transaction.Message == nil { + event.Transaction.Message = modelpb.MessageFromVTPool() + } + event.Transaction.Message.QueueName = stringval isMessaging = true + case semconv.AttributeMessagingSystem: + isMessaging = true + modelpb.Labels(event.Labels).Set(k, stringval) + case semconv.AttributeMessagingOperation: + isMessaging = true + modelpb.Labels(event.Labels).Set(k, stringval) // rpc.* // @@ -494,9 +502,6 @@ func TranslateTransaction( } event.Url = modelpb.ParseURL(httpURL, httpHost, httpScheme) } - if isMessaging { - event.Transaction.Message = &message - } if event.Client == nil && event.Source != nil { event.Client = modelpb.ClientFromVTPool() diff --git a/input/otlp/traces_test.go b/input/otlp/traces_test.go index 474e4b8c..92945f97 100644 --- a/input/otlp/traces_test.go +++ b/input/otlp/traces_test.go @@ -609,20 +609,56 @@ func TestRPCSpan(t *testing.T) { } func TestMessagingTransaction(t *testing.T) { - event := transformTransactionWithAttributes(t, map[string]interface{}{ - "messaging.destination": "myQueue", - }, func(s ptrace.Span) { - s.SetKind(ptrace.SpanKindConsumer) - // Set parentID to imply this isn't the root, but - // kind==Consumer should still force the span to be translated - // as a transaction. - s.SetParentSpanID(pcommon.SpanID{3}) - }) - assert.Equal(t, "messaging", event.Transaction.Type) - assert.Empty(t, event.Labels) - assert.Equal(t, &modelpb.Message{ - QueueName: "myQueue", - }, event.Transaction.Message) + for _, tc := range []struct { + attrs map[string]interface{} + + expectedLabels map[string]*modelpb.LabelValue + expectedTxnMessage *modelpb.Message + }{ + { + attrs: map[string]interface{}{ + "messaging.destination": "myQueue", + }, + expectedLabels: nil, + expectedTxnMessage: &modelpb.Message{ + QueueName: "myQueue", + }, + }, + { + attrs: map[string]interface{}{ + "messaging.system": "kafka", + }, + expectedLabels: map[string]*modelpb.LabelValue{ + "messaging_system": {Value: "kafka"}, + }, + expectedTxnMessage: nil, + }, + { + attrs: map[string]interface{}{ + "messaging.operation": "publish", + }, + expectedLabels: map[string]*modelpb.LabelValue{ + "messaging_operation": {Value: "publish"}, + }, + expectedTxnMessage: nil, + }, + } { + tcName, err := json.Marshal(tc.attrs) + require.NoError(t, err) + t.Run(string(tcName), func(t *testing.T) { + event := transformTransactionWithAttributes(t, tc.attrs, func(s ptrace.Span) { + s.SetKind(ptrace.SpanKindConsumer) + // Set parentID to imply this isn't the root, but + // kind==Consumer should still force the span to be translated + // as a transaction. + s.SetParentSpanID(pcommon.SpanID{3}) + }) + assert.Equal(t, "messaging", event.Transaction.Type) + assert.Equal(t, tc.expectedLabels, event.Labels) + assert.Equal(t, tc.expectedTxnMessage, event.Transaction.Message) + }) + } + } func TestMessagingSpan(t *testing.T) {