diff --git a/.chloggen/prwreceiver-bodyunmarshal.yaml b/.chloggen/prwreceiver-bodyunmarshal.yaml new file mode 100644 index 000000000000..62718c89341e --- /dev/null +++ b/.chloggen/prwreceiver-bodyunmarshal.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: receiver/prometheusremotewrite + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Implement body unmarshaling for Prometheus Remote Write requests + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: Warning - The HTTP Server still doesn't do anything. It's just a placeholder for now. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api, user] diff --git a/receiver/prometheusremotewritereceiver/go.mod b/receiver/prometheusremotewritereceiver/go.mod index dfab976db1a0..4af47e71d777 100644 --- a/receiver/prometheusremotewritereceiver/go.mod +++ b/receiver/prometheusremotewritereceiver/go.mod @@ -3,6 +3,8 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/promet go 1.22.0 require ( + github.com/gogo/protobuf v1.3.2 + github.com/golang/snappy v0.0.4 github.com/prometheus/prometheus v0.54.1 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.111.0 @@ -10,6 +12,7 @@ require ( go.opentelemetry.io/collector/config/confighttp v0.111.0 go.opentelemetry.io/collector/consumer v0.111.0 go.opentelemetry.io/collector/consumer/consumertest v0.111.0 + go.opentelemetry.io/collector/pdata v1.17.0 go.opentelemetry.io/collector/receiver v0.111.0 go.uber.org/zap v1.27.0 ) @@ -24,15 +27,14 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/dennwc/varint v1.0.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/go-kit/log v0.2.1 // indirect github.com/go-logfmt/logfmt v0.6.0 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect - github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt/v5 v5.2.1 // indirect - github.com/golang/snappy v0.0.4 // indirect github.com/google/uuid v1.6.0 // indirect github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect @@ -63,16 +65,17 @@ require ( go.opentelemetry.io/collector/extension v0.111.0 // indirect go.opentelemetry.io/collector/extension/auth v0.111.0 // indirect go.opentelemetry.io/collector/internal/globalsignal v0.111.0 // indirect - go.opentelemetry.io/collector/pdata v1.17.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.111.0 // indirect go.opentelemetry.io/collector/pipeline v0.111.0 // indirect go.opentelemetry.io/collector/receiver/receiverprofiles v0.111.0 // indirect + go.opentelemetry.io/collector/semconv v0.105.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.55.0 // indirect go.opentelemetry.io/otel v1.30.0 // indirect go.opentelemetry.io/otel/metric v1.30.0 // indirect go.opentelemetry.io/otel/sdk v1.30.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.30.0 // indirect go.opentelemetry.io/otel/trace v1.30.0 // indirect + go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.27.0 // indirect golang.org/x/net v0.29.0 // indirect diff --git a/receiver/prometheusremotewritereceiver/go.sum b/receiver/prometheusremotewritereceiver/go.sum index bb5c6f980144..6d353623fd1d 100644 --- a/receiver/prometheusremotewritereceiver/go.sum +++ b/receiver/prometheusremotewritereceiver/go.sum @@ -61,6 +61,8 @@ github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+ github.com/aws/aws-sdk-go v1.38.35/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro= github.com/aws/aws-sdk-go v1.54.19 h1:tyWV+07jagrNiCcGRzRhdtVjQs7Vy41NwsuOcl0IbVI= github.com/aws/aws-sdk-go v1.54.19/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU= +github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 h1:6df1vn4bBlDDo4tARvBm7l6KA9iVMnE3NWizDeWSrps= +github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3/go.mod h1:CIWtjkly68+yqLPbvwwR/fjNJA/idrtULjZWh2v1ys0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -80,6 +82,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dennwc/varint v1.0.0 h1:kGNFFSSw8ToIy3obO/kKr8U9GZYUAxQEVuix4zfDWzE= +github.com/dennwc/varint v1.0.0/go.mod h1:hnItb35rvZvJrbTALZtY/iQfDs48JKRG1RPpgziApxA= github.com/digitalocean/godo v1.118.0 h1:lkzGFQmACrVCp7UqH1sAi4JK/PWwlc5aaxubgorKmC4= github.com/digitalocean/godo v1.118.0/go.mod h1:Vk0vpCot2HOAJwc5WE8wljZGtJ3ZtWIc8MQ8rF38sdo= github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK2OFGvA0= @@ -90,6 +94,8 @@ github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKoh github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= +github.com/edsrzf/mmap-go v1.1.0 h1:6EUwBLQ/Mcr1EYLE4Tn1VdW1A4ckqCQWZBw8Hr0kjpQ= +github.com/edsrzf/mmap-go v1.1.0/go.mod h1:19H/e8pUPLicwkyNgOykDXkJ9F0MHE+Z52B8EIth78Q= github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g= github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -100,6 +106,8 @@ github.com/envoyproxy/go-control-plane v0.13.0/go.mod h1:GRaKG3dwvFoTg4nj7aXdZnv github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v1.1.0 h1:tntQDh69XqOCOZsDz0lVJQez/2L6Uu2PdjCQwWCJ3bM= github.com/envoyproxy/protoc-gen-validate v1.1.0/go.mod h1:sXRDRVmzEbkM7CVcM06s9shE/m23dg3wzjl0UWqJ2q4= +github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb h1:IT4JYU7k4ikYg1SCxNI1/Tieq/NFvh6dzLdgi7eu0tM= +github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb/go.mod h1:bH6Xx7IW64qjjJq8M2u4dxNaBiDfKK+z/3eGDpXEQhc= github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= @@ -306,6 +314,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8m github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= +github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.0.2 h1:9yCKha/T5XdGtO0q9Q9a6T5NUCsTn/DrBg0D7ufOcFM= @@ -433,6 +443,8 @@ go.opentelemetry.io/collector/receiver v0.111.0 h1:6cRHZ9cUxYfRPkArUCkIhoo7Byf6t go.opentelemetry.io/collector/receiver v0.111.0/go.mod h1:QSl/n9ikDP+6n39QcRY/VLjwQI0qbT1RQp512uBQl3g= go.opentelemetry.io/collector/receiver/receiverprofiles v0.111.0 h1:oYLAdGMQQR7gB6wVkbV0G4EMsrmiOs3O0qf3hh/3avw= go.opentelemetry.io/collector/receiver/receiverprofiles v0.111.0/go.mod h1:M/OfdEGnvyB+fSTSW4RPKj5N06FXL8oKSIf60FlrKmM= +go.opentelemetry.io/collector/semconv v0.105.0 h1:8p6dZ3JfxFTjbY38d8xlQGB1TQ3nPUvs+D0RERniZ1g= +go.opentelemetry.io/collector/semconv v0.105.0/go.mod h1:yMVUCNoQPZVq/IPfrHrnntZTWsLf5YGZ7qwKulIl5hw= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.55.0 h1:ZIg3ZT/aQ7AfKqdwp7ECpOK6vHqquXXuyTjIO8ZdmPs= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.55.0/go.mod h1:DQAwmETtZV00skUwgD6+0U89g80NKsJE3DCKeLLPQMI= go.opentelemetry.io/otel v1.30.0 h1:F2t8sK4qf1fAmY9ua4ohFS/K+FUuOPemHUIXHtktrts= diff --git a/receiver/prometheusremotewritereceiver/receiver.go b/receiver/prometheusremotewritereceiver/receiver.go index 13d3f724f1bb..6bbdd267b994 100644 --- a/receiver/prometheusremotewritereceiver/receiver.go +++ b/receiver/prometheusremotewritereceiver/receiver.go @@ -7,14 +7,19 @@ import ( "context" "errors" "fmt" + "io" "net/http" "strings" "time" + "github.com/gogo/protobuf/proto" promConfig "github.com/prometheus/prometheus/config" + writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" + promRemote "github.com/prometheus/prometheus/storage/remote" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" "go.uber.org/zap/zapcore" ) @@ -89,6 +94,30 @@ func (prw *prometheusRemoteWriteReceiver) handlePRW(w http.ResponseWriter, req * // After parsing the content-type header, the next step would be to handle content-encoding. // Luckly confighttp's Server has middleware that already decompress the request body for us. + + body, err := io.ReadAll(req.Body) + if err != nil { + prw.settings.Logger.Warn("Error decoding remote write request", zapcore.Field{Key: "error", Type: zapcore.ErrorType, Interface: err}) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + var prw2Req writev2.Request + if err = proto.Unmarshal(body, &prw2Req); err != nil { + prw.settings.Logger.Warn("Error decoding remote write request", zapcore.Field{Key: "error", Type: zapcore.ErrorType, Interface: err}) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + _, stats, errHTTPCode, err := prw.translateV2(req.Context(), &prw2Req) + stats.SetHeaders(w) + if err != nil { + // TODO: what to do with 5xx errors? When would they happen? + http.Error(w, err.Error(), errHTTPCode) + return + } + + w.WriteHeader(http.StatusNoContent) } // parseProto parses the content-type header and returns the version of the remote-write protocol. @@ -120,3 +149,10 @@ func (prw *prometheusRemoteWriteReceiver) parseProto(contentType string) (promCo // No "proto=" parameter found, assume v1. return promConfig.RemoteWriteProtoMsgV1, nil } + +// translateV2 translates a v2 remote-write request into OTLP metrics. +// For now translateV2 is not implemented and returns an empty metrics. +// nolint +func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, _ *writev2.Request) (pmetric.Metrics, promRemote.WriteResponseStats, int, error) { + return pmetric.NewMetrics(), promRemote.WriteResponseStats{}, 0, nil +} diff --git a/receiver/prometheusremotewritereceiver/receiver_test.go b/receiver/prometheusremotewritereceiver/receiver_test.go index 63e89b3c835d..f31bf5315c4d 100644 --- a/receiver/prometheusremotewritereceiver/receiver_test.go +++ b/receiver/prometheusremotewritereceiver/receiver_test.go @@ -4,19 +4,25 @@ package prometheusremotewritereceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusremotewritereceiver" import ( + "bytes" "context" "fmt" "net/http" "testing" + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" promConfig "github.com/prometheus/prometheus/config" + writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/receiver/receivertest" ) -func TestHandlePRWContentTypeNegotiation(t *testing.T) { +func setupServer(t *testing.T) { + t.Helper() + factory := NewFactory() cfg := factory.CreateDefaultConfig() @@ -31,6 +37,10 @@ func TestHandlePRWContentTypeNegotiation(t *testing.T) { t.Cleanup(func() { assert.NoError(t, prwReceiver.Shutdown(ctx), "Must not error shutting down") }) +} + +func TestHandlePRWContentTypeNegotiation(t *testing.T) { + setupServer(t) for _, tc := range []struct { name string @@ -60,18 +70,31 @@ func TestHandlePRWContentTypeNegotiation(t *testing.T) { { name: "x-protobuf/v2 proto parameter", contentType: fmt.Sprintf("application/x-protobuf;proto=%s", promConfig.RemoteWriteProtoMsgV2), - extectedCode: http.StatusOK, + extectedCode: http.StatusNoContent, }, } { t.Run(tc.name, func(t *testing.T) { - req, err := http.NewRequest(http.MethodPost, "http://localhost:9090/api/v1/write", nil) + body := writev2.Request{} + pBuf := proto.NewBuffer(nil) + err := pBuf.Marshal(&body) + assert.NoError(t, err) + + var compressedBody []byte + snappy.Encode(compressedBody, pBuf.Bytes()) + req, err := http.NewRequest(http.MethodPost, "http://localhost:9090/api/v1/write", bytes.NewBuffer(compressedBody)) assert.NoError(t, err) req.Header.Set("Content-Type", tc.contentType) + req.Header.Set("Content-Encoding", "snappy") resp, err := http.DefaultClient.Do(req) assert.NoError(t, err) assert.Equal(t, tc.extectedCode, resp.StatusCode) + if tc.extectedCode == http.StatusNoContent { // We went until the end + assert.NotEmpty(t, resp.Header.Get("X-Prometheus-Remote-Write-Samples-Written")) + assert.NotEmpty(t, resp.Header.Get("X-Prometheus-Remote-Write-Histograms-Written")) + assert.NotEmpty(t, resp.Header.Get("X-Prometheus-Remote-Write-Exemplars-Written")) + } }) } }