Skip to content

Commit

Permalink
Respect data_stream.{dataset,namespace} for OTel logs, metrics, and t…
Browse files Browse the repository at this point in the history
…races (#201)

* Parse data_stream.{dataset,namespace} from OTel logs, metrics and traces
* Route using user-defined data_stream.{dataset,namespace}

Implications / Breaking changes

* data_stream.dataset and data_stream.namespace are parsed in the order of resource attributes, scope attributes, span attributes, and span event attributes. The latest match wins, e.g. a span event's data_stream.dataset will override the span's attributes.
* Implied by the previous point, data_stream.dataset and data_stream.namespace are inherited from earlier levels if they are not specified at the current level. e.g. if a span does not specify data_stream.dataset, it will use the scope's data_stream.dataset.
* data_stream.dataset and data_stream.namespace are parsed and no longer set as labels (data_stream_dataset and data_stream_namespace)

Edge cases

* Although data_stream.dataset and data_stream.namespace are parsed for error span events, only data_stream.namespace will be respected, because data_stream.dataset will be overridden by SetDataStream.
* For RUM events, although data_stream.dataset and data_stream.namespace are parsed, neither of them will be respected as they will be overridden by SetDataStream.
  • Loading branch information
carsonip authored Feb 6, 2024
1 parent 575dc0d commit 3b28495
Show file tree
Hide file tree
Showing 13 changed files with 852 additions and 16 deletions.
17 changes: 16 additions & 1 deletion input/otlp/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,18 +106,22 @@ func (c *Consumer) convertInstrumentationLibraryLogs(
) {
otelLogs := in.LogRecords()
for i := 0; i < otelLogs.Len(); i++ {
event := c.convertLogRecord(otelLogs.At(i), baseEvent, timeDelta)
event := c.convertLogRecord(otelLogs.At(i), in.Scope(), baseEvent, timeDelta)
*out = append(*out, event)
}
}

func (c *Consumer) convertLogRecord(
record plog.LogRecord,
scope pcommon.InstrumentationScope,
baseEvent *modelpb.APMEvent,
timeDelta time.Duration,
) *modelpb.APMEvent {
event := baseEvent.CloneVT()
initEventLabels(event)

translateScopeMetadata(scope, event)

if record.Timestamp() == 0 {
event.Timestamp = modelpb.FromTime(record.ObservedTimestamp().AsTime().Add(timeDelta))
} else {
Expand Down Expand Up @@ -182,6 +186,17 @@ func (c *Consumer) convertLogRecord(
event.Network.Connection = modelpb.NetworkConnectionFromVTPool()
}
event.Network.Connection.Type = v.Str()
// data_stream.*
case attributeDataStreamDataset:
if event.DataStream == nil {
event.DataStream = modelpb.DataStreamFromVTPool()
}
event.DataStream.Dataset = v.Str()
case attributeDataStreamNamespace:
if event.DataStream == nil {
event.DataStream = modelpb.DataStreamFromVTPool()
}
event.DataStream.Namespace = v.Str()
default:
setLabel(replaceDots(k), event, ifaceAttributeValue(v))
}
Expand Down
94 changes: 94 additions & 0 deletions input/otlp/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ package otlp_test

import (
"context"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -467,6 +468,99 @@ func TestConsumerConsumeOTelEventLogs(t *testing.T) {
assert.Equal(t, "MyEvent", processed[0].Event.Action)
}

// TestConsumerConsumeLogsDataStream verifies that data_stream.dataset and
// data_stream.namespace attributes are parsed from resource, scope, and log
// record attributes, with the most specific level winning
// (record > scope > resource).
func TestConsumerConsumeLogsDataStream(t *testing.T) {
	for _, tc := range []struct {
		resourceDataStreamDataset   string
		resourceDataStreamNamespace string
		scopeDataStreamDataset      string
		scopeDataStreamNamespace    string
		recordDataStreamDataset     string
		recordDataStreamNamespace   string

		expectedDataStreamDataset   string
		expectedDataStreamNamespace string
	}{
		{
			// Record-level attributes override scope and resource.
			resourceDataStreamDataset:   "1",
			resourceDataStreamNamespace: "2",
			scopeDataStreamDataset:      "3",
			scopeDataStreamNamespace:    "4",
			recordDataStreamDataset:     "5",
			recordDataStreamNamespace:   "6",
			expectedDataStreamDataset:   "5",
			expectedDataStreamNamespace: "6",
		},
		{
			// Scope-level attributes override resource.
			resourceDataStreamDataset:   "1",
			resourceDataStreamNamespace: "2",
			scopeDataStreamDataset:      "3",
			scopeDataStreamNamespace:    "4",
			expectedDataStreamDataset:   "3",
			expectedDataStreamNamespace: "4",
		},
		{
			// Resource-level attributes apply when no more specific level sets them.
			resourceDataStreamDataset:   "1",
			resourceDataStreamNamespace: "2",
			expectedDataStreamDataset:   "1",
			expectedDataStreamNamespace: "2",
		},
	} {
		tcName := fmt.Sprintf("%s,%s", tc.expectedDataStreamDataset, tc.expectedDataStreamNamespace)
		t.Run(tcName, func(t *testing.T) {
			logs := plog.NewLogs()
			resourceLogs := logs.ResourceLogs().AppendEmpty()
			resourceAttrs := resourceLogs.Resource().Attributes()
			if tc.resourceDataStreamDataset != "" {
				resourceAttrs.PutStr("data_stream.dataset", tc.resourceDataStreamDataset)
			}
			if tc.resourceDataStreamNamespace != "" {
				resourceAttrs.PutStr("data_stream.namespace", tc.resourceDataStreamNamespace)
			}

			scopeLogs := resourceLogs.ScopeLogs().AppendEmpty()
			scopeAttrs := scopeLogs.Scope().Attributes()
			if tc.scopeDataStreamDataset != "" {
				scopeAttrs.PutStr("data_stream.dataset", tc.scopeDataStreamDataset)
			}
			if tc.scopeDataStreamNamespace != "" {
				scopeAttrs.PutStr("data_stream.namespace", tc.scopeDataStreamNamespace)
			}

			record := scopeLogs.LogRecords().AppendEmpty()
			newLogRecord("").CopyTo(record) // no log body
			recordAttrs := record.Attributes()
			if tc.recordDataStreamDataset != "" {
				recordAttrs.PutStr("data_stream.dataset", tc.recordDataStreamDataset)
			}
			if tc.recordDataStreamNamespace != "" {
				recordAttrs.PutStr("data_stream.namespace", tc.recordDataStreamNamespace)
			}

			var processed modelpb.Batch
			var processor modelpb.ProcessBatchFunc = func(_ context.Context, batch *modelpb.Batch) error {
				if processed != nil {
					panic("already processed batch")
				}
				processed = batch.Clone()
				// Zero the timestamp after checking it was set, so the
				// remaining assertions are deterministic.
				assert.NotZero(t, processed[0].Timestamp)
				processed[0].Timestamp = 0
				return nil
			}
			consumer := otlp.NewConsumer(otlp.ConsumerConfig{
				Processor: processor,
				Semaphore: semaphore.NewWeighted(100),
			})
			result, err := consumer.ConsumeLogsWithResult(context.Background(), logs)
			assert.NoError(t, err)
			assert.Equal(t, otlp.ConsumeLogsResult{}, result)

			assert.Len(t, processed, 1)
			assert.Equal(t, tc.expectedDataStreamDataset, processed[0].DataStream.Dataset)
			assert.Equal(t, tc.expectedDataStreamNamespace, processed[0].DataStream.Namespace)
		})
	}
}

func TestConsumerConsumeOTelLogsWithTimestamp(t *testing.T) {
logs := plog.NewLogs()
resourceLogs := logs.ResourceLogs().AppendEmpty()
Expand Down
31 changes: 31 additions & 0 deletions input/otlp/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,18 @@ func translateResourceMetadata(resource pcommon.Resource, out *modelpb.APMEvent)
case "telemetry.sdk.elastic_export_timestamp":
// Do nothing.

// data_stream.*
case attributeDataStreamDataset:
if out.DataStream == nil {
out.DataStream = modelpb.DataStreamFromVTPool()
}
out.DataStream.Dataset = v.Str()
case attributeDataStreamNamespace:
if out.DataStream == nil {
out.DataStream = modelpb.DataStreamFromVTPool()
}
out.DataStream.Namespace = v.Str()

default:
if out.Labels == nil {
out.Labels = make(modelpb.Labels)
Expand Down Expand Up @@ -400,6 +412,25 @@ func translateResourceMetadata(resource pcommon.Resource, out *modelpb.APMEvent)
}
}

// translateScopeMetadata copies data_stream.dataset and data_stream.namespace
// attributes from the instrumentation scope onto the event's data stream.
// All other scope attributes are ignored.
func translateScopeMetadata(scope pcommon.InstrumentationScope, out *modelpb.APMEvent) {
	scope.Attributes().Range(func(k string, v pcommon.Value) bool {
		if k != attributeDataStreamDataset && k != attributeDataStreamNamespace {
			// Not a data_stream.* attribute; keep iterating.
			return true
		}
		if out.DataStream == nil {
			out.DataStream = modelpb.DataStreamFromVTPool()
		}
		if k == attributeDataStreamDataset {
			out.DataStream.Dataset = v.Str()
		} else {
			out.DataStream.Namespace = v.Str()
		}
		return true
	})
}

func cleanServiceName(name string) string {
return serviceNameInvalidRegexp.ReplaceAllString(truncate(name), "_")
}
Expand Down
19 changes: 18 additions & 1 deletion input/otlp/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ func (c *Consumer) convertScopeMetrics(
}
for key, ms := range ms {
event := baseEvent.CloneVT()

translateScopeMetadata(in.Scope(), event)

event.Timestamp = modelpb.FromTime(key.timestamp.Add(timeDelta))
metrs := make([]*modelpb.MetricsetSample, 0, len(ms.samples))
for _, s := range ms.samples {
Expand All @@ -155,7 +158,21 @@ func (c *Consumer) convertScopeMetrics(
if ms.attributes.Len() > 0 {
initEventLabels(event)
ms.attributes.Range(func(k string, v pcommon.Value) bool {
setLabel(k, event, ifaceAttributeValue(v))
switch k {
// data_stream.*
case attributeDataStreamDataset:
if event.DataStream == nil {
event.DataStream = modelpb.DataStreamFromVTPool()
}
event.DataStream.Dataset = v.Str()
case attributeDataStreamNamespace:
if event.DataStream == nil {
event.DataStream = modelpb.DataStreamFromVTPool()
}
event.DataStream.Namespace = v.Str()
default:
setLabel(k, event, ifaceAttributeValue(v))
}
return true
})
if len(event.Labels) == 0 {
Expand Down
123 changes: 123 additions & 0 deletions input/otlp/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,129 @@ func TestConsumeMetricsExportTimestamp(t *testing.T) {
}
}

// TestConsumeMetricsDataStream verifies that data_stream.dataset and
// data_stream.namespace attributes are parsed from resource, scope, and data
// point attributes, with the most specific level winning
// (data point > scope > resource), and that data points with distinct
// data_stream.* attributes are grouped into separate events.
func TestConsumeMetricsDataStream(t *testing.T) {
	for _, tc := range []struct {
		resourceDataStreamDataset   string
		resourceDataStreamNamespace string
		scopeDataStreamDataset      string
		scopeDataStreamNamespace    string
		recordDataStreamDataset     string
		recordDataStreamNamespace   string

		expectedDataStreamDataset   string
		expectedDataStreamNamespace string
	}{
		{
			// Data-point-level attributes override scope and resource.
			resourceDataStreamDataset:   "1",
			resourceDataStreamNamespace: "2",
			scopeDataStreamDataset:      "3",
			scopeDataStreamNamespace:    "4",
			recordDataStreamDataset:     "5",
			recordDataStreamNamespace:   "6",
			expectedDataStreamDataset:   "5",
			expectedDataStreamNamespace: "6",
		},
		{
			// Scope-level attributes override resource.
			resourceDataStreamDataset:   "1",
			resourceDataStreamNamespace: "2",
			scopeDataStreamDataset:      "3",
			scopeDataStreamNamespace:    "4",
			expectedDataStreamDataset:   "3",
			expectedDataStreamNamespace: "4",
		},
		{
			// Resource-level attributes apply when no more specific level sets them.
			resourceDataStreamDataset:   "1",
			resourceDataStreamNamespace: "2",
			expectedDataStreamDataset:   "1",
			expectedDataStreamNamespace: "2",
		},
	} {
		tcName := fmt.Sprintf("%s,%s", tc.expectedDataStreamDataset, tc.expectedDataStreamNamespace)
		t.Run(tcName, func(t *testing.T) {
			metrics := pmetric.NewMetrics()
			resourceMetrics := metrics.ResourceMetrics().AppendEmpty()
			resourceAttrs := resourceMetrics.Resource().Attributes()
			if tc.resourceDataStreamDataset != "" {
				resourceAttrs.PutStr("data_stream.dataset", tc.resourceDataStreamDataset)
			}
			if tc.resourceDataStreamNamespace != "" {
				resourceAttrs.PutStr("data_stream.namespace", tc.resourceDataStreamNamespace)
			}

			scopeMetrics := resourceMetrics.ScopeMetrics().AppendEmpty()
			scopeAttrs := scopeMetrics.Scope().Attributes()
			if tc.scopeDataStreamDataset != "" {
				scopeAttrs.PutStr("data_stream.dataset", tc.scopeDataStreamDataset)
			}
			if tc.scopeDataStreamNamespace != "" {
				scopeAttrs.PutStr("data_stream.namespace", tc.scopeDataStreamNamespace)
			}

			metricSlice := scopeMetrics.Metrics()
			appendMetric := func(name string) pmetric.Metric {
				metric := metricSlice.AppendEmpty()
				metric.SetName(name)
				return metric
			}

			timestamp0 := time.Unix(123, 0).UTC()

			gauge := appendMetric("gauge_metric").SetEmptyGauge()
			gaugeDP0 := gauge.DataPoints().AppendEmpty()
			gaugeDP0.SetTimestamp(pcommon.NewTimestampFromTime(timestamp0))
			gaugeDP0.SetDoubleValue(5.6)
			if tc.recordDataStreamDataset != "" {
				gaugeDP0.Attributes().PutStr("data_stream.dataset", tc.recordDataStreamDataset)
			}
			if tc.recordDataStreamNamespace != "" {
				gaugeDP0.Attributes().PutStr("data_stream.namespace", tc.recordDataStreamNamespace)
			}
			// A second data point with its own data_stream.* attributes must
			// be routed to its own event, independent of the first.
			gaugeDP1 := gauge.DataPoints().AppendEmpty()
			gaugeDP1.SetTimestamp(pcommon.NewTimestampFromTime(timestamp0))
			gaugeDP1.SetDoubleValue(6)
			gaugeDP1.Attributes().PutStr("data_stream.dataset", "foo")
			gaugeDP1.Attributes().PutStr("data_stream.namespace", "bar")

			events, _, result, err := transformMetrics(t, metrics)
			assert.NoError(t, err)
			expectedResult := otlp.ConsumeMetricsResult{}
			assert.Equal(t, expectedResult, result)

			service := modelpb.Service{Name: "unknown", Language: &modelpb.Language{Name: "unknown"}}
			agent := modelpb.Agent{Name: "otlp", Version: "unknown"}
			dataStream := modelpb.DataStream{
				Dataset:   tc.expectedDataStreamDataset,
				Namespace: tc.expectedDataStreamNamespace,
			}
			expected := []*modelpb.APMEvent{{
				Agent:      &agent,
				DataStream: &dataStream,
				Service:    &service,
				Timestamp:  modelpb.FromTime(timestamp0),
				Metricset: &modelpb.Metricset{
					Name: "app",
					Samples: []*modelpb.MetricsetSample{
						{Name: "gauge_metric", Value: 5.6, Type: modelpb.MetricType_METRIC_TYPE_GAUGE},
					},
				},
			}, {
				Agent:      &agent,
				DataStream: &modelpb.DataStream{Dataset: "foo", Namespace: "bar"},
				Service:    &service,
				Timestamp:  modelpb.FromTime(timestamp0),
				Metricset: &modelpb.Metricset{
					Name: "app",
					Samples: []*modelpb.MetricsetSample{
						{Name: "gauge_metric", Value: 6, Type: modelpb.MetricType_METRIC_TYPE_GAUGE},
					},
				},
			}}

			eventsMatch(t, expected, events)
		})
	}
}

/* TODO
func TestMetricsLogging(t *testing.T) {
for _, level := range []logp.Level{logp.InfoLevel, logp.DebugLevel} {
Expand Down
Loading

0 comments on commit 3b28495

Please sign in to comment.