From 6b0a99366a0243e03f129a162ae46bf1b5710c65 Mon Sep 17 00:00:00 2001 From: Edoardo Tenani Date: Fri, 26 Jan 2024 20:04:36 +0100 Subject: [PATCH] add otlp event collector --- .../loadgen/eventhandler/otlp-collector.go | 88 +++++++++++++++++++ .../eventhandler/otlp-collector_test.go | 29 ++++++ 2 files changed, 117 insertions(+) create mode 100644 internal/loadgen/eventhandler/otlp-collector.go create mode 100644 internal/loadgen/eventhandler/otlp-collector_test.go diff --git a/internal/loadgen/eventhandler/otlp-collector.go b/internal/loadgen/eventhandler/otlp-collector.go new file mode 100644 index 0000000..9bf4dc8 --- /dev/null +++ b/internal/loadgen/eventhandler/otlp-collector.go @@ -0,0 +1,88 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package eventhandler + +import ( + "bytes" + "fmt" + "strconv" + "time" + + "github.com/tidwall/gjson" +) + +type OTLPEventCollector struct{} + +// Filter skips processing RUM related events. +func (a *OTLPEventCollector) Filter(line []byte) error { + if bytes.HasPrefix(line, rumMetaHeader) { + return fmt.Errorf("rum data support not implemented") + } + + return nil +} + +// IsMeta identifies metadata lines from APM protocol. +func (a *OTLPEventCollector) IsMeta(line []byte) bool { + return bytes.HasPrefix(line, metaHeader) +} + +// Process processes single lines extracting APM events. +// It uniforms events timestamp. +func (a *OTLPEventCollector) Process(linecopy []byte) event { + event := event{payload: linecopy} + result := gjson.ParseBytes(linecopy) + + result.ForEach(func(key, value gjson.Result) bool { + event.objectType = key.Str // lines look like {"span":{...}} + + switch event.objectType { + case "resourceLogs": + // compute minimum timestamp from all resource logs + value.Get("#.scopeLogs.#.logRecords"). + ForEach(func(key, value gjson.Result) bool { + value.ForEach(func(key, value gjson.Result) bool { + value.ForEach(func(key, value gjson.Result) bool { + s, err := strconv.ParseInt(value.Get("timeUnixNano").String(), 10, 64) + if err != nil { + return true + } + t := time.Unix(0, s) + if event.timestamp.IsZero() || t.Before(event.timestamp) { + event.timestamp = t + } + return true + }) + return true + }) + return true + }) + } + + // timestampResult := value.Get("timestamp") + // if timestampResult.Exists() { + // switch timestampResult.Type { + // case gjson.Number: + // us := timestampResult.Int() + // if us >= 0 { + // s := us / 1000000 + // ns := (us - (s * 1000000)) * 1000 + // event.timestamp = time.Unix(s, ns) + // } + // case gjson.String: + // tstr := timestampResult.Str + // for _, f := range supportedTSFormats { + // if t, err := time.Parse(f, tstr); err == nil { + // event.timestamp = t + // break + // } + // } + // } + // } + return true + }) + + return event +} diff --git a/internal/loadgen/eventhandler/otlp-collector_test.go b/internal/loadgen/eventhandler/otlp-collector_test.go new file mode 100644 index 0000000..9d4e756 --- /dev/null +++ b/internal/loadgen/eventhandler/otlp-collector_test.go @@ -0,0 +1,29 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package eventhandler + +import ( + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestOTLPEventCollector_Process(t *testing.T) { + o := OTLPEventCollector{} + + minTimestamp := "1581452773000000789" + s, err := strconv.ParseInt(minTimestamp, 10, 64) + assert.NoError(t, err) + et := time.Unix(0, s) + + line := `{"resourceLogs":[{"resource":{"attributes":[{"key":"resource-attr","value":{"stringValue":"resource-attr-val-1"}}]},"scopeLogs":[{"scope":{},"logRecords":[{"timeUnixNano":"1581452773000009875","severityNumber":9,"severityText":"Info","name":"logA","body":{"stringValue":"This is a log message"},"attributes":[{"key":"app","value":{"stringValue":"server"}},{"key":"instance_num","value":{"intValue":"1"}}],"droppedAttributesCount":1,"traceId":"08040201000000000000000000000000","spanId":"0102040800000000"},{"timeUnixNano":"1581452773000000789","severityNumber":9,"severityText":"Info","name":"logB","body":{"stringValue":"something happened"},"attributes":[{"key":"customer","value":{"stringValue":"acme"}},{"key":"env","value":{"stringValue":"dev"}}],"droppedAttributesCount":1,"traceId":"","spanId":""}]}]}]}` + + event := o.Process([]byte(line)) + + assert.Equal(t, "resourceLogs", event.objectType) + assert.Equal(t, et, event.timestamp) +}