diff --git a/input/otlp/logs.go b/input/otlp/logs.go index 8906dcbb..28e3b87c 100644 --- a/input/otlp/logs.go +++ b/input/otlp/logs.go @@ -106,18 +106,22 @@ func (c *Consumer) convertInstrumentationLibraryLogs( ) { otelLogs := in.LogRecords() for i := 0; i < otelLogs.Len(); i++ { - event := c.convertLogRecord(otelLogs.At(i), baseEvent, timeDelta) + event := c.convertLogRecord(otelLogs.At(i), in.Scope(), baseEvent, timeDelta) *out = append(*out, event) } } func (c *Consumer) convertLogRecord( record plog.LogRecord, + scope pcommon.InstrumentationScope, baseEvent *modelpb.APMEvent, timeDelta time.Duration, ) *modelpb.APMEvent { event := baseEvent.CloneVT() initEventLabels(event) + + translateScopeMetadata(scope, event) + if record.Timestamp() == 0 { event.Timestamp = modelpb.FromTime(record.ObservedTimestamp().AsTime().Add(timeDelta)) } else { @@ -182,6 +186,17 @@ func (c *Consumer) convertLogRecord( event.Network.Connection = modelpb.NetworkConnectionFromVTPool() } event.Network.Connection.Type = v.Str() + // data_stream.* + case attributeDataStreamDataset: + if event.DataStream == nil { + event.DataStream = modelpb.DataStreamFromVTPool() + } + event.DataStream.Dataset = v.Str() + case attributeDataStreamNamespace: + if event.DataStream == nil { + event.DataStream = modelpb.DataStreamFromVTPool() + } + event.DataStream.Namespace = v.Str() default: setLabel(replaceDots(k), event, ifaceAttributeValue(v)) } diff --git a/input/otlp/logs_test.go b/input/otlp/logs_test.go index 43844e46..cb54951d 100644 --- a/input/otlp/logs_test.go +++ b/input/otlp/logs_test.go @@ -36,6 +36,7 @@ package otlp_test import ( "context" + "fmt" "testing" "time" @@ -467,6 +468,99 @@ func TestConsumerConsumeOTelEventLogs(t *testing.T) { assert.Equal(t, "MyEvent", processed[0].Event.Action) } +func TestConsumerConsumeLogsDataStream(t *testing.T) { + for _, tc := range []struct { + resourceDataStreamDataset string + resourceDataStreamNamespace string + scopeDataStreamDataset string + scopeDataStreamNamespace string + recordDataStreamDataset string + recordDataStreamNamespace string + + expectedDataStreamDataset string + expectedDataStreamNamespace string + }{ + { + resourceDataStreamDataset: "1", + resourceDataStreamNamespace: "2", + scopeDataStreamDataset: "3", + scopeDataStreamNamespace: "4", + recordDataStreamDataset: "5", + recordDataStreamNamespace: "6", + expectedDataStreamDataset: "5", + expectedDataStreamNamespace: "6", + }, + { + resourceDataStreamDataset: "1", + resourceDataStreamNamespace: "2", + scopeDataStreamDataset: "3", + scopeDataStreamNamespace: "4", + expectedDataStreamDataset: "3", + expectedDataStreamNamespace: "4", + }, + { + resourceDataStreamDataset: "1", + resourceDataStreamNamespace: "2", + expectedDataStreamDataset: "1", + expectedDataStreamNamespace: "2", + }, + } { + tcName := fmt.Sprintf("%s,%s", tc.expectedDataStreamDataset, tc.expectedDataStreamNamespace) + t.Run(tcName, func(t *testing.T) { + logs := plog.NewLogs() + resourceLogs := logs.ResourceLogs().AppendEmpty() + resourceAttrs := logs.ResourceLogs().At(0).Resource().Attributes() + if tc.resourceDataStreamDataset != "" { + resourceAttrs.PutStr("data_stream.dataset", tc.resourceDataStreamDataset) + } + if tc.resourceDataStreamNamespace != "" { + resourceAttrs.PutStr("data_stream.namespace", tc.resourceDataStreamNamespace) + } + + scopeLogs := resourceLogs.ScopeLogs().AppendEmpty() + scopeAttrs := resourceLogs.ScopeLogs().At(0).Scope().Attributes() + if tc.scopeDataStreamDataset != "" { + scopeAttrs.PutStr("data_stream.dataset", tc.scopeDataStreamDataset) + } + if tc.scopeDataStreamNamespace != "" { + scopeAttrs.PutStr("data_stream.namespace", tc.scopeDataStreamNamespace) + } + + record1 := newLogRecord("") // no log body + record1.CopyTo(scopeLogs.LogRecords().AppendEmpty()) + recordAttrs := scopeLogs.LogRecords().At(0).Attributes() + if tc.recordDataStreamDataset != "" { + recordAttrs.PutStr("data_stream.dataset", tc.recordDataStreamDataset) + } + if tc.recordDataStreamNamespace != "" { + recordAttrs.PutStr("data_stream.namespace", tc.recordDataStreamNamespace) + } + + var processed modelpb.Batch + var processor modelpb.ProcessBatchFunc = func(_ context.Context, batch *modelpb.Batch) error { + if processed != nil { + panic("already processes batch") + } + processed = batch.Clone() + assert.NotZero(t, processed[0].Timestamp) + processed[0].Timestamp = 0 + return nil + } + consumer := otlp.NewConsumer(otlp.ConsumerConfig{ + Processor: processor, + Semaphore: semaphore.NewWeighted(100), + }) + result, err := consumer.ConsumeLogsWithResult(context.Background(), logs) + assert.NoError(t, err) + assert.Equal(t, otlp.ConsumeLogsResult{}, result) + + assert.Len(t, processed, 1) + assert.Equal(t, tc.expectedDataStreamDataset, processed[0].DataStream.Dataset) + assert.Equal(t, tc.expectedDataStreamNamespace, processed[0].DataStream.Namespace) + }) + } +} + func TestConsumerConsumeOTelLogsWithTimestamp(t *testing.T) { logs := plog.NewLogs() resourceLogs := logs.ResourceLogs().AppendEmpty() diff --git a/input/otlp/metadata.go b/input/otlp/metadata.go index c0a57e04..1fa767b4 100644 --- a/input/otlp/metadata.go +++ b/input/otlp/metadata.go @@ -290,6 +290,18 @@ func translateResourceMetadata(resource pcommon.Resource, out *modelpb.APMEvent) case "telemetry.sdk.elastic_export_timestamp": // Do nothing. + // data_stream.* + case attributeDataStreamDataset: + if out.DataStream == nil { + out.DataStream = modelpb.DataStreamFromVTPool() + } + out.DataStream.Dataset = v.Str() + case attributeDataStreamNamespace: + if out.DataStream == nil { + out.DataStream = modelpb.DataStreamFromVTPool() + } + out.DataStream.Namespace = v.Str() + default: if out.Labels == nil { out.Labels = make(modelpb.Labels) @@ -400,6 +412,25 @@ func translateResourceMetadata(resource pcommon.Resource, out *modelpb.APMEvent) } } +func translateScopeMetadata(scope pcommon.InstrumentationScope, out *modelpb.APMEvent) { + scope.Attributes().Range(func(k string, v pcommon.Value) bool { + switch k { + // data_stream.* + case attributeDataStreamDataset: + if out.DataStream == nil { + out.DataStream = modelpb.DataStreamFromVTPool() + } + out.DataStream.Dataset = v.Str() + case attributeDataStreamNamespace: + if out.DataStream == nil { + out.DataStream = modelpb.DataStreamFromVTPool() + } + out.DataStream.Namespace = v.Str() + } + return true + }) +} + func cleanServiceName(name string) string { return serviceNameInvalidRegexp.ReplaceAllString(truncate(name), "_") } diff --git a/input/otlp/metrics.go b/input/otlp/metrics.go index bb47eb7e..b6ea2da6 100644 --- a/input/otlp/metrics.go +++ b/input/otlp/metrics.go @@ -144,6 +144,9 @@ func (c *Consumer) convertScopeMetrics( } for key, ms := range ms { event := baseEvent.CloneVT() + + translateScopeMetadata(in.Scope(), event) + event.Timestamp = modelpb.FromTime(key.timestamp.Add(timeDelta)) metrs := make([]*modelpb.MetricsetSample, 0, len(ms.samples)) for _, s := range ms.samples { @@ -155,7 +158,21 @@ func (c *Consumer) convertScopeMetrics( if ms.attributes.Len() > 0 { initEventLabels(event) ms.attributes.Range(func(k string, v pcommon.Value) bool { - setLabel(k, event, ifaceAttributeValue(v)) + switch k { + // data_stream.* + case attributeDataStreamDataset: + if event.DataStream == nil { + event.DataStream = modelpb.DataStreamFromVTPool() + } + event.DataStream.Dataset = v.Str() + case attributeDataStreamNamespace: + if event.DataStream == nil { + event.DataStream = modelpb.DataStreamFromVTPool() + } + event.DataStream.Namespace = v.Str() + default: + setLabel(k, event, ifaceAttributeValue(v)) + } return true }) if len(event.Labels) == 0 { diff --git a/input/otlp/metrics_test.go b/input/otlp/metrics_test.go index 7a4cbdff..a5114898 100644 --- a/input/otlp/metrics_test.go +++ b/input/otlp/metrics_test.go @@ -728,6 +728,129 @@ func TestConsumeMetricsExportTimestamp(t *testing.T) { } } +func TestConsumeMetricsDataStream(t *testing.T) { + for _, tc := range []struct { + resourceDataStreamDataset string + resourceDataStreamNamespace string + scopeDataStreamDataset string + scopeDataStreamNamespace string + recordDataStreamDataset string + recordDataStreamNamespace string + + expectedDataStreamDataset string + expectedDataStreamNamespace string + }{ + { + resourceDataStreamDataset: "1", + resourceDataStreamNamespace: "2", + scopeDataStreamDataset: "3", + scopeDataStreamNamespace: "4", + recordDataStreamDataset: "5", + recordDataStreamNamespace: "6", + expectedDataStreamDataset: "5", + expectedDataStreamNamespace: "6", + }, + { + resourceDataStreamDataset: "1", + resourceDataStreamNamespace: "2", + scopeDataStreamDataset: "3", + scopeDataStreamNamespace: "4", + expectedDataStreamDataset: "3", + expectedDataStreamNamespace: "4", + }, + { + resourceDataStreamDataset: "1", + resourceDataStreamNamespace: "2", + expectedDataStreamDataset: "1", + expectedDataStreamNamespace: "2", + }, + } { + tcName := fmt.Sprintf("%s,%s", tc.expectedDataStreamDataset, tc.expectedDataStreamNamespace) + t.Run(tcName, func(t *testing.T) { + metrics := pmetric.NewMetrics() + resourceMetrics := metrics.ResourceMetrics().AppendEmpty() + resourceAttrs := metrics.ResourceMetrics().At(0).Resource().Attributes() + if tc.resourceDataStreamDataset != "" { + resourceAttrs.PutStr("data_stream.dataset", tc.resourceDataStreamDataset) + } + if tc.resourceDataStreamNamespace != "" { + resourceAttrs.PutStr("data_stream.namespace", tc.resourceDataStreamNamespace) + } + + scopeMetrics := resourceMetrics.ScopeMetrics().AppendEmpty() + scopeAttrs := resourceMetrics.ScopeMetrics().At(0).Scope().Attributes() + if tc.scopeDataStreamDataset != "" { + scopeAttrs.PutStr("data_stream.dataset", tc.scopeDataStreamDataset) + } + if tc.scopeDataStreamNamespace != "" { + scopeAttrs.PutStr("data_stream.namespace", tc.scopeDataStreamNamespace) + } + + metricSlice := scopeMetrics.Metrics() + appendMetric := func(name string) pmetric.Metric { + metric := metricSlice.AppendEmpty() + metric.SetName(name) + return metric + } + + timestamp0 := time.Unix(123, 0).UTC() + + gauge := appendMetric("gauge_metric").SetEmptyGauge() + gaugeDP0 := gauge.DataPoints().AppendEmpty() + gaugeDP0.SetTimestamp(pcommon.NewTimestampFromTime(timestamp0)) + gaugeDP0.SetDoubleValue(5.6) + if tc.recordDataStreamDataset != "" { + gaugeDP0.Attributes().PutStr("data_stream.dataset", tc.recordDataStreamDataset) + } + if tc.recordDataStreamNamespace != "" { + gaugeDP0.Attributes().PutStr("data_stream.namespace", tc.recordDataStreamNamespace) + } + gaugeDP1 := gauge.DataPoints().AppendEmpty() + gaugeDP1.SetTimestamp(pcommon.NewTimestampFromTime(timestamp0)) + gaugeDP1.SetDoubleValue(6) + gaugeDP1.Attributes().PutStr("data_stream.dataset", "foo") + gaugeDP1.Attributes().PutStr("data_stream.namespace", "bar") + + events, _, result, err := transformMetrics(t, metrics) + assert.NoError(t, err) + expectedResult := otlp.ConsumeMetricsResult{} + assert.Equal(t, expectedResult, result) + + service := modelpb.Service{Name: "unknown", Language: &modelpb.Language{Name: "unknown"}} + agent := modelpb.Agent{Name: "otlp", Version: "unknown"} + dataStream := modelpb.DataStream{ + Dataset: tc.expectedDataStreamDataset, + Namespace: tc.expectedDataStreamNamespace, + } + expected := []*modelpb.APMEvent{{ + Agent: &agent, + DataStream: &dataStream, + Service: &service, + Timestamp: modelpb.FromTime(timestamp0), + Metricset: &modelpb.Metricset{ + Name: "app", + Samples: []*modelpb.MetricsetSample{ + {Name: "gauge_metric", Value: 5.6, Type: modelpb.MetricType_METRIC_TYPE_GAUGE}, + }, + }, + }, { + Agent: &agent, + DataStream: &modelpb.DataStream{Dataset: "foo", Namespace: "bar"}, + Service: &service, + Timestamp: modelpb.FromTime(timestamp0), + Metricset: &modelpb.Metricset{ + Name: "app", + Samples: []*modelpb.MetricsetSample{ + {Name: "gauge_metric", Value: 6, Type: modelpb.MetricType_METRIC_TYPE_GAUGE}, + }, + }, + }} + + eventsMatch(t, expected, events) + }) + } +} + /* TODO func TestMetricsLogging(t *testing.T) { for _, level := range []logp.Level{logp.InfoLevel, logp.DebugLevel} { diff --git a/input/otlp/test_approved/span_jaeger_data_stream.approved.json b/input/otlp/test_approved/span_jaeger_data_stream.approved.json new file mode 100644 index 00000000..353f4856 --- /dev/null +++ b/input/otlp/test_approved/span_jaeger_data_stream.approved.json @@ -0,0 +1,42 @@ +{ + "events": [ + { + "@timestamp": "2019-12-16T12:46:58.000Z", + "agent": { + "name": "Jaeger", + "version": "unknown" + }, + "data_stream": { + "dataset": "1", + "namespace": "2" + }, + "event": { + "duration": 79000000000, + "outcome": "success" + }, + "host": { + "hostname": "host-abc" + }, + "parent": { + "id": "0000000058585858" + }, + "service": { + "language": { + "name": "unknown" + }, + "name": "unknown" + }, + "span": { + "id": "0000000041414646", + "representative_count": 1, + "type": "unknown" + }, + "timestamp": { + "us": 1576500418000768 + }, + "trace": { + "id": "00000000000000000000000046467830" + } + } + ] +} \ No newline at end of file diff --git a/input/otlp/test_approved/span_jaeger_data_stream_with_error.approved.json b/input/otlp/test_approved/span_jaeger_data_stream_with_error.approved.json new file mode 100644 index 00000000..d34010ba --- /dev/null +++ b/input/otlp/test_approved/span_jaeger_data_stream_with_error.approved.json @@ -0,0 +1,81 @@ +{ + "events": [ + { + "@timestamp": "2019-12-16T12:46:58.000Z", + "agent": { + "name": "Jaeger", + "version": "unknown" + }, + "data_stream": { + "dataset": "1", + "namespace": "2" + }, + "event": { + "duration": 79000000000, + "outcome": "success" + }, + "host": { + "hostname": "host-abc" + }, + "parent": { + "id": "0000000058585858" + }, + "service": { + "language": { + "name": "unknown" + }, + "name": "unknown" + }, + "span": { + "id": "0000000041414646", + "representative_count": 1, + "type": "unknown" + }, + "timestamp": { + "us": 1576500418000768 + }, + "trace": { + "id": "00000000000000000000000046467830" + } + }, + { + "@timestamp": "2019-12-16T12:46:58.000Z", + "agent": { + "name": "Jaeger", + "version": "unknown" + }, + "data_stream": { + "dataset": "3", + "namespace": "4" + }, + "error": { + "exception": [ + { + "message": "no connection established" + } + ], + "log": { + "message": "retrying connection" + } + }, + "host": { + "hostname": "host-abc" + }, + "parent": { + "id": "0000000041414646" + }, + "service": { + "language": { + "name": "unknown" + }, + "name": "unknown" + }, + "timestamp": { + "us": 1576500418000768 + }, + "trace": { + "id": "00000000000000000000000046467830" + } + } + ] +} \ No newline at end of file diff --git a/input/otlp/test_approved/transaction_jaeger_data_stream.approved.json b/input/otlp/test_approved/transaction_jaeger_data_stream.approved.json new file mode 100644 index 00000000..907e8279 --- /dev/null +++ b/input/otlp/test_approved/transaction_jaeger_data_stream.approved.json @@ -0,0 +1,36 @@ +{ + "events": [ + { + "@timestamp": "2019-12-16T12:46:58.000Z", + "agent": { + "name": "Jaeger", + "version": "unknown" + }, + "data_stream": { + "dataset": "1", + "namespace": "2" + }, + "event": { + "outcome": "success" + }, + "host": { + "hostname": "host-abc" + }, + "service": { + "language": { + "name": "unknown" + }, + "name": "unknown" + }, + "timestamp": { + "us": 1576500418000768 + }, + "transaction": { + "representative_count": 1, + "result": "Success", + "sampled": true, + "type": "unknown" + } + } + ] +} \ No newline at end of file diff --git a/input/otlp/test_approved/transaction_jaeger_data_stream_with_error.approved.json b/input/otlp/test_approved/transaction_jaeger_data_stream_with_error.approved.json new file mode 100644 index 00000000..29c6df9a --- /dev/null +++ b/input/otlp/test_approved/transaction_jaeger_data_stream_with_error.approved.json @@ -0,0 +1,80 @@ +{ + "events": [ + { + "@timestamp": "2339-03-21T22:18:14.838Z", + "agent": { + "name": "Jaeger", + "version": "unknown" + }, + "data_stream": { + "dataset": "1", + "namespace": "2" + }, + "event": { + "outcome": "success" + }, + "host": { + "hostname": "host-abc" + }, + "service": { + "language": { + "name": "unknown" + }, + "name": "unknown" + }, + "timestamp": { + "us": 11651379494838206 + }, + "trace": { + "id": "00000000000000000000000046467830" + }, + "transaction": { + "representative_count": 1, + "result": "Success", + "sampled": true, + "type": "unknown" + } + }, + { + "@timestamp": "2019-12-16T12:46:58.000Z", + "agent": { + "name": "Jaeger", + "version": "unknown" + }, + "data_stream": { + "dataset": "3", + "namespace": "4" + }, + "error": { + "exception": [ + { + "message": "no connection established" + } + ], + "log": { + "message": "retrying connection" + } + }, + "host": { + "hostname": "host-abc" + }, + "service": { + "language": { + "name": "unknown" + }, + "name": "unknown" + }, + "span": {}, + "timestamp": { + "us": 1576500418000768 + }, + "trace": { + "id": "00000000000000000000000046467830" + }, + "transaction": { + "sampled": true, + "type": "unknown" + } + } + ] +} \ No newline at end of file diff --git a/input/otlp/traces.go b/input/otlp/traces.go index 6d774677..3b1b486e 100644 --- a/input/otlp/traces.go +++ b/input/otlp/traces.go @@ -77,6 +77,8 @@ const ( attributeUserAgentOriginal = "user_agent.original" attributeDbElasticsearchClusterName = "db.elasticsearch.cluster.name" attributeStackTrace = "code.stacktrace" // semconv 1.24 or later + attributeDataStreamDataset = "data_stream.dataset" + attributeDataStreamNamespace = "data_stream.namespace" ) // ConsumeTracesResult contains the number of rejected spans and error message for partial success response. @@ -172,6 +174,9 @@ func (c *Consumer) convertSpan( spanID := hexSpanID(otelSpan.SpanID()) representativeCount := getRepresentativeCountFromTracestateHeader(otelSpan.TraceState().AsRaw()) event := baseEvent.CloneVT() + + translateScopeMetadata(otelLibrary, event) + initEventLabels(event) event.Timestamp = modelpb.FromTime(startTime.Add(timeDelta)) if id := hexTraceID(otelSpan.TraceID()); id != "" { @@ -451,6 +456,19 @@ func TranslateTransaction( // should set this as a resource attribute (OTel) or tracer // tag (Jaeger). event.Service.Version = stringval + + // data_stream.* + case attributeDataStreamDataset: + if event.DataStream == nil { + event.DataStream = modelpb.DataStreamFromVTPool() + } + event.DataStream.Dataset = stringval + case attributeDataStreamNamespace: + if event.DataStream == nil { + event.DataStream = modelpb.DataStreamFromVTPool() + } + event.DataStream.Namespace = stringval + default: modelpb.Labels(event.Labels).Set(k, stringval) } @@ -777,6 +795,19 @@ func TranslateSpan(spanKind ptrace.SpanKind, attributes pcommon.Map, event *mode case "span.kind": // filter out case semconv.AttributePeerService: peerService = stringval + + // data_stream.* + case attributeDataStreamDataset: + if event.DataStream == nil { + event.DataStream = modelpb.DataStreamFromVTPool() + } + event.DataStream.Dataset = stringval + case attributeDataStreamNamespace: + if event.DataStream == nil { + event.DataStream = modelpb.DataStreamFromVTPool() + } + event.DataStream.Namespace = stringval + default: modelpb.Labels(event.Labels).Set(k, stringval) } @@ -1040,6 +1071,20 @@ func (c *Consumer) convertSpanEvent( exceptionType = v.Str() case "exception.escaped": exceptionEscaped = v.Bool() + + // data_stream.* + // Note: fields are parsed but dataset will be overridden by SetDataStream because it is an error + case attributeDataStreamDataset: + if event.DataStream == nil { + event.DataStream = modelpb.DataStreamFromVTPool() + } + event.DataStream.Dataset = v.Str() + case attributeDataStreamNamespace: + if event.DataStream == nil { + event.DataStream = modelpb.DataStreamFromVTPool() + } + event.DataStream.Namespace = v.Str() + default: setLabel(replaceDots(k), event, ifaceAttributeValue(v)) } @@ -1068,12 +1113,26 @@ func (c *Consumer) convertSpanEvent( event.Message = spanEvent.Name() setLogContext(event, parent) spanEvent.Attributes().Range(func(k string, v pcommon.Value) bool { - k = replaceDots(k) - if isJaeger && k == "message" { - event.Message = truncate(v.Str()) - return true + switch k { + // data_stream.* + case attributeDataStreamDataset: + if event.DataStream == nil { + event.DataStream = modelpb.DataStreamFromVTPool() + } + event.DataStream.Dataset = v.Str() + case attributeDataStreamNamespace: + if event.DataStream == nil { + event.DataStream = modelpb.DataStreamFromVTPool() + } + event.DataStream.Namespace = v.Str() + default: + k = replaceDots(k) + if isJaeger && k == "message" { + event.Message = truncate(v.Str()) + return true + } + setLabel(k, event, ifaceAttributeValue(v)) } - setLabel(k, event, ifaceAttributeValue(v)) return true }) } @@ -1111,6 +1170,20 @@ func (c *Consumer) convertJaegerErrorSpanEvent(event ptrace.SpanEvent, apmEvent isError = stringval == "error" case "message": logMessage = stringval + + // data_stream.* + // Note: fields are parsed but dataset will be overridden by SetDataStream because it is an error + case attributeDataStreamDataset: + if apmEvent.DataStream == nil { + apmEvent.DataStream = modelpb.DataStreamFromVTPool() + } + apmEvent.DataStream.Dataset = v.Str() + case attributeDataStreamNamespace: + if apmEvent.DataStream == nil { + apmEvent.DataStream = modelpb.DataStreamFromVTPool() + } + apmEvent.DataStream.Namespace = v.Str() + default: setLabel(replaceDots(k), apmEvent, ifaceAttributeValue(v)) } diff --git a/input/otlp/traces_test.go b/input/otlp/traces_test.go index 92945f97..84e29643 100644 --- a/input/otlp/traces_test.go +++ b/input/otlp/traces_test.go @@ -814,6 +814,90 @@ func TestSpanNetworkAttributes(t *testing.T) { assert.Equal(t, &expected, spanEvent.Network) } +func TestSpanDataStream(t *testing.T) { + for _, tc := range []struct { + resourceDataStreamDataset string + resourceDataStreamNamespace string + scopeDataStreamDataset string + scopeDataStreamNamespace string + recordDataStreamDataset string + recordDataStreamNamespace string + + expectedDataStreamDataset string + expectedDataStreamNamespace string + }{ + { + resourceDataStreamDataset: "1", + resourceDataStreamNamespace: "2", + scopeDataStreamDataset: "3", + scopeDataStreamNamespace: "4", + recordDataStreamDataset: "5", + recordDataStreamNamespace: "6", + expectedDataStreamDataset: "5", + expectedDataStreamNamespace: "6", + }, + { + resourceDataStreamDataset: "1", + resourceDataStreamNamespace: "2", + scopeDataStreamDataset: "3", + scopeDataStreamNamespace: "4", + expectedDataStreamDataset: "3", + expectedDataStreamNamespace: "4", + }, + { + resourceDataStreamDataset: "1", + resourceDataStreamNamespace: "2", + expectedDataStreamDataset: "1", + expectedDataStreamNamespace: "2", + }, + } { + for _, isTxn := range []bool{false, true} { + tcName := fmt.Sprintf("%s,%s,txn=%v", tc.expectedDataStreamDataset, tc.expectedDataStreamNamespace, isTxn) + t.Run(tcName, func(t *testing.T) { + traces := ptrace.NewTraces() + resourceSpans := traces.ResourceSpans().AppendEmpty() + resourceAttrs := traces.ResourceSpans().At(0).Resource().Attributes() + if tc.resourceDataStreamDataset != "" { + resourceAttrs.PutStr("data_stream.dataset", tc.resourceDataStreamDataset) + } + if tc.resourceDataStreamNamespace != "" { + resourceAttrs.PutStr("data_stream.namespace", tc.resourceDataStreamNamespace) + } + + scopeSpans := resourceSpans.ScopeSpans().AppendEmpty() + scopeAttrs := resourceSpans.ScopeSpans().At(0).Scope().Attributes() + if tc.scopeDataStreamDataset != "" { + scopeAttrs.PutStr("data_stream.dataset", tc.scopeDataStreamDataset) + } + if tc.scopeDataStreamNamespace != "" { + scopeAttrs.PutStr("data_stream.namespace", tc.scopeDataStreamNamespace) + } + + otelSpan := scopeSpans.Spans().AppendEmpty() + otelSpan.SetTraceID(pcommon.TraceID{1}) + otelSpan.SetSpanID(pcommon.SpanID{2}) + if !isTxn { + otelSpan.SetParentSpanID(pcommon.SpanID{3}) + } + if tc.recordDataStreamDataset != "" { + otelSpan.Attributes().PutStr("data_stream.dataset", tc.recordDataStreamDataset) + } + if tc.recordDataStreamNamespace != "" { + otelSpan.Attributes().PutStr("data_stream.namespace", tc.recordDataStreamNamespace) + } + events := transformTraces(t, traces) + + dataStream := &modelpb.DataStream{ + Dataset: tc.expectedDataStreamDataset, + Namespace: tc.expectedDataStreamNamespace, + } + + assert.Equal(t, dataStream, (*events)[0].DataStream) + }) + } + } +} + func TestSessionID(t *testing.T) { sessionAttributes := map[string]interface{}{ "session.id": "opbeans-swift", @@ -1272,6 +1356,36 @@ func TestConsumer_JaegerTransaction(t *testing.T) { }, }}, }, + { + name: "jaeger_data_stream", + spans: []*jaegermodel.Span{{ + StartTime: testStartTime(), + Tags: []jaegermodel.KeyValue{ + jaegerKeyValue("data_stream.dataset", "1"), + jaegerKeyValue("data_stream.namespace", "2"), + }, + }}, + }, + { + name: "jaeger_data_stream_with_error", + spans: []*jaegermodel.Span{{ + TraceID: jaegermodel.NewTraceID(0, 0x46467830), + Tags: []jaegermodel.KeyValue{ + jaegerKeyValue("data_stream.dataset", "1"), + jaegerKeyValue("data_stream.namespace", "2"), + }, + Logs: []jaegermodel.Log{{ + Timestamp: testStartTime().Add(23 * time.Nanosecond), + Fields: jaegerKeyValues( + "event", "retrying connection", + "level", "error", + "error", "no connection established", + "data_stream.dataset", "3", + "data_stream.namespace", "4", + ), + }}, + }}, + }, } { t.Run(tc.name, func(t *testing.T) { traces, err := jaegertranslator.ProtoToTraces([]*jaegermodel.Batch{{ @@ -1382,6 +1496,34 @@ func TestConsumer_JaegerSpan(t *testing.T) { name: "jaeger_custom", spans: []*jaegermodel.Span{{}}, }, + { + name: "jaeger_data_stream", + spans: []*jaegermodel.Span{{ + Tags: []jaegermodel.KeyValue{ + jaegerKeyValue("data_stream.dataset", "1"), + jaegerKeyValue("data_stream.namespace", "2"), + }, + }}, + }, + { + name: "jaeger_data_stream_with_error", + spans: []*jaegermodel.Span{{ + Tags: []jaegermodel.KeyValue{ + jaegerKeyValue("data_stream.dataset", "1"), + jaegerKeyValue("data_stream.namespace", "2"), + }, + Logs: []jaegermodel.Log{{ + Timestamp: testStartTime().Add(23 * time.Nanosecond), + Fields: jaegerKeyValues( + "event", "retrying connection", + "level", "error", + "error", "no connection established", + "data_stream.dataset", "3", + "data_stream.namespace", "4", + ), + }}, + }}, + }, } { t.Run(tc.name, func(t *testing.T) { batch := &jaegermodel.Batch{ @@ -1649,6 +1791,43 @@ func TestSpanCodeStacktrace(t *testing.T) { }) } +func TestSpanEventsDataStream(t *testing.T) { + for _, isException := range []bool{false, true} { + t.Run(fmt.Sprintf("isException=%v", isException), func(t *testing.T) { + timestamp := time.Unix(123, 0).UTC() + + traces, spans := newTracesSpans() + traces.ResourceSpans().At(0).Resource().Attributes().PutStr(semconv.AttributeTelemetrySDKLanguage, "java") + traces.ResourceSpans().At(0).Resource().Attributes().PutStr("data_stream.dataset", "1") + traces.ResourceSpans().At(0).Resource().Attributes().PutStr("data_stream.namespace", "2") + otelSpan := spans.Spans().AppendEmpty() + otelSpan.SetTraceID(pcommon.TraceID{1}) + otelSpan.SetSpanID(pcommon.SpanID{2}) + otelSpan.Attributes().PutStr("data_stream.dataset", "3") + otelSpan.Attributes().PutStr("data_stream.namespace", "4") + + spanEvent := ptrace.NewSpanEvent() + spanEvent.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) + if isException { + spanEvent.SetName("exception") + spanEvent.Attributes().PutStr("exception.type", "java.net.ConnectException.OSError") + spanEvent.Attributes().PutStr("exception.message", "Division by zero") + } + + spanEvent.Attributes().PutStr("data_stream.dataset", "5") + spanEvent.Attributes().PutStr("data_stream.namespace", "6") + spanEvent.CopyTo(otelSpan.Events().AppendEmpty()) + + allEvents := transformTraces(t, traces) + events := (*allEvents)[1:] + assert.Equal(t, &modelpb.DataStream{ + Dataset: "5", + Namespace: "6", + }, events[0].DataStream) + }) + } +} + func testJaegerLogs() []jaegermodel.Log { return []jaegermodel.Log{{ // errors that can be converted to elastic errors diff --git a/model/modelprocessor/datastream.go b/model/modelprocessor/datastream.go index cda58603..b7f14a59 100644 --- a/model/modelprocessor/datastream.go +++ b/model/modelprocessor/datastream.go @@ -52,7 +52,12 @@ func (s *SetDataStream) ProcessBatch(ctx context.Context, b *modelpb.Batch) erro if (*b)[i].DataStream == nil { (*b)[i].DataStream = modelpb.DataStreamFromVTPool() } - (*b)[i].DataStream.Namespace = s.Namespace + if (*b)[i].DataStream.Namespace == "" || isRUMAgentName((*b)[i].GetAgent().GetName()) { + // Only set namespace if + // 1. it is not already set in the input event; OR + // 2. it is from RUM agents, so that they cannot create arbitrarily many data streams + (*b)[i].DataStream.Namespace = s.Namespace + } if (*b)[i].DataStream.Type == "" || (*b)[i].DataStream.Dataset == "" { s.setDataStream((*b)[i]) } @@ -64,9 +69,13 @@ func (s *SetDataStream) setDataStream(event *modelpb.APMEvent) { switch event.Type() { case modelpb.SpanEventType, modelpb.TransactionEventType: event.DataStream.Type = tracesType - event.DataStream.Dataset = tracesDataset + if event.DataStream.Dataset == "" { + // Only set dataset if it is not already set in the input event + event.DataStream.Dataset = tracesDataset + } // In order to maintain different ILM policies, RUM traces are sent to // a different datastream. + // RUM agents should not be able to configure dataset. if isRUMAgentName(event.GetAgent().GetName()) { event.DataStream.Dataset = rumTracesDataset } @@ -75,10 +84,16 @@ func (s *SetDataStream) setDataStream(event *modelpb.APMEvent) { event.DataStream.Dataset = errorsDataset case modelpb.LogEventType: event.DataStream.Type = logsType - event.DataStream.Dataset = getAppLogsDataset(event) + if event.DataStream.Dataset == "" || isRUMAgentName(event.GetAgent().GetName()) { + // Only set dataset if it is not already set in the input event + event.DataStream.Dataset = getAppLogsDataset(event) + } case modelpb.MetricEventType: event.DataStream.Type = metricsType - event.DataStream.Dataset = metricsetDataset(event) + if event.DataStream.Dataset == "" || isRUMAgentName(event.GetAgent().GetName()) { + // Only set dataset if it is not already set in the input event + event.DataStream.Dataset = metricsetDataset(event) + } } } diff --git a/model/modelprocessor/datastream_test.go b/model/modelprocessor/datastream_test.go index fd3a1d68..572bc544 100644 --- a/model/modelprocessor/datastream_test.go +++ b/model/modelprocessor/datastream_test.go @@ -40,6 +40,12 @@ func TestSetDataStream(t *testing.T) { }, { input: &modelpb.APMEvent{Span: &modelpb.Span{Type: "type"}}, output: &modelpb.DataStream{Type: "traces", Dataset: "apm", Namespace: "custom"}, + }, { + input: &modelpb.APMEvent{ + Span: &modelpb.Span{Type: "type"}, + DataStream: &modelpb.DataStream{Dataset: "dataset", Namespace: "namespace"}, + }, + output: &modelpb.DataStream{Type: "traces", Dataset: "dataset", Namespace: "namespace"}, }, { input: &modelpb.APMEvent{Transaction: &modelpb.Transaction{Type: "type"}, Agent: &modelpb.Agent{Name: "js-base"}}, output: &modelpb.DataStream{Type: "traces", Dataset: "apm.rum", Namespace: "custom"}, @@ -58,6 +64,13 @@ func TestSetDataStream(t *testing.T) { }, { input: &modelpb.APMEvent{Span: &modelpb.Span{Type: "type"}, Agent: &modelpb.Agent{Name: "iOS/swift"}}, output: &modelpb.DataStream{Type: "traces", Dataset: "apm.rum", Namespace: "custom"}, + }, { + input: &modelpb.APMEvent{ + Span: &modelpb.Span{Type: "type"}, + Agent: &modelpb.Agent{Name: "iOS/swift"}, + DataStream: &modelpb.DataStream{Dataset: "dataset", Namespace: "namespace"}, + }, + output: &modelpb.DataStream{Type: "traces", Dataset: "apm.rum", Namespace: "custom"}, }, { input: &modelpb.APMEvent{Transaction: &modelpb.Transaction{Type: "type"}, Agent: &modelpb.Agent{Name: "go"}}, output: &modelpb.DataStream{Type: "traces", Dataset: "apm", Namespace: "custom"}, @@ -67,6 +80,12 @@ func TestSetDataStream(t *testing.T) { }, { input: &modelpb.APMEvent{Error: &modelpb.Error{}}, output: &modelpb.DataStream{Type: "logs", Dataset: "apm.error", Namespace: "custom"}, + }, { + input: &modelpb.APMEvent{Error: &modelpb.Error{}, DataStream: &modelpb.DataStream{ + Dataset: "dataset", + Namespace: "namespace", + }}, + output: &modelpb.DataStream{Type: "logs", Dataset: "apm.error", Namespace: "namespace"}, }, { input: &modelpb.APMEvent{Log: &modelpb.Log{}}, output: &modelpb.DataStream{Type: "logs", Dataset: "apm.app.unknown", Namespace: "custom"}, @@ -86,6 +105,14 @@ func TestSetDataStream(t *testing.T) { Service: &modelpb.Service{Name: "service-name"}, }, output: &modelpb.DataStream{Type: "logs", Dataset: "apm.app.service_name", Namespace: "custom"}, + }, { + input: &modelpb.APMEvent{ + Log: &modelpb.Log{}, + Agent: &modelpb.Agent{Name: "iOS/swift"}, + Service: &modelpb.Service{Name: "service-name"}, + DataStream: &modelpb.DataStream{Dataset: "dataset", Namespace: "namespace"}, + }, + output: &modelpb.DataStream{Type: "logs", Dataset: "apm.app.service_name", Namespace: "custom"}, }, { input: &modelpb.APMEvent{ Agent: &modelpb.Agent{Name: "rum-js"}, @@ -117,6 +144,19 @@ func TestSetDataStream(t *testing.T) { }, }, output: &modelpb.DataStream{Type: "metrics", Dataset: "apm.app.service_name", Namespace: "custom"}, + }, { + input: &modelpb.APMEvent{ + Agent: &modelpb.Agent{Name: "rum-js"}, + Service: &modelpb.Service{Name: "service-name"}, + Metricset: &modelpb.Metricset{ + Samples: []*modelpb.MetricsetSample{ + {Name: "system.memory.total"}, // known agent metric + {Name: "custom_metric"}, // custom metric + }, + }, + DataStream: &modelpb.DataStream{Dataset: "dataset", Namespace: "namespace"}, + }, + output: &modelpb.DataStream{Type: "metrics", Dataset: "apm.app.service_name", Namespace: "custom"}, }, { input: &modelpb.APMEvent{ Service: &modelpb.Service{Name: "service-name"}, @@ -124,6 +164,14 @@ func TestSetDataStream(t *testing.T) { Transaction: &modelpb.Transaction{Name: "foo"}, }, output: &modelpb.DataStream{Type: "metrics", Dataset: "apm.internal", Namespace: "custom"}, + }, { + input: &modelpb.APMEvent{ + Service: &modelpb.Service{Name: "service-name"}, + Metricset: &modelpb.Metricset{}, + Transaction: &modelpb.Transaction{Name: "foo"}, + DataStream: &modelpb.DataStream{Dataset: "dataset", Namespace: "namespace"}, + }, + output: &modelpb.DataStream{Type: "metrics", Dataset: "dataset", Namespace: "namespace"}, }, { input: &modelpb.APMEvent{ Service: &modelpb.Service{Name: "service-name"}, @@ -151,11 +199,13 @@ func TestSetDataStream(t *testing.T) { }} for _, test := range tests { - batch := modelpb.Batch{test.input} - processor := modelprocessor.SetDataStream{Namespace: "custom"} - err := processor.ProcessBatch(context.Background(), &batch) - assert.NoError(t, err) - assert.Equal(t, test.output, batch[0].DataStream) + t.Run("", func(t *testing.T) { + batch := modelpb.Batch{test.input} + processor := modelprocessor.SetDataStream{Namespace: "custom"} + err := processor.ProcessBatch(context.Background(), &batch) + assert.NoError(t, err) + assert.Equal(t, test.output, batch[0].DataStream) + }) } }