From 82201b991ae5af501a25b13b38a4268b277e15b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9sar?= <56847527+LikeTheSalad@users.noreply.github.com> Date: Thu, 19 Oct 2023 14:03:08 +0200 Subject: [PATCH] Setting event timestamp from OTel observed timestamp when needed (#166) When receiving an OTel log, sometimes the timestamp might not be available as explained here. Currently, in those cases, the logs are recorded with the timestamp set to the unix epoch start time, which has already caused issues to our customers since they can't find their logs in Kibana within the right time range. These changes aim to check when the timestamp is not set so that the "observed timestamp" is used instead. --- input/otlp/logs.go | 6 ++- input/otlp/logs_test.go | 100 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 105 insertions(+), 1 deletion(-) diff --git a/input/otlp/logs.go b/input/otlp/logs.go index 9538a6ba..29de04ba 100644 --- a/input/otlp/logs.go +++ b/input/otlp/logs.go @@ -118,7 +118,11 @@ func (c *Consumer) convertLogRecord( ) *modelpb.APMEvent { event := baseEvent.CloneVT() initEventLabels(event) - event.Timestamp = modelpb.FromTime(record.Timestamp().AsTime().Add(timeDelta)) + if record.Timestamp() == 0 { + event.Timestamp = modelpb.FromTime(record.ObservedTimestamp().AsTime().Add(timeDelta)) + } else { + event.Timestamp = modelpb.FromTime(record.Timestamp().AsTime().Add(timeDelta)) + } if event.Event == nil { event.Event = modelpb.EventFromVTPool() } diff --git a/input/otlp/logs_test.go b/input/otlp/logs_test.go index f12fc6e5..43844e46 100644 --- a/input/otlp/logs_test.go +++ b/input/otlp/logs_test.go @@ -467,6 +467,106 @@ func TestConsumerConsumeOTelEventLogs(t *testing.T) { assert.Equal(t, "MyEvent", processed[0].Event.Action) } +func TestConsumerConsumeOTelLogsWithTimestamp(t *testing.T) { + logs := plog.NewLogs() + resourceLogs := logs.ResourceLogs().AppendEmpty() + resourceAttrs := logs.ResourceLogs().At(0).Resource().Attributes() + resourceAttrs.PutStr(semconv.AttributeTelemetrySDKLanguage, "java") + scopeLogs := resourceLogs.ScopeLogs().AppendEmpty() + timestamp := pcommon.NewTimestampFromTime(time.UnixMilli(946684800000)) + + record1 := newLogRecord("") // no log body + record1.SetTimestamp(timestamp) + + record1.CopyTo(scopeLogs.LogRecords().AppendEmpty()) + + var processed modelpb.Batch + var processor modelpb.ProcessBatchFunc = func(_ context.Context, batch *modelpb.Batch) error { + if processed != nil { + panic("already processes batch") + } + processed = batch.Clone() + return nil + } + consumer := otlp.NewConsumer(otlp.ConsumerConfig{ + Processor: processor, + Semaphore: semaphore.NewWeighted(100), + }) + result, err := consumer.ConsumeLogsWithResult(context.Background(), logs) + assert.NoError(t, err) + assert.Equal(t, otlp.ConsumeLogsResult{}, result) + + assert.Len(t, processed, 1) + assert.Equal(t, int(timestamp.AsTime().UnixNano()), int(processed[0].Timestamp)) +} + +func TestConsumerConsumeOTelLogsWithoutTimestamp(t *testing.T) { + logs := plog.NewLogs() + resourceLogs := logs.ResourceLogs().AppendEmpty() + resourceAttrs := logs.ResourceLogs().At(0).Resource().Attributes() + resourceAttrs.PutStr(semconv.AttributeTelemetrySDKLanguage, "java") + scopeLogs := resourceLogs.ScopeLogs().AppendEmpty() + timestamp := pcommon.NewTimestampFromTime(time.UnixMilli(0)) + + record1 := newLogRecord("") // no log body + record1.SetTimestamp(0) + + record1.CopyTo(scopeLogs.LogRecords().AppendEmpty()) + + var processed modelpb.Batch + var processor modelpb.ProcessBatchFunc = func(_ context.Context, batch *modelpb.Batch) error { + if processed != nil { + panic("already processes batch") + } + processed = batch.Clone() + return nil + } + consumer := otlp.NewConsumer(otlp.ConsumerConfig{ + Processor: processor, + Semaphore: semaphore.NewWeighted(100), + }) + result, err := consumer.ConsumeLogsWithResult(context.Background(), logs) + assert.NoError(t, err) + assert.Equal(t, otlp.ConsumeLogsResult{}, result) + + assert.Len(t, processed, 1) + assert.Equal(t, int(timestamp.AsTime().UnixNano()), int(processed[0].Timestamp)) +} + +func TestConsumerConsumeOTelLogsWithObservedTimestampWithoutTimestamp(t *testing.T) { + logs := plog.NewLogs() + resourceLogs := logs.ResourceLogs().AppendEmpty() + resourceAttrs := logs.ResourceLogs().At(0).Resource().Attributes() + resourceAttrs.PutStr(semconv.AttributeTelemetrySDKLanguage, "java") + scopeLogs := resourceLogs.ScopeLogs().AppendEmpty() + observedTimestamp := pcommon.NewTimestampFromTime(time.UnixMilli(946684800000)) + + record1 := newLogRecord("") // no log body + record1.SetTimestamp(0) + record1.SetObservedTimestamp(observedTimestamp) + + record1.CopyTo(scopeLogs.LogRecords().AppendEmpty()) + + var processed modelpb.Batch + var processor modelpb.ProcessBatchFunc = func(_ context.Context, batch *modelpb.Batch) error { + if processed != nil { + panic("already processes batch") + } + processed = batch.Clone() + return nil + } + consumer := otlp.NewConsumer(otlp.ConsumerConfig{ + Processor: processor, + Semaphore: semaphore.NewWeighted(100), + }) + result, err := consumer.ConsumeLogsWithResult(context.Background(), logs) + assert.NoError(t, err) + assert.Equal(t, otlp.ConsumeLogsResult{}, result) + + assert.Len(t, processed, 1) + assert.Equal(t, int(observedTimestamp.AsTime().UnixNano()), int(processed[0].Timestamp)) +} + func TestConsumerConsumeLogsLabels(t *testing.T) { logs := plog.NewLogs() resourceLogs := logs.ResourceLogs().AppendEmpty()