Skip to content

Commit

Permalink
Setting event timestamp from OTel observed timestamp when needed (#166)
Browse files Browse the repository at this point in the history
When receiving an OTel log, sometimes the timestamp might not be available as explained here. Currently, in those cases, the logs are recorded with the timestamp set to the unix epoch start time, which has already caused issues to our customers since they can't find their logs in Kibana within the right time range.

These changes aim to check when the timestamp is not set so that the "observed timestamp" is used instead.
  • Loading branch information
LikeTheSalad authored Oct 19, 2023
1 parent a0f7950 commit 82201b9
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 1 deletion.
6 changes: 5 additions & 1 deletion input/otlp/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,11 @@ func (c *Consumer) convertLogRecord(
) *modelpb.APMEvent {
event := baseEvent.CloneVT()
initEventLabels(event)
event.Timestamp = modelpb.FromTime(record.Timestamp().AsTime().Add(timeDelta))
if record.Timestamp() == 0 {
event.Timestamp = modelpb.FromTime(record.ObservedTimestamp().AsTime().Add(timeDelta))
} else {
event.Timestamp = modelpb.FromTime(record.Timestamp().AsTime().Add(timeDelta))
}
if event.Event == nil {
event.Event = modelpb.EventFromVTPool()
}
Expand Down
100 changes: 100 additions & 0 deletions input/otlp/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,106 @@ func TestConsumerConsumeOTelEventLogs(t *testing.T) {
assert.Equal(t, "MyEvent", processed[0].Event.Action)
}

func TestConsumerConsumeOTelLogsWithTimestamp(t *testing.T) {
logs := plog.NewLogs()
resourceLogs := logs.ResourceLogs().AppendEmpty()
resourceAttrs := logs.ResourceLogs().At(0).Resource().Attributes()
resourceAttrs.PutStr(semconv.AttributeTelemetrySDKLanguage, "java")
scopeLogs := resourceLogs.ScopeLogs().AppendEmpty()
timestamp := pcommon.NewTimestampFromTime(time.UnixMilli(946684800000))

record1 := newLogRecord("") // no log body
record1.SetTimestamp(timestamp)

record1.CopyTo(scopeLogs.LogRecords().AppendEmpty())

var processed modelpb.Batch
var processor modelpb.ProcessBatchFunc = func(_ context.Context, batch *modelpb.Batch) error {
if processed != nil {
panic("already processes batch")
}
processed = batch.Clone()
return nil
}
consumer := otlp.NewConsumer(otlp.ConsumerConfig{
Processor: processor,
Semaphore: semaphore.NewWeighted(100),
})
result, err := consumer.ConsumeLogsWithResult(context.Background(), logs)
assert.NoError(t, err)
assert.Equal(t, otlp.ConsumeLogsResult{}, result)

assert.Len(t, processed, 1)
assert.Equal(t, int(timestamp.AsTime().UnixNano()), int(processed[0].Timestamp))
}

func TestConsumerConsumeOTelLogsWithoutTimestamp(t *testing.T) {
logs := plog.NewLogs()
resourceLogs := logs.ResourceLogs().AppendEmpty()
resourceAttrs := logs.ResourceLogs().At(0).Resource().Attributes()
resourceAttrs.PutStr(semconv.AttributeTelemetrySDKLanguage, "java")
scopeLogs := resourceLogs.ScopeLogs().AppendEmpty()
timestamp := pcommon.NewTimestampFromTime(time.UnixMilli(0))

record1 := newLogRecord("") // no log body
record1.SetTimestamp(0)

record1.CopyTo(scopeLogs.LogRecords().AppendEmpty())

var processed modelpb.Batch
var processor modelpb.ProcessBatchFunc = func(_ context.Context, batch *modelpb.Batch) error {
if processed != nil {
panic("already processes batch")
}
processed = batch.Clone()
return nil
}
consumer := otlp.NewConsumer(otlp.ConsumerConfig{
Processor: processor,
Semaphore: semaphore.NewWeighted(100),
})
result, err := consumer.ConsumeLogsWithResult(context.Background(), logs)
assert.NoError(t, err)
assert.Equal(t, otlp.ConsumeLogsResult{}, result)

assert.Len(t, processed, 1)
assert.Equal(t, int(timestamp.AsTime().UnixNano()), int(processed[0].Timestamp))
}

func TestConsumerConsumeOTelLogsWithObservedTimestampWithoutTimestamp(t *testing.T) {
logs := plog.NewLogs()
resourceLogs := logs.ResourceLogs().AppendEmpty()
resourceAttrs := logs.ResourceLogs().At(0).Resource().Attributes()
resourceAttrs.PutStr(semconv.AttributeTelemetrySDKLanguage, "java")
scopeLogs := resourceLogs.ScopeLogs().AppendEmpty()
observedTimestamp := pcommon.NewTimestampFromTime(time.UnixMilli(946684800000))

record1 := newLogRecord("") // no log body
record1.SetTimestamp(0)
record1.SetObservedTimestamp(observedTimestamp)

record1.CopyTo(scopeLogs.LogRecords().AppendEmpty())

var processed modelpb.Batch
var processor modelpb.ProcessBatchFunc = func(_ context.Context, batch *modelpb.Batch) error {
if processed != nil {
panic("already processes batch")
}
processed = batch.Clone()
return nil
}
consumer := otlp.NewConsumer(otlp.ConsumerConfig{
Processor: processor,
Semaphore: semaphore.NewWeighted(100),
})
result, err := consumer.ConsumeLogsWithResult(context.Background(), logs)
assert.NoError(t, err)
assert.Equal(t, otlp.ConsumeLogsResult{}, result)

assert.Len(t, processed, 1)
assert.Equal(t, int(observedTimestamp.AsTime().UnixNano()), int(processed[0].Timestamp))
}

func TestConsumerConsumeLogsLabels(t *testing.T) {
logs := plog.NewLogs()
resourceLogs := logs.ResourceLogs().AppendEmpty()
Expand Down

0 comments on commit 82201b9

Please sign in to comment.