diff --git a/.chloggen/prwreceiver-parselabels.yaml b/.chloggen/prwreceiver-parselabels.yaml new file mode 100644 index 000000000000..3cd2cad7bb3a --- /dev/null +++ b/.chloggen/prwreceiver-parselabels.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: receiver/prometheusremotewrite + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Parse labels from Prometheus Remote Write requests into Resource and Scope Attributes + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35656] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: Warning - The HTTP Server still doesn't pass metrics to the next consumer. The component is unusable for now. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [api, user] \ No newline at end of file diff --git a/receiver/prometheusremotewritereceiver/go.mod b/receiver/prometheusremotewritereceiver/go.mod index 72f0aa1a4d76..581802d71e25 100644 --- a/receiver/prometheusremotewritereceiver/go.mod +++ b/receiver/prometheusremotewritereceiver/go.mod @@ -5,6 +5,7 @@ go 1.22.0 require ( github.com/gogo/protobuf v1.3.2 github.com/golang/snappy v0.0.4 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.112.0 github.com/prometheus/prometheus v0.54.1 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.112.0 @@ -54,6 +55,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.112.0 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect @@ -103,3 +105,9 @@ require ( k8s.io/klog/v2 v2.130.1 // indirect k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect ) + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden diff --git a/receiver/prometheusremotewritereceiver/receiver.go b/receiver/prometheusremotewritereceiver/receiver.go index 61195af2fa3c..99508ea64608 100644 --- a/receiver/prometheusremotewritereceiver/receiver.go +++ b/receiver/prometheusremotewritereceiver/receiver.go @@ -14,11 +14,13 @@ import ( "github.com/gogo/protobuf/proto" promconfig "github.com/prometheus/prometheus/config" + 
"github.com/prometheus/prometheus/model/labels" writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" promremote "github.com/prometheus/prometheus/storage/remote" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" "go.uber.org/zap/zapcore" @@ -26,9 +28,10 @@ import ( func newRemoteWriteReceiver(settings receiver.Settings, cfg *Config, nextConsumer consumer.Metrics) (receiver.Metrics, error) { return &prometheusRemoteWriteReceiver{ - settings: settings, - nextConsumer: nextConsumer, - config: cfg, + settings: settings, + nextConsumer: nextConsumer, + config: cfg, + jobInstanceCache: make(map[string]pmetric.ResourceMetrics), server: &http.Server{ ReadTimeout: 60 * time.Second, }, @@ -39,8 +42,9 @@ type prometheusRemoteWriteReceiver struct { settings receiver.Settings nextConsumer consumer.Metrics - config *Config - server *http.Server + jobInstanceCache map[string]pmetric.ResourceMetrics + config *Config + server *http.Server } func (prw *prometheusRemoteWriteReceiver) Start(ctx context.Context, host component.Host) error { @@ -150,8 +154,105 @@ func (prw *prometheusRemoteWriteReceiver) parseProto(contentType string) (promco } // translateV2 translates a v2 remote-write request into OTLP metrics. -// For now translateV2 is not implemented and returns an empty metrics. +// translate is not feature complete. 
// nolint -func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, _ *writev2.Request) (pmetric.Metrics, promremote.WriteResponseStats, error) { - return pmetric.NewMetrics(), promremote.WriteResponseStats{}, nil +func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *writev2.Request) (pmetric.Metrics, promremote.WriteResponseStats, error) { + var ( + badRequestErrors []error + otelMetrics = pmetric.NewMetrics() + b = labels.NewScratchBuilder(0) + stats = promremote.WriteResponseStats{} + ) + + for _, ts := range req.Timeseries { + ls := ts.ToLabels(&b, req.Symbols) + + if !ls.Has(labels.MetricName) { + badRequestErrors = append(badRequestErrors, fmt.Errorf("missing metric name in labels")) + continue + } else if duplicateLabel, hasDuplicate := ls.HasDuplicateLabelNames(); hasDuplicate { + badRequestErrors = append(badRequestErrors, fmt.Errorf("duplicate label %q in labels", duplicateLabel)) + continue + } + + var rm pmetric.ResourceMetrics + // This cache should be populated by the metric 'target_info', but we're not handling it yet. + cacheEntry, ok := prw.jobInstanceCache[ls.Get("instance")+ls.Get("job")] + if ok { + rm = pmetric.NewResourceMetrics() + cacheEntry.CopyTo(rm) + } else { + // A remote-write request can have multiple timeseries with the same instance and job labels. + // While they are different timeseries in Prometheus, we're handling it as the same OTLP metric + // until we support 'target_info'. 
+ rm = otelMetrics.ResourceMetrics().AppendEmpty() + parseJobAndInstance(rm.Resource().Attributes(), ls.Get("instance"), ls.Get("job")) + prw.jobInstanceCache[ls.Get("instance")+ls.Get("job")] = rm + } + + switch ts.Metadata.Type { + case writev2.Metadata_METRIC_TYPE_COUNTER: + addCounterDatapoints(rm, ls, ts) + case writev2.Metadata_METRIC_TYPE_GAUGE: + addGaugeDatapoints(rm, ls, ts) + case writev2.Metadata_METRIC_TYPE_SUMMARY: + addSummaryDatapoints(rm, ls, ts) + case writev2.Metadata_METRIC_TYPE_HISTOGRAM: + addHistogramDatapoints(rm, ls, ts) + default: + badRequestErrors = append(badRequestErrors, fmt.Errorf("unsupported metric type %q for metric %q", ts.Metadata.Type, ls.Get(labels.MetricName))) + } + } + + return otelMetrics, stats, errors.Join(badRequestErrors...) +} + +// parseJobAndInstance turns the job and instance labels into service resource attributes. +// Following the specification at https://opentelemetry.io/docs/specs/otel/compatibility/prometheus_and_openmetrics/ +func parseJobAndInstance(dest pcommon.Map, instance, job string) { + if job != "" { + dest.PutStr("service.namespace", job) + } + if instance != "" { + parts := strings.Split(instance, "/") + if len(parts) == 2 { + dest.PutStr("service.name", parts[0]) + dest.PutStr("service.instance.id", parts[1]) + return + } + dest.PutStr("service.name", instance) + } +} + +func addCounterDatapoints(_ pmetric.ResourceMetrics, _ labels.Labels, _ writev2.TimeSeries) { + // TODO: Implement this function +} + +func addGaugeDatapoints(rm pmetric.ResourceMetrics, ls labels.Labels, ts writev2.TimeSeries) { + m := rm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptyGauge() + addDatapoints(m.DataPoints(), ls, ts) +} + +func addSummaryDatapoints(_ pmetric.ResourceMetrics, _ labels.Labels, _ writev2.TimeSeries) { + // TODO: Implement this function +} + +func addHistogramDatapoints(_ pmetric.ResourceMetrics, _ labels.Labels, _ writev2.TimeSeries) { + // TODO: Implement this function +} + +// addDatapoints 
adds the labels to the datapoints attributes. +// TODO: We're still not handling several fields that make a datapoint complete, e.g. StartTimestamp, +// Timestamp, Value, etc. +func addDatapoints(datapoints pmetric.NumberDataPointSlice, ls labels.Labels, _ writev2.TimeSeries) { + attributes := datapoints.AppendEmpty().Attributes() + + for _, l := range ls { + if l.Name == "instance" || l.Name == "job" || // Become resource attributes "service.name", "service.instance.id" and "service.namespace" + l.Name == labels.MetricName || // Becomes metric name + l.Name == "otel_scope_name" || l.Name == "otel_scope_version" { // Becomes scope name and version + continue + } + attributes.PutStr(l.Name, l.Value) + } } diff --git a/receiver/prometheusremotewritereceiver/receiver_test.go b/receiver/prometheusremotewritereceiver/receiver_test.go index 8c5c9e659cfc..649d58e93fb2 100644 --- a/receiver/prometheusremotewritereceiver/receiver_test.go +++ b/receiver/prometheusremotewritereceiver/receiver_test.go @@ -14,13 +14,40 @@ import ( "github.com/golang/snappy" promconfig "github.com/prometheus/prometheus/config" writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" + "github.com/prometheus/prometheus/storage/remote" "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver/receivertest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" ) -func setupServer(t *testing.T) { +var ( + writeV2RequestFixture = &writev2.Request{ + Symbols: []string{"", "__name__", "test_metric1", "job", "test", "instance", "service-x/107cn001", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes"}, + Timeseries: []writev2.TimeSeries{ + { + Metadata: writev2.Metadata{Type: 
writev2.Metadata_METRIC_TYPE_GAUGE}, + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Symbolized writeRequestFixture.Timeseries[0].Labels + Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}, + }, + { + Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE}, + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Same series as first. Should use the same resource metrics. + Samples: []writev2.Sample{{Value: 2, Timestamp: 2}}, + }, + { + Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE}, + LabelsRefs: []uint32{1, 2, 3, 9, 5, 10, 7, 8, 9, 10}, // This series has different label values for job and instance. + Samples: []writev2.Sample{{Value: 2, Timestamp: 2}}, + }, + }, + } +) + +func setupMetricsReceiver(t *testing.T) *prometheusRemoteWriteReceiver { t.Helper() factory := NewFactory() @@ -30,6 +57,13 @@ func setupServer(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, prwReceiver, "metrics receiver creation failed") + return prwReceiver.(*prometheusRemoteWriteReceiver) +} + +func setupServer(t *testing.T) { + t.Helper() + + prwReceiver := setupMetricsReceiver(t) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) @@ -98,3 +132,82 @@ func TestHandlePRWContentTypeNegotiation(t *testing.T) { }) } } + +func TestTranslateV2(t *testing.T) { + prwReceiver := setupMetricsReceiver(t) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + for _, tc := range []struct { + name string + request *writev2.Request + expectError string + expectedMetrics pmetric.Metrics + expectedStats remote.WriteResponseStats + }{ + { + name: "missing metric name", + request: &writev2.Request{ + Symbols: []string{"", "foo", "bar"}, + Timeseries: []writev2.TimeSeries{ + { + LabelsRefs: []uint32{1, 2}, + Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}, + }, + }, + }, + expectError: "missing metric name in labels", + }, + { + name: "duplicate label", + request: &writev2.Request{ + Symbols: 
[]string{"", "__name__", "test"}, + Timeseries: []writev2.TimeSeries{ + { + LabelsRefs: []uint32{1, 2, 1, 2}, + Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}, + }, + }, + }, + expectError: `duplicate label "__name__" in labels`, + }, + { + name: "valid request", + request: writeV2RequestFixture, + expectedMetrics: func() pmetric.Metrics { + expected := pmetric.NewMetrics() + rm1 := expected.ResourceMetrics().AppendEmpty() + rmAttributes1 := rm1.Resource().Attributes() + rmAttributes1.PutStr("service.namespace", "test") + rmAttributes1.PutStr("service.name", "service-x") + rmAttributes1.PutStr("service.instance.id", "107cn001") + mAttributes1 := rm1.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty().Attributes() + mAttributes1.PutStr("d", "e") + mAttributes1.PutStr("foo", "bar") + + rm2 := expected.ResourceMetrics().AppendEmpty() + rmAttributes2 := rm2.Resource().Attributes() + rmAttributes2.PutStr("service.namespace", "foo") + rmAttributes2.PutStr("service.name", "bar") + mAttributes2 := rm2.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty().Attributes() + mAttributes2.PutStr("d", "e") + mAttributes2.PutStr("foo", "bar") + + return expected + }(), + expectedStats: remote.WriteResponseStats{}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + metrics, stats, err := prwReceiver.translateV2(ctx, tc.request) + if tc.expectError != "" { + assert.ErrorContains(t, err, tc.expectError) + return + } + + assert.NoError(t, err) + assert.NoError(t, pmetrictest.CompareMetrics(tc.expectedMetrics, metrics)) + assert.Equal(t, tc.expectedStats, stats) + }) + } +}