Skip to content

Commit

Permalink
add otlp event collector
Browse files Browse the repository at this point in the history
  • Loading branch information
endorama committed Jan 26, 2024
1 parent fc55cbd commit ff50195
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 0 deletions.
88 changes: 88 additions & 0 deletions internal/loadgen/eventhandler/otlp-collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package eventhandler

import (
"bytes"
"fmt"
"strconv"
"time"

"github.com/tidwall/gjson"
)

type OTLPEventCollector struct{}

// Filter skips processing RUM related events.
func (a *OTLPEventCollector) Filter(line []byte) error {
if bytes.HasPrefix(line, rumMetaHeader) {
return fmt.Errorf("rum data support not implemented")
}

return nil
}

// IsMeta identifies metadata lines from APM protocol.
func (a *OTLPEventCollector) IsMeta(line []byte) bool {
return bytes.HasPrefix(line, metaHeader)
}

// Process processes single lines extracting APM events.
// It uniforms events timestamp.
func (a *OTLPEventCollector) Process(linecopy []byte) event {
event := event{payload: linecopy}
result := gjson.ParseBytes(linecopy)

result.ForEach(func(key, value gjson.Result) bool {
event.objectType = key.Str // lines look like {"span":{...}}

switch event.objectType {
case "resourceLogs":
// compute minimum timestamp from all resource logs
value.Get("#.scopeLogs.#.logRecords").
ForEach(func(key, value gjson.Result) bool {
value.ForEach(func(key, value gjson.Result) bool {
value.ForEach(func(key, value gjson.Result) bool {
s, err := strconv.ParseInt(value.Get("timeUnixNano").String(), 10, 64)
if err != nil {
return true
}
t := time.Unix(0, s)
if event.timestamp.IsZero() || t.Before(event.timestamp) {
event.timestamp = t
}
return true
})
return true
})
return true
})
}

// timestampResult := value.Get("timestamp")
// if timestampResult.Exists() {
// switch timestampResult.Type {
// case gjson.Number:
// us := timestampResult.Int()
// if us >= 0 {
// s := us / 1000000
// ns := (us - (s * 1000000)) * 1000
// event.timestamp = time.Unix(s, ns)
// }
// case gjson.String:
// tstr := timestampResult.Str
// for _, f := range supportedTSFormats {
// if t, err := time.Parse(f, tstr); err == nil {
// event.timestamp = t
// break
// }
// }
// }
// }
return true
})

return event
}
29 changes: 29 additions & 0 deletions internal/loadgen/eventhandler/otlp-collector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package eventhandler

import (
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestOTLPEventCollector_Process(t *testing.T) {
o := OTLPEventCollector{}

minTimestamp := "1581452773000000789"
s, err := strconv.ParseInt(minTimestamp, 10, 64)
assert.NoError(t, err)
et := time.Unix(0, s)

line := `{"resourceLogs":[{"resource":{"attributes":[{"key":"resource-attr","value":{"stringValue":"resource-attr-val-1"}}]},"scopeLogs":[{"scope":{},"logRecords":[{"timeUnixNano":"1581452773000009875","severityNumber":9,"severityText":"Info","name":"logA","body":{"stringValue":"This is a log message"},"attributes":[{"key":"app","value":{"stringValue":"server"}},{"key":"instance_num","value":{"intValue":"1"}}],"droppedAttributesCount":1,"traceId":"08040201000000000000000000000000","spanId":"0102040800000000"},{"timeUnixNano":"1581452773000000789","severityNumber":9,"severityText":"Info","name":"logB","body":{"stringValue":"something happened"},"attributes":[{"key":"customer","value":{"stringValue":"acme"}},{"key":"env","value":{"stringValue":"dev"}}],"droppedAttributesCount":1,"traceId":"","spanId":""}]}]}]}`

event := o.Process([]byte(line))

assert.Equal(t, "resourceLogs", event.objectType)
assert.Equal(t, et, event.timestamp)
}

0 comments on commit ff50195

Please sign in to comment.