From 2a3fbd0c1aea4a41c7527674c49000d8b200d7be Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Fri, 8 Nov 2024 20:17:57 +0530 Subject: [PATCH] [connector/otlpjson]: Do not emit empty batches (#35827) #### Description The connector no longer emits empty batches for invalid OTLP payloads; it logs an error and skips the record instead. Approach discussed here https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/35738#issuecomment-2438627919 #### Link to tracking issue Fixes #35738 and #35739 #### Testing Manual Testing #### Documentation --------- Co-authored-by: Daniel Jaglowski --- .../otlpjson-connector-invalid-otlp.yaml | 27 +++++++++++ connector/otlpjsonconnector/connector_test.go | 46 +++++++++++++++++++ connector/otlpjsonconnector/factory.go | 5 ++ connector/otlpjsonconnector/logs.go | 29 ++++++++---- connector/otlpjsonconnector/metrics.go | 28 +++++++---- connector/otlpjsonconnector/traces.go | 27 +++++++---- 6 files changed, 138 insertions(+), 24 deletions(-) create mode 100644 .chloggen/otlpjson-connector-invalid-otlp.yaml diff --git a/.chloggen/otlpjson-connector-invalid-otlp.yaml b/.chloggen/otlpjson-connector-invalid-otlp.yaml new file mode 100644 index 000000000000..0e5628a3f03f --- /dev/null +++ b/.chloggen/otlpjson-connector-invalid-otlp.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: connector/otlpjson + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Log an error on invalid OTLP payloads instead of emitting empty batches. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. 
+issues: [35738, 35739] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/connector/otlpjsonconnector/connector_test.go b/connector/otlpjsonconnector/connector_test.go index deb5ae281ec0..cd433e75ccb3 100644 --- a/connector/otlpjsonconnector/connector_test.go +++ b/connector/otlpjsonconnector/connector_test.go @@ -178,3 +178,49 @@ func TestLogsToTraces(t *testing.T) { }) } } + +// This benchmark looks at how performance is affected when all three connectors are consuming logs (at the same time) +func BenchmarkConsumeLogs(b *testing.B) { + inputlogs := "input-log.yaml" + inputTraces := "input-trace.yaml" + inputMetrics := "input-metric.yaml" + + factory := NewFactory() + // initialize log -> log connector + logsink := &consumertest.LogsSink{} + logscon, _ := factory.CreateLogsToLogs(context.Background(), + connectortest.NewNopSettings(), createDefaultConfig(), logsink) + + require.NoError(b, logscon.Start(context.Background(), componenttest.NewNopHost())) + defer func() { + assert.NoError(b, logscon.Shutdown(context.Background())) + }() + + // initialize log -> traces connector + tracesink := &consumertest.TracesSink{} + traceconn, _ := factory.CreateLogsToTraces(context.Background(), + connectortest.NewNopSettings(), createDefaultConfig(), tracesink) + require.NoError(b, traceconn.Start(context.Background(), 
componenttest.NewNopHost())) + defer func() { + assert.NoError(b, traceconn.Shutdown(context.Background())) + }() + + // initialize log -> metric connector + metricsink := &consumertest.MetricsSink{} + metricconn, _ := factory.CreateLogsToMetrics(context.Background(), + connectortest.NewNopSettings(), createDefaultConfig(), metricsink) + require.NoError(b, metricconn.Start(context.Background(), componenttest.NewNopHost())) + defer func() { + assert.NoError(b, metricconn.Shutdown(context.Background())) + }() + + testLogs, _ := golden.ReadLogs(filepath.Join("testdata", "logsToLogs", inputlogs)) + testTraces, _ := golden.ReadLogs(filepath.Join("testdata", "logsToTraces", inputTraces)) + testMetrics, _ := golden.ReadLogs(filepath.Join("testdata", "logsToMetrics", inputMetrics)) + + for i := 0; i < b.N; i++ { + assert.NoError(b, logscon.ConsumeLogs(context.Background(), testLogs)) + assert.NoError(b, traceconn.ConsumeLogs(context.Background(), testTraces)) + assert.NoError(b, metricconn.ConsumeLogs(context.Background(), testMetrics)) + } +} diff --git a/connector/otlpjsonconnector/factory.go b/connector/otlpjsonconnector/factory.go index 1f4dca456bac..386b765b0e34 100644 --- a/connector/otlpjsonconnector/factory.go +++ b/connector/otlpjsonconnector/factory.go @@ -5,6 +5,7 @@ package otlpjsonconnector // import "github.com/open-telemetry/opentelemetry-col import ( "context" + "regexp" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/connector" @@ -13,6 +14,10 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/connector/otlpjsonconnector/internal/metadata" ) +var logRegex = regexp.MustCompile(`^\{\s*"resourceLogs"\s*:\s*\[`) +var metricRegex = regexp.MustCompile(`^\{\s*"resourceMetrics"\s*:\s*\[`) +var traceRegex = regexp.MustCompile(`^\{\s*"resourceSpans"\s*:\s*\[`) + // NewFactory returns a ConnectorFactory. 
func NewFactory() connector.Factory { return connector.NewFactory( diff --git a/connector/otlpjsonconnector/logs.go b/connector/otlpjsonconnector/logs.go index 621bc1c16a49..0cb524956855 100644 --- a/connector/otlpjsonconnector/logs.go +++ b/connector/otlpjsonconnector/logs.go @@ -50,16 +50,29 @@ func (c *connectorLogs) ConsumeLogs(ctx context.Context, pl plog.Logs) error { for k := 0; k < logRecord.LogRecords().Len(); k++ { lRecord := logRecord.LogRecords().At(k) token := lRecord.Body() - var l plog.Logs - l, err := logsUnmarshaler.UnmarshalLogs([]byte(token.AsString())) - if err != nil { - c.logger.Error("could not extract logs from otlp json", zap.Error(err)) + + // Check if the "resourceLogs" key exists in the JSON data + value := token.AsString() + switch { + case logRegex.MatchString(value): + var l plog.Logs + l, err := logsUnmarshaler.UnmarshalLogs([]byte(value)) + if err != nil { + c.logger.Error("could not extract logs from otlp json", zap.Error(err)) + continue + } + err = c.logsConsumer.ConsumeLogs(ctx, l) + if err != nil { + c.logger.Error("could not consume logs from otlp json", zap.Error(err)) + } + case metricRegex.MatchString(value), traceRegex.MatchString(value): + // If it's a metric or trace payload, simply continue continue + default: + // If no regex matches, log the invalid payload + c.logger.Error("Invalid otlp payload") } - err = c.logsConsumer.ConsumeLogs(ctx, l) - if err != nil { - c.logger.Error("could not consume logs from otlp json", zap.Error(err)) - } + } } } diff --git a/connector/otlpjsonconnector/metrics.go b/connector/otlpjsonconnector/metrics.go index 3954e214512c..fd8fa9ff6c23 100644 --- a/connector/otlpjsonconnector/metrics.go +++ b/connector/otlpjsonconnector/metrics.go @@ -51,16 +51,28 @@ func (c *connectorMetrics) ConsumeLogs(ctx context.Context, pl plog.Logs) error for k := 0; k < logRecord.LogRecords().Len(); k++ { lRecord := logRecord.LogRecords().At(k) token := lRecord.Body() - var m pmetric.Metrics - m, err := 
metricsUnmarshaler.UnmarshalMetrics([]byte(token.AsString())) - if err != nil { - c.logger.Error("could extract metrics from otlp json", zap.Error(err)) + + value := token.AsString() + switch { + case metricRegex.MatchString(value): + var m pmetric.Metrics + m, err := metricsUnmarshaler.UnmarshalMetrics([]byte(value)) + if err != nil { + c.logger.Error("could not extract metrics from otlp json", zap.Error(err)) + continue + } + err = c.metricsConsumer.ConsumeMetrics(ctx, m) + if err != nil { + c.logger.Error("could not consume metrics from otlp json", zap.Error(err)) + } + case logRegex.MatchString(value), traceRegex.MatchString(value): + // If it's a log or trace payload, simply continue continue + default: + // If no regex matches, log the invalid payload + c.logger.Error("Invalid otlp payload") } - err = c.metricsConsumer.ConsumeMetrics(ctx, m) - if err != nil { - c.logger.Error("could not consume metrics from otlp json", zap.Error(err)) - } + } } } diff --git a/connector/otlpjsonconnector/traces.go b/connector/otlpjsonconnector/traces.go index 6210095f0912..2b1e4b1f7bc9 100644 --- a/connector/otlpjsonconnector/traces.go +++ b/connector/otlpjsonconnector/traces.go @@ -51,15 +51,26 @@ func (c *connectorTraces) ConsumeLogs(ctx context.Context, pl plog.Logs) error { for k := 0; k < logRecord.LogRecords().Len(); k++ { lRecord := logRecord.LogRecords().At(k) token := lRecord.Body() - var t ptrace.Traces - t, err := tracesUnmarshaler.UnmarshalTraces([]byte(token.AsString())) - if err != nil { - c.logger.Error("could extract traces from otlp json", zap.Error(err)) + + value := token.AsString() + switch { + case traceRegex.MatchString(value): + var t ptrace.Traces + t, err := tracesUnmarshaler.UnmarshalTraces([]byte(value)) + if err != nil { + c.logger.Error("could not extract traces from otlp json", zap.Error(err)) + continue + } + err = c.tracesConsumer.ConsumeTraces(ctx, t) + if err != nil { + c.logger.Error("could not consume traces from otlp json", zap.Error(err)) 
+ } + case metricRegex.MatchString(value), logRegex.MatchString(value): + // If it's a metric or log payload, continue to the next iteration continue - } - err = c.tracesConsumer.ConsumeTraces(ctx, t) - if err != nil { - c.logger.Error("could not consume traces from otlp json", zap.Error(err)) + default: + // If no regex matches, log the invalid payload + c.logger.Error("Invalid otlp payload") } } }