From 0b4d39da6ae405436b7c33cfb51cec232709c17c Mon Sep 17 00:00:00 2001 From: Marc Sanmiquel Date: Tue, 15 Oct 2024 09:11:48 +0200 Subject: [PATCH 1/7] feat: add `pyroscope.receive_http` component for profile ingestion and forwarding --- CHANGELOG.md | 1 + .../sources/reference/compatibility/_index.md | 1 + .../pyroscope/pyroscope.receive_http.md | 119 ++++++++ internal/component/all/all.go | 1 + internal/component/pyroscope/appender.go | 26 +- .../pyroscope/receive_http/receive_http.go | 182 +++++++++++ .../receive_http/receive_http_test.go | 289 ++++++++++++++++++ .../pyroscope/scrape/delta_profiles.go | 5 + internal/component/pyroscope/write/write.go | 78 ++++- 9 files changed, 688 insertions(+), 14 deletions(-) create mode 100644 docs/sources/reference/components/pyroscope/pyroscope.receive_http.md create mode 100644 internal/component/pyroscope/receive_http/receive_http.go create mode 100644 internal/component/pyroscope/receive_http/receive_http_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index cb5aacf64b..a930075ea0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ Main (unreleased) ### Features - Add the function `path_join` to the stdlib. (@wildum) +- Add `pyroscope.receive_http` component to receive and forward Pyroscope profiles (@marcsanmi) - Add support to `loki.source.syslog` for the RFC3164 format ("BSD syslog"). (@sushain97) diff --git a/docs/sources/reference/compatibility/_index.md b/docs/sources/reference/compatibility/_index.md index 8a2d1e7f9b..4c22372daa 100644 --- a/docs/sources/reference/compatibility/_index.md +++ b/docs/sources/reference/compatibility/_index.md @@ -394,6 +394,7 @@ The following components, grouped by namespace, _consume_ Pyroscope `ProfilesRec {{< collapse title="pyroscope" >}} - [pyroscope.ebpf](../components/pyroscope/pyroscope.ebpf) - [pyroscope.java](../components/pyroscope/pyroscope.java) +- [pyroscope.receive_http](../components/pyroscope/pyroscope.receive_http) - [pyroscope.scrape](../components/pyroscope/pyroscope.scrape) {{< /collapse >}} diff --git a/docs/sources/reference/components/pyroscope/pyroscope.receive_http.md b/docs/sources/reference/components/pyroscope/pyroscope.receive_http.md new file mode 100644 index 0000000000..a1f73a1df7 --- /dev/null +++ b/docs/sources/reference/components/pyroscope/pyroscope.receive_http.md @@ -0,0 +1,119 @@ +--- +canonical: https://grafana.com/docs/alloy/latest/reference/components/pyroscope/pyroscope.receive_http/ +aliases: + - ../pyroscope.receive_http/ # /docs/alloy/latest/reference/components/pyroscope.receive_http/ +description: Learn about pyroscope.receive_http +title: pyroscope.receive_http +--- + +# pyroscope.receive_http + +`pyroscope.receive_http` listens for HTTP requests containing profiles and forwards them to other components capable of receiving profiles. + +The HTTP API exposed is compatible with Pyroscope's [HTTP ingest API](https://grafana.com/docs/pyroscope/latest/configure-server/about-server-api/). This allows `pyroscope.receive_http to act as a proxy for Pyroscope profiles, enabling flexible routing and distribution of profile data. + +## Usage + +```alloy +pyroscope.receive_http "LABEL" { + http { + listen_address = "LISTEN_ADDRESS" + listen_port = PORT + } + forward_to = RECEIVER_LIST +} +``` +The component will start an HTTP server supporting the following endpoint: + +`POST /ingest` - send profiles to the component, which in turn will be forwarded to the receivers as configured in the `forward_to argument`. The request format must match that of Pyroscope's ingest API. + +## Arguments + +The following arguments are supported: + +Name | Type | Description | Default | Required +------------------|---------------|-------------------------------------------------|---------|--------- +`forward_to` | `list(ProfilesReceiver)` | List of receivers to send profiles to. | | yes + +## Blocks + +The following blocks are supported inside the definition of `pyroscope.receive_http`: + +Hierarchy | Name | Description | Required +----------|------|----------------------------------------------------|--------- +`http` | `http` | Configures the HTTP server that receives requests. | no + +### http + +The `http` block configures the HTTP server. + +You can use the following arguments to configure the `http` block. Any omitted fields take their default values. + +Name | Type | Description | Default | Required +-----------------------|------------|------------------------------------------------------------------------------------------------------------------|----------|--------- +`conn_limit` | `int` | Maximum number of simultaneous HTTP connections. Defaults to no limit. | `0` | no +`listen_address` | `string` | Network address on which the server listens for new connections. Defaults to accepting all incoming connections. | `""` | no +`listen_port` | `int` | Port number on which the server listens for new connections. | `8080` | no +`server_idle_timeout` | `duration` | Idle timeout for HTTP server. | `"120s"` | no +`server_read_timeout` | `duration` | Read timeout for HTTP server. | `"30s"` | no +`server_write_timeout` | `duration` | Write timeout for HTTP server. | `"30s"` | no + +## Exported fields + +`pyroscope.receive_http` does not export any fields. + +## Component health + +`pyroscope.receive_http` is reported as unhealthy if it is given an invalid configuration. + +## Example +This example creates a `pyroscope.receive_http` component which starts an HTTP server listening on `0.0.0.0` and port `9999`. The server receives profiles and forwards them to multiple `pyroscope.write` components, which write these profiles to different HTTP endpoints. +```alloy +// Receives profiles over HTTP +pyroscope.receive_http "default" { + http { + listen_address = "0.0.0.0" + listen_port = 9999 + } + forward_to = [pyroscope.write.staging.receiver, pyroscope.write.production.receiver] +} + +// Send profiles to a staging Pyroscope instance +pyroscope.write "staging" { + endpoint { + url = "http://pyroscope-staging:4040" + headers = { + "X-Scope-OrgID" = "squad-1", + } + } +} + +// Send profiles to a production Pyroscope instance +pyroscope.write "production" { + endpoint { + url = "http://pyroscope-production:4040" + headers = { + "X-Scope-OrgID" = "squad-1", + } + } +} +``` + +Note: This example demonstrates forwarding to multiple `pyroscope.write` components. Be aware that this configuration will duplicate the received profiles, sending a copy to each configured `pyroscope.write` component. + +You can also create multiple `pyroscope.receive_http` components with different configurations to listen on different addresses or ports as needed. This flexibility allows you to design a setup that best fits your infrastructure and profile routing requirements. + + + +## Compatible components + +`pyroscope.receive_http` can accept arguments from the following components: + +- Components that export [Pyroscope `ProfilesReceiver`](../../../compatibility/#pyroscope-profilesreceiver-exporters) + +{{< admonition type="note" >}} +Connecting some components may not be sensible or components may require further configuration to make the connection work correctly. +Refer to the linked documentation for more details. +{{< /admonition >}} + + diff --git a/internal/component/all/all.go b/internal/component/all/all.go index 5778f4c8ce..c7cd6e9148 100644 --- a/internal/component/all/all.go +++ b/internal/component/all/all.go @@ -139,6 +139,7 @@ import ( _ "github.com/grafana/alloy/internal/component/prometheus/scrape" // Import prometheus.scrape _ "github.com/grafana/alloy/internal/component/pyroscope/ebpf" // Import pyroscope.ebpf _ "github.com/grafana/alloy/internal/component/pyroscope/java" // Import pyroscope.java + _ "github.com/grafana/alloy/internal/component/pyroscope/receive_http" // Import pyroscope.receive_http _ "github.com/grafana/alloy/internal/component/pyroscope/scrape" // Import pyroscope.scrape _ "github.com/grafana/alloy/internal/component/pyroscope/write" // Import pyroscope.write _ "github.com/grafana/alloy/internal/component/remote/http" // Import remote.http diff --git a/internal/component/pyroscope/appender.go b/internal/component/pyroscope/appender.go index 41c2cca164..7a2a23ebd2 100644 --- a/internal/component/pyroscope/appender.go +++ b/internal/component/pyroscope/appender.go @@ -2,6 +2,9 @@ package pyroscope import ( "context" + "io" + "net/http" + "net/url" "sync" "time" @@ -22,6 +25,7 @@ type Appendable interface { type Appender interface { Append(ctx context.Context, labels labels.Labels, samples []*RawSample) error + AppendIngest(ctx context.Context, profile *IncomingProfile) error } type RawSample struct { @@ -29,6 +33,12 @@ type RawSample struct { RawProfile []byte } +type IncomingProfile struct { + Body io.ReadCloser + Headers http.Header + URL *url.URL +} + var _ Appendable = (*Fanout)(nil) // Fanout supports the default Alloy style of appendables since it can go to multiple outputs. It also allows the intercepting of appends. @@ -118,6 +128,18 @@ func (f AppendableFunc) Append(ctx context.Context, labels labels.Labels, sample return f(ctx, labels, samples) } -func (f AppendableFunc) Appender() Appender { - return f +// AppendIngest satisfies the AppenderIngest interface. +func (a *appender) AppendIngest(ctx context.Context, profile *IncomingProfile) error { + now := time.Now() + defer func() { + a.writeLatency.Observe(time.Since(now).Seconds()) + }() + var multiErr error + for _, x := range a.children { + err := x.AppendIngest(ctx, profile) + if err != nil { + multiErr = multierror.Append(multiErr, err) + } + } + return multiErr } diff --git a/internal/component/pyroscope/receive_http/receive_http.go b/internal/component/pyroscope/receive_http/receive_http.go new file mode 100644 index 0000000000..5e28a5191a --- /dev/null +++ b/internal/component/pyroscope/receive_http/receive_http.go @@ -0,0 +1,182 @@ +package receive_http + +import ( + "context" + "errors" + "fmt" + "github.com/gorilla/mux" + "github.com/grafana/alloy/internal/component/pyroscope/write" + "golang.org/x/sync/errgroup" + "io" + "net/http" + "reflect" + "sync" + + "github.com/grafana/alloy/internal/component" + fnet "github.com/grafana/alloy/internal/component/common/net" + "github.com/grafana/alloy/internal/component/pyroscope" + "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/alloy/internal/runtime/logging/level" +) + +func init() { + component.Register(component.Registration{ + Name: "pyroscope.receive_http", + Stability: featuregate.StabilityGenerallyAvailable, + Args: Arguments{}, + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + return New(opts, args.(Arguments)) + }, + }) +} + +type Arguments struct { + Server *fnet.ServerConfig `alloy:",squash"` + ForwardTo []pyroscope.Appendable `alloy:"forward_to,attr"` +} + +// SetToDefault implements syntax.Defaulter. +func (a *Arguments) SetToDefault() { + *a = Arguments{ + Server: fnet.DefaultServerConfig(), + } +} + +type Component struct { + opts component.Options + server *fnet.TargetServer + appendables []pyroscope.Appendable + mut sync.Mutex +} + +func New(opts component.Options, args Arguments) (*Component, error) { + c := &Component{ + opts: opts, + appendables: args.ForwardTo, + } + + if err := c.Update(args); err != nil { + return nil, err + } + + return c, nil +} + +func (c *Component) Run(ctx context.Context) error { + defer func() { + c.mut.Lock() + defer c.mut.Unlock() + c.shutdownServer() + }() + + <-ctx.Done() + level.Info(c.opts.Logger).Log("msg", "terminating due to context done") + return nil +} + +func (c *Component) Update(args component.Arguments) error { + newArgs := args.(Arguments) + + c.mut.Lock() + defer c.mut.Unlock() + + c.appendables = newArgs.ForwardTo + + if newArgs.Server == nil { + newArgs.Server = fnet.DefaultServerConfig() + } + if newArgs.Server.HTTP == nil { + newArgs.Server.HTTP = &fnet.HTTPConfig{ + ListenPort: 0, + ListenAddress: "127.0.0.1", + } + } + + serverNeedsRestarting := c.server == nil || !reflect.DeepEqual(c.server, *newArgs.Server.HTTP) + if !serverNeedsRestarting { + return nil + } + + c.shutdownServer() + + srv, err := fnet.NewTargetServer(c.opts.Logger, "pyroscope_receive_http", c.opts.Registerer, newArgs.Server) + if err != nil { + return fmt.Errorf("failed to create server: %w", err) + } + c.server = srv + + return c.server.MountAndRun(func(router *mux.Router) { + router.HandleFunc("/ingest", c.handleIngest).Methods(http.MethodPost) + }) +} + +func (c *Component) handleIngest(w http.ResponseWriter, r *http.Request) { + c.mut.Lock() + appendables := c.appendables + c.mut.Unlock() + + // Create a pipe for each appendable + pipeWriters := make([]io.Writer, len(appendables)) + pipeReaders := make([]io.Reader, len(appendables)) + for i := range appendables { + pr, pw := io.Pipe() + pipeReaders[i] = pr + pipeWriters[i] = pw + } + mw := io.MultiWriter(pipeWriters...) + + // Create an errgroup with the timeout context + g, ctx := errgroup.WithContext(r.Context()) + + // Start copying the request body to all pipes + g.Go(func() error { + defer func() { + for _, pw := range pipeWriters { + pw.(io.WriteCloser).Close() + } + }() + _, err := io.Copy(mw, r.Body) + return err + }) + + // Process each appendable + for i, appendable := range appendables { + g.Go(func() error { + defer pipeReaders[i].(io.ReadCloser).Close() + + profile := &pyroscope.IncomingProfile{ + Body: io.NopCloser(pipeReaders[i]), + Headers: r.Header.Clone(), + URL: r.URL, + } + + err := appendable.Appender().AppendIngest(ctx, profile) + if err != nil { + level.Error(c.opts.Logger).Log("msg", "Failed to append profile", "appendable", i, "err", err) + return err + } + level.Debug(c.opts.Logger).Log("msg", "Profile appended successfully", "appendable", i) + return nil + }) + } + + err := g.Wait() + if err != nil { + var writeErr *write.PyroscopeWriteError + if errors.As(err, &writeErr) { + http.Error(w, http.StatusText(writeErr.StatusCode), writeErr.StatusCode) + } else { + level.Error(c.opts.Logger).Log("msg", "Failed to process request", "err", err) + http.Error(w, "Failed to process request", http.StatusInternalServerError) + } + return + } + w.WriteHeader(http.StatusOK) +} + +func (c *Component) shutdownServer() { + if c.server != nil { + c.server.StopAndShutdown() + c.server = nil + } +} diff --git a/internal/component/pyroscope/receive_http/receive_http_test.go b/internal/component/pyroscope/receive_http/receive_http_test.go new file mode 100644 index 0000000000..71929abce6 --- /dev/null +++ b/internal/component/pyroscope/receive_http/receive_http_test.go @@ -0,0 +1,289 @@ +package receive_http + +import ( + "bytes" + "context" + "crypto/rand" + "fmt" + "io" + "net/http" + "net/url" + "testing" + "time" + + "github.com/phayes/freeport" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" + + "github.com/grafana/alloy/internal/component" + fnet "github.com/grafana/alloy/internal/component/common/net" + "github.com/grafana/alloy/internal/component/pyroscope" + "github.com/grafana/alloy/internal/util" +) + +// TestForwardsProfiles verifies the behavior of the pyroscope.receive_http component +// under various scenarios. It tests different profile sizes, HTTP methods, paths, +// query parameters, and error conditions to ensure correct forwarding behavior +// and proper error handling. +func TestForwardsProfiles(t *testing.T) { + tests := []struct { + name string + profileSize int + method string + path string + queryParams string + headers map[string]string + appendableErrors []error + expectedStatus int + expectedForwards int + }{ + { + name: "Small profile", + profileSize: 1024, // 1KB + method: "POST", + path: "/ingest", + queryParams: "name=test_app_1&from=1234567890&until=1234567900", + headers: map[string]string{"Content-Type": "application/octet-stream"}, + appendableErrors: []error{nil, nil}, + expectedStatus: http.StatusOK, + expectedForwards: 2, + }, + { + name: "Large profile with custom headers", + profileSize: 1024 * 1024, // 1MB + method: "POST", + path: "/ingest", + queryParams: "name=test_app_2&from=1234567891&until=1234567901&custom=param1", + headers: map[string]string{"X-Scope-OrgID": "1234"}, + appendableErrors: []error{nil}, + expectedStatus: http.StatusOK, + expectedForwards: 1, + }, + { + name: "Invalid method", + profileSize: 1024, + method: "GET", + path: "/ingest", + queryParams: "name=test_app_3&from=1234567892&until=1234567902", + headers: map[string]string{}, + appendableErrors: []error{nil, nil}, + expectedStatus: http.StatusMethodNotAllowed, + expectedForwards: 0, + }, + { + name: "Invalid query params", + profileSize: 1024, + method: "GET", + path: "/ingest", + queryParams: "test=test_app", + headers: map[string]string{}, + appendableErrors: []error{nil, nil}, + expectedStatus: http.StatusMethodNotAllowed, + expectedForwards: 0, + }, + { + name: "Invalid path", + profileSize: 1024, + method: "POST", + path: "/invalid", + queryParams: "name=test_app_4&from=1234567893&until=1234567903", + headers: map[string]string{"Content-Type": "application/octet-stream"}, + appendableErrors: []error{nil, nil}, + expectedStatus: http.StatusNotFound, + expectedForwards: 0, + }, + { + name: "All appendables fail", + profileSize: 2048, + method: "POST", + path: "/ingest", + queryParams: "name=test_app_5&from=1234567894&until=1234567904&scenario=all_fail", + headers: map[string]string{"Content-Type": "application/octet-stream", "X-Test": "fail-all"}, + appendableErrors: []error{fmt.Errorf("error1"), fmt.Errorf("error2")}, + expectedStatus: http.StatusInternalServerError, + expectedForwards: 2, + }, + { + name: "One appendable fails, one succeeds", + profileSize: 4096, + method: "POST", + path: "/ingest", + queryParams: "name=test_app_6&from=1234567895&until=1234567905&scenario=partial_failure", + headers: map[string]string{"X-Custom-ID": "test-6"}, + appendableErrors: []error{fmt.Errorf("error"), nil}, + expectedStatus: http.StatusInternalServerError, + expectedForwards: 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + appendables := createTestAppendables(tt.appendableErrors) + port := startComponent(t, appendables) + + testProfile, resp := sendCustomRequest(t, port, tt.method, tt.path, tt.queryParams, tt.headers, tt.profileSize) + require.Equal(t, tt.expectedStatus, resp.StatusCode) + + forwardedCount := countForwardedProfiles(appendables) + require.Equal(t, tt.expectedForwards, forwardedCount, "Unexpected number of forwards") + + if tt.expectedForwards > 0 { + verifyForwardedProfiles(t, appendables, testProfile, tt.headers, tt.queryParams) + } + }) + } +} + +func createTestAppendables(errors []error) []pyroscope.Appendable { + var appendables []pyroscope.Appendable + for _, err := range errors { + appendables = append(appendables, testAppendable(err)) + } + return appendables +} + +func countForwardedProfiles(appendables []pyroscope.Appendable) int { + count := 0 + for _, app := range appendables { + if testApp, ok := app.(*testAppender); ok && testApp.lastProfile != nil { + count++ + } + } + return count +} + +func verifyForwardedProfiles(t *testing.T, appendables []pyroscope.Appendable, expectedProfile []byte, expectedHeaders map[string]string, expectedQueryParams string) { + for i, app := range appendables { + testApp, ok := app.(*testAppender) + require.True(t, ok, "Appendable is not a testAppender") + + if testApp.lastProfile != nil { + // Verify profile body + body, err := io.ReadAll(testApp.lastProfile.Body) + require.NoError(t, err, "Failed to read profile body for appendable %d", i) + require.Equal(t, expectedProfile, body, "Profile mismatch for appendable %d", i) + + // Verify headers + for key, value := range expectedHeaders { + require.Equal(t, value, testApp.lastProfile.Headers.Get(key), "Header mismatch for key %s in appendable %d", key, i) + } + + // Verify query parameters + expectedParams, err := url.ParseQuery(expectedQueryParams) + require.NoError(t, err, "Failed to parse expected query parameters") + actualParams := testApp.lastProfile.URL.Query() + for key, values := range expectedParams { + require.Equal(t, values, actualParams[key], "Query parameter mismatch for key %s in appendable %d", key, i) + } + } + } +} + +func startComponent(t *testing.T, appendables []pyroscope.Appendable) int { + port, err := freeport.GetFreePort() + require.NoError(t, err) + + args := Arguments{ + Server: &fnet.ServerConfig{ + HTTP: &fnet.HTTPConfig{ + ListenAddress: "localhost", + ListenPort: port, + }, + }, + ForwardTo: appendables, + } + + comp, err := New(testOptions(t), args) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + go func() { + require.NoError(t, comp.Run(ctx)) + }() + + waitForServerReady(t, port) + return port +} + +func sendCustomRequest(t *testing.T, port int, method, path, queryParams string, headers map[string]string, profileSize int) ([]byte, *http.Response) { + t.Helper() + testProfile := make([]byte, profileSize) + _, err := rand.Read(testProfile) + require.NoError(t, err) + + testURL := fmt.Sprintf("http://localhost:%d%s?%s", port, path, queryParams) + + req, err := http.NewRequest(method, testURL, bytes.NewReader(testProfile)) + require.NoError(t, err) + + for key, value := range headers { + req.Header.Set(key, value) + } + + client := &http.Client{ + Timeout: 5 * time.Second, + } + resp, err := client.Do(req) + require.NoError(t, err) + + return testProfile, resp +} + +func waitForServerReady(t *testing.T, port int) { + t.Helper() + require.Eventually(t, func() bool { + resp, err := http.Get(fmt.Sprintf("http://localhost:%d/", port)) + if err != nil { + return false + } + defer resp.Body.Close() + return resp.StatusCode == http.StatusNotFound + }, 2*time.Second, 100*time.Millisecond, "server did not start in time") +} + +func testAppendable(appendErr error) pyroscope.Appendable { + return &testAppender{appendErr: appendErr} +} + +type testAppender struct { + appendErr error + lastProfile *pyroscope.IncomingProfile +} + +func (a *testAppender) Appender() pyroscope.Appender { + return a +} + +func (a *testAppender) Append(_ context.Context, _ labels.Labels, _ []*pyroscope.RawSample) error { + return fmt.Errorf("Append method not implemented for test") +} + +func (a *testAppender) AppendIngest(_ context.Context, profile *pyroscope.IncomingProfile) error { + var buf bytes.Buffer + tee := io.TeeReader(profile.Body, &buf) + + newProfile := &pyroscope.IncomingProfile{ + Body: io.NopCloser(&buf), + Headers: profile.Headers, + URL: profile.URL, + } + a.lastProfile = newProfile + + _, err := io.Copy(io.Discard, tee) + if err != nil { + return err + } + + return a.appendErr +} + +func testOptions(t *testing.T) component.Options { + return component.Options{ + ID: "pyroscope.receive_http.test", + Logger: util.TestAlloyLogger(t), + Registerer: prometheus.NewRegistry(), + } +} diff --git a/internal/component/pyroscope/scrape/delta_profiles.go b/internal/component/pyroscope/scrape/delta_profiles.go index 926b7179cc..c2de9154f8 100644 --- a/internal/component/pyroscope/scrape/delta_profiles.go +++ b/internal/component/pyroscope/scrape/delta_profiles.go @@ -132,6 +132,11 @@ func (d *deltaAppender) Append(ctx context.Context, lbs labels.Labels, samples [ return nil } +func (d *deltaAppender) AppendIngest(_ context.Context, _ *pyroscope.IncomingProfile) error { + // No-op: AppendIngest is not used in deltaAppender + return nil +} + // computeDelta computes the delta between the given profile and the last // data is uncompressed if it is gzip compressed. // The returned data is always gzip compressed. diff --git a/internal/component/pyroscope/write/write.go b/internal/component/pyroscope/write/write.go index 99c53cbfd2..549bfc2ae2 100644 --- a/internal/component/pyroscope/write/write.go +++ b/internal/component/pyroscope/write/write.go @@ -3,6 +3,10 @@ package write import ( "context" "errors" + "fmt" + "net/http" + "net/url" + "path" "strings" "time" @@ -149,34 +153,40 @@ func (c *Component) Update(newConfig component.Arguments) error { type fanOutClient struct { // The list of push clients to fan out to. - clients []pushv1connect.PusherServiceClient - - config Arguments - opts component.Options - metrics *metrics + clients []pushv1connect.PusherServiceClient + httpClient *http.Client + config Arguments + opts component.Options + metrics *metrics } // NewFanOut creates a new fan out client that will fan out to all endpoints. func NewFanOut(opts component.Options, config Arguments, metrics *metrics) (*fanOutClient, error) { clients := make([]pushv1connect.PusherServiceClient, 0, len(config.Endpoints)) uid := alloyseed.Get().UID + + var httpClient *http.Client for _, endpoint := range config.Endpoints { if endpoint.Headers == nil { endpoint.Headers = map[string]string{} } endpoint.Headers[alloyseed.LegacyHeaderName] = uid endpoint.Headers[alloyseed.HeaderName] = uid - httpClient, err := commonconfig.NewClientFromConfig(*endpoint.HTTPClientConfig.Convert(), endpoint.Name) + client, err := commonconfig.NewClientFromConfig(*endpoint.HTTPClientConfig.Convert(), endpoint.Name) if err != nil { return nil, err } - clients = append(clients, pushv1connect.NewPusherServiceClient(httpClient, endpoint.URL, WithUserAgent(userAgent))) + clients = append(clients, pushv1connect.NewPusherServiceClient(client, endpoint.URL, WithUserAgent(userAgent))) + if httpClient == nil { + httpClient = client + } } return &fanOutClient{ - clients: clients, - config: config, - opts: opts, - metrics: metrics, + clients: clients, + httpClient: httpClient, + config: config, + opts: opts, + metrics: metrics, }, nil } @@ -271,7 +281,7 @@ func requestSize(req *connect.Request[pushv1.PushRequest]) (int64, int64) { return size, profiles } -// Append implements the pyroscope.Appendable interface. +// Appender implements the pyroscope.Appendable interface. func (f *fanOutClient) Appender() pyroscope.Appender { return f } @@ -318,6 +328,50 @@ func (f *fanOutClient) Append(ctx context.Context, lbs labels.Labels, samples [] return err } +type PyroscopeWriteError struct { + StatusCode int +} + +func (e *PyroscopeWriteError) Error() string { + return fmt.Sprintf("pyroscope write error: status %d", e.StatusCode) +} + +// AppendIngest implements the pyroscope.Appender interface. +func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.IncomingProfile) error { + for _, endpoint := range f.config.Endpoints { + u, err := url.Parse(endpoint.URL) + if err != nil { + return fmt.Errorf("parse endpoint URL: %w", err) + } + + u.Path = path.Join(u.Path, profile.URL.Path) + u.RawQuery = profile.URL.RawQuery + + req, err := http.NewRequestWithContext(ctx, "POST", u.String(), profile.Body) + if err != nil { + return fmt.Errorf("create request: %w", err) + } + + for k, v := range endpoint.Headers { + req.Header.Set(k, v) + } + for k, v := range profile.Headers { + req.Header[k] = v + } + + resp, err := f.httpClient.Do(req) + if err != nil { + return fmt.Errorf("do request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return &PyroscopeWriteError{StatusCode: resp.StatusCode} + } + } + return nil +} + // WithUserAgent returns a `connect.ClientOption` that sets the User-Agent header on. func WithUserAgent(agent string) connect.ClientOption { return connect.WithInterceptors(&agentInterceptor{agent}) From ffa4dbd8a129adb4d1654d5b86db8abe2826351f Mon Sep 17 00:00:00 2001 From: Marc Sanmiquel Date: Tue, 15 Oct 2024 11:46:45 +0200 Subject: [PATCH 2/7] fix: improve AppendIngest with TeeReader and update AppendableFunc interface --- .../pyroscope/pyroscope.receive_http.md | 7 +- internal/component/pyroscope/appender.go | 21 +++-- internal/component/pyroscope/write/write.go | 16 +++- .../component/pyroscope/write/write_test.go | 77 +++++++++++++++++++ 4 files changed, 107 insertions(+), 14 deletions(-) diff --git a/docs/sources/reference/components/pyroscope/pyroscope.receive_http.md b/docs/sources/reference/components/pyroscope/pyroscope.receive_http.md index a1f73a1df7..10fd658e06 100644 --- a/docs/sources/reference/components/pyroscope/pyroscope.receive_http.md +++ b/docs/sources/reference/components/pyroscope/pyroscope.receive_http.md @@ -82,9 +82,6 @@ pyroscope.receive_http "default" { pyroscope.write "staging" { endpoint { url = "http://pyroscope-staging:4040" - headers = { - "X-Scope-OrgID" = "squad-1", - } } } @@ -92,9 +89,6 @@ pyroscope.write "staging" { pyroscope.write "production" { endpoint { url = "http://pyroscope-production:4040" - headers = { - "X-Scope-OrgID" = "squad-1", - } } } ``` @@ -111,6 +105,7 @@ You can also create multiple `pyroscope.receive_http` components with different - Components that export [Pyroscope `ProfilesReceiver`](../../../compatibility/#pyroscope-profilesreceiver-exporters) + {{< admonition type="note" >}} Connecting some components may not be sensible or components may require further configuration to make the connection work correctly. Refer to the linked documentation for more details. diff --git a/internal/component/pyroscope/appender.go b/internal/component/pyroscope/appender.go index 7a2a23ebd2..a856e80b0b 100644 --- a/internal/component/pyroscope/appender.go +++ b/internal/component/pyroscope/appender.go @@ -122,12 +122,6 @@ func (a *appender) Append(ctx context.Context, labels labels.Labels, samples []* return multiErr } -type AppendableFunc func(ctx context.Context, labels labels.Labels, samples []*RawSample) error - -func (f AppendableFunc) Append(ctx context.Context, labels labels.Labels, samples []*RawSample) error { - return f(ctx, labels, samples) -} - // AppendIngest satisfies the AppenderIngest interface. func (a *appender) AppendIngest(ctx context.Context, profile *IncomingProfile) error { now := time.Now() @@ -143,3 +137,18 @@ func (a *appender) AppendIngest(ctx context.Context, profile *IncomingProfile) e } return multiErr } + +type AppendableFunc func(ctx context.Context, labels labels.Labels, samples []*RawSample) error + +func (f AppendableFunc) Appender() Appender { + return f +} + +func (f AppendableFunc) Append(ctx context.Context, labels labels.Labels, samples []*RawSample) error { + return f(ctx, labels, samples) +} + +func (f AppendableFunc) AppendIngest(_ context.Context, _ *IncomingProfile) error { + // This is a no-op implementation + return nil +} diff --git a/internal/component/pyroscope/write/write.go b/internal/component/pyroscope/write/write.go index 549bfc2ae2..45d228c180 100644 --- a/internal/component/pyroscope/write/write.go +++ b/internal/component/pyroscope/write/write.go @@ -1,9 +1,11 @@ package write import ( + "bytes" "context" "errors" "fmt" + "io" "net/http" "net/url" "path" @@ -338,7 +340,10 @@ func (e *PyroscopeWriteError) Error() string { // AppendIngest implements the pyroscope.Appender interface. func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.IncomingProfile) error { - for _, endpoint := range f.config.Endpoints { + var buf bytes.Buffer + tee := io.TeeReader(profile.Body, &buf) + + for i, endpoint := range f.config.Endpoints { u, err := url.Parse(endpoint.URL) if err != nil { return fmt.Errorf("parse endpoint URL: %w", err) @@ -347,7 +352,14 @@ func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.Inco u.Path = path.Join(u.Path, profile.URL.Path) u.RawQuery = profile.URL.RawQuery - req, err := http.NewRequestWithContext(ctx, "POST", u.String(), profile.Body) + var bodyReader io.Reader + if i == 0 { + bodyReader = tee + } else { + bodyReader = bytes.NewReader(buf.Bytes()) + } + + req, err := http.NewRequestWithContext(ctx, "POST", u.String(), bodyReader) if err != nil { return fmt.Errorf("create request: %w", err) } diff --git a/internal/component/pyroscope/write/write_test.go b/internal/component/pyroscope/write/write_test.go index 3439435e04..70e74651fa 100644 --- a/internal/component/pyroscope/write/write_test.go +++ b/internal/component/pyroscope/write/write_test.go @@ -1,10 +1,13 @@ package write import ( + "bytes" "context" "errors" + "io" "net/http" "net/http/httptest" + "net/url" "sync" "testing" "time" @@ -253,3 +256,77 @@ func TestBadAlloyConfig(t *testing.T) { err := syntax.Unmarshal([]byte(exampleAlloyConfig), &args) require.ErrorContains(t, err, "at most one of basic_auth, authorization, oauth2, bearer_token & bearer_token_file must be configured") } + +func Test_Write_AppendIngest(t *testing.T) { + var ( + export Exports + argument = DefaultArguments() + appendCount = atomic.NewInt32(0) + serverCount = int32(3) + servers = make([]*httptest.Server, serverCount) + endpoints = make([]*EndpointOptions, 0, serverCount) + ) + + testData := []byte("test-profile-data") + + handlerFn := func(expectedPath, expectedQuery string) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + appendCount.Inc() + require.Equal(t, expectedPath, r.URL.Path, "Unexpected path") + require.Equal(t, expectedQuery, r.URL.RawQuery, "Unexpected query") + require.Equal(t, "test-value", r.Header.Get("X-Test-Header"), "Unexpected header value") + body, err := io.ReadAll(r.Body) + require.NoError(t, err, "Failed to read request body") + require.Equal(t, testData, body, "Unexpected body content") + w.WriteHeader(http.StatusOK) + } + } + + for i := int32(0); i < serverCount; i++ { + servers[i] = httptest.NewServer(handlerFn("/ingest", "key=value")) + endpoints = append(endpoints, &EndpointOptions{ + URL: servers[i].URL, + RemoteTimeout: GetDefaultEndpointOptions().RemoteTimeout, + Headers: map[string]string{ + "X-Test-Header": "test-value", + }, + }) + } + defer func() { + for _, s := range servers { + s.Close() + } + }() + + argument.Endpoints = endpoints + + // Create the receiver + var wg sync.WaitGroup + wg.Add(1) + c, err := New(component.Options{ + ID: "test-write", + Logger: util.TestAlloyLogger(t), + Registerer: prometheus.NewRegistry(), + OnStateChange: func(e component.Exports) { + defer wg.Done() + export = e.(Exports) + }, + }, argument) + require.NoError(t, err, "Failed to create component") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go c.Run(ctx) + wg.Wait() // wait for the state change to happen + require.NotNil(t, export.Receiver, "Receiver is nil") + + incomingProfile := &pyroscope.IncomingProfile{ + Body: io.NopCloser(bytes.NewReader(testData)), + Headers: http.Header{"Content-Type": []string{"application/octet-stream"}}, + URL: &url.URL{Path: "/ingest", RawQuery: "key=value"}, + } + + err = export.Receiver.Appender().AppendIngest(context.Background(), incomingProfile) + require.NoError(t, err) + require.Equal(t, serverCount, appendCount.Load()) +} From 86c3b181d776895d20c3838c53fc8531be2dccf2 Mon Sep 17 00:00:00 2001 From: Marc Sanmiquel Date: Tue, 22 Oct 2024 11:56:21 +0200 Subject: [PATCH 3/7] Apply docs suggestions from code review Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> --- .../pyroscope/pyroscope.receive_http.md | 25 +++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/docs/sources/reference/components/pyroscope/pyroscope.receive_http.md b/docs/sources/reference/components/pyroscope/pyroscope.receive_http.md index 10fd658e06..06267706b6 100644 --- a/docs/sources/reference/components/pyroscope/pyroscope.receive_http.md +++ b/docs/sources/reference/components/pyroscope/pyroscope.receive_http.md @@ -1,7 +1,5 @@ --- canonical: https://grafana.com/docs/alloy/latest/reference/components/pyroscope/pyroscope.receive_http/ -aliases: - - ../pyroscope.receive_http/ # /docs/alloy/latest/reference/components/pyroscope.receive_http/ description: Learn about pyroscope.receive_http title: pyroscope.receive_http --- @@ -10,7 +8,8 @@ title: pyroscope.receive_http `pyroscope.receive_http` listens for HTTP requests containing profiles and forwards them to other components capable of receiving profiles. -The HTTP API exposed is compatible with Pyroscope's [HTTP ingest API](https://grafana.com/docs/pyroscope/latest/configure-server/about-server-api/). This allows `pyroscope.receive_http to act as a proxy for Pyroscope profiles, enabling flexible routing and distribution of profile data. +The HTTP API exposed is compatible with the Pyroscope [HTTP ingest API](https://grafana.com/docs/pyroscope/latest/configure-server/about-server-api/). +This allows `pyroscope.receive_http` to act as a proxy for Pyroscope profiles, enabling flexible routing and distribution of profile data. ## Usage @@ -23,9 +22,10 @@ pyroscope.receive_http "LABEL" { forward_to = RECEIVER_LIST } ``` -The component will start an HTTP server supporting the following endpoint: -`POST /ingest` - send profiles to the component, which in turn will be forwarded to the receivers as configured in the `forward_to argument`. The request format must match that of Pyroscope's ingest API. +The component will start an HTTP server supporting the following endpoint. + +* `POST /ingest` - send profiles to the component, which will be forwarded to the receivers as configured in the `forward_to argument`. The request format must match the format of the Pyroscope ingest API. ## Arguments @@ -54,9 +54,9 @@ Name | Type | Description `conn_limit` | `int` | Maximum number of simultaneous HTTP connections. Defaults to no limit. | `0` | no `listen_address` | `string` | Network address on which the server listens for new connections. Defaults to accepting all incoming connections. | `""` | no `listen_port` | `int` | Port number on which the server listens for new connections. | `8080` | no -`server_idle_timeout` | `duration` | Idle timeout for HTTP server. | `"120s"` | no -`server_read_timeout` | `duration` | Read timeout for HTTP server. | `"30s"` | no -`server_write_timeout` | `duration` | Write timeout for HTTP server. | `"30s"` | no +`server_idle_timeout` | `duration` | Idle timeout for the HTTP server. | `"120s"` | no +`server_read_timeout` | `duration` | Read timeout for the HTTP server. | `"30s"` | no +`server_write_timeout` | `duration` | Write timeout for the HTTP server. | `"30s"` | no ## Exported fields @@ -67,7 +67,9 @@ Name | Type | Description `pyroscope.receive_http` is reported as unhealthy if it is given an invalid configuration. ## Example -This example creates a `pyroscope.receive_http` component which starts an HTTP server listening on `0.0.0.0` and port `9999`. The server receives profiles and forwards them to multiple `pyroscope.write` components, which write these profiles to different HTTP endpoints. + +This example creates a `pyroscope.receive_http` component, which starts an HTTP server listening on `0.0.0.0` and port `9999`. +The server receives profiles and forwards them to multiple `pyroscope.write` components, which write these profiles to different HTTP endpoints. ```alloy // Receives profiles over HTTP pyroscope.receive_http "default" { @@ -93,7 +95,10 @@ pyroscope.write "production" { } ``` -Note: This example demonstrates forwarding to multiple `pyroscope.write` components. Be aware that this configuration will duplicate the received profiles, sending a copy to each configured `pyroscope.write` component. +{{< admonition type="note" >}} +This example demonstrates forwarding to multiple `pyroscope.write` components. +This configuration will duplicate the received profiles and send a copy to each configured `pyroscope.write` component. +{{< /admonition >}} You can also create multiple `pyroscope.receive_http` components with different configurations to listen on different addresses or ports as needed. This flexibility allows you to design a setup that best fits your infrastructure and profile routing requirements. From 64df29dca5d8f95b0f71fd38f258a45a5ab95beb Mon Sep 17 00:00:00 2001 From: Marc Sanmiquel Date: Wed, 23 Oct 2024 10:14:20 +0200 Subject: [PATCH 4/7] use defaults and set maxConnLimit --- .../pyroscope/pyroscope.receive_http.md | 2 +- .../pyroscope/receive_http/receive_http.go | 17 ++++++++++++++--- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/docs/sources/reference/components/pyroscope/pyroscope.receive_http.md b/docs/sources/reference/components/pyroscope/pyroscope.receive_http.md index 06267706b6..420065d406 100644 --- a/docs/sources/reference/components/pyroscope/pyroscope.receive_http.md +++ b/docs/sources/reference/components/pyroscope/pyroscope.receive_http.md @@ -6,7 +6,7 @@ title: pyroscope.receive_http # pyroscope.receive_http -`pyroscope.receive_http` listens for HTTP requests containing profiles and forwards them to other components capable of receiving profiles. +`pyroscope.receive_http` receives profiles over HTTP and forwards them to `pyroscope.*` components capable of receiving profiles. The HTTP API exposed is compatible with the Pyroscope [HTTP ingest API](https://grafana.com/docs/pyroscope/latest/configure-server/about-server-api/). This allows `pyroscope.receive_http` to act as a proxy for Pyroscope profiles, enabling flexible routing and distribution of profile data. diff --git a/internal/component/pyroscope/receive_http/receive_http.go b/internal/component/pyroscope/receive_http/receive_http.go index 5e28a5191a..5db58b137e 100644 --- a/internal/component/pyroscope/receive_http/receive_http.go +++ b/internal/component/pyroscope/receive_http/receive_http.go @@ -19,10 +19,15 @@ import ( "github.com/grafana/alloy/internal/runtime/logging/level" ) +const ( + // defaultMaxConnLimit defines the maximum number of simultaneous HTTP connections + defaultMaxConnLimit = 100 +) + func init() { component.Register(component.Registration{ Name: "pyroscope.receive_http", - Stability: featuregate.StabilityGenerallyAvailable, + Stability: featuregate.StabilityPublicPreview, Args: Arguments{}, Build: func(opts component.Options, args component.Arguments) (component.Component, error) { return New(opts, args.(Arguments)) @@ -37,8 +42,12 @@ type Arguments struct { // SetToDefault implements syntax.Defaulter. func (a *Arguments) SetToDefault() { + serverConfig := fnet.DefaultServerConfig() + if serverConfig.HTTP.ConnLimit > defaultMaxConnLimit { + serverConfig.HTTP.ConnLimit = defaultMaxConnLimit + } *a = Arguments{ - Server: fnet.DefaultServerConfig(), + Server: serverConfig, } } @@ -82,9 +91,11 @@ func (c *Component) Update(args component.Arguments) error { c.appendables = newArgs.ForwardTo + // if no server config provided, we'll use defaults if newArgs.Server == nil { - newArgs.Server = fnet.DefaultServerConfig() + newArgs.Server = &fnet.ServerConfig{} } + if newArgs.Server.HTTP == nil { newArgs.Server.HTTP = &fnet.HTTPConfig{ ListenPort: 0, From f536f236c87d91c2547c032f8a8d7fcfe9a65e54 Mon Sep 17 00:00:00 2001 From: Marc Sanmiquel Date: Mon, 28 Oct 2024 15:50:18 +0100 Subject: [PATCH 5/7] changed back to use adhoc defaults and set max conn limit to 100 --- .../components/pyroscope/pyroscope.receive_http.md | 2 +- .../component/pyroscope/receive_http/receive_http.go | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/docs/sources/reference/components/pyroscope/pyroscope.receive_http.md b/docs/sources/reference/components/pyroscope/pyroscope.receive_http.md index 420065d406..3b34f393a9 100644 --- a/docs/sources/reference/components/pyroscope/pyroscope.receive_http.md +++ b/docs/sources/reference/components/pyroscope/pyroscope.receive_http.md @@ -51,7 +51,7 @@ You can use the following arguments to configure the `http` block. Any omitted f Name | Type | Description | Default | Required -----------------------|------------|------------------------------------------------------------------------------------------------------------------|----------|--------- -`conn_limit` | `int` | Maximum number of simultaneous HTTP connections. Defaults to no limit. | `0` | no +`conn_limit` | `int` | Maximum number of simultaneous HTTP connections. Defaults to 100. | `0` | no `listen_address` | `string` | Network address on which the server listens for new connections. Defaults to accepting all incoming connections. | `""` | no `listen_port` | `int` | Port number on which the server listens for new connections. | `8080` | no `server_idle_timeout` | `duration` | Idle timeout for the HTTP server. | `"120s"` | no diff --git a/internal/component/pyroscope/receive_http/receive_http.go b/internal/component/pyroscope/receive_http/receive_http.go index 5db58b137e..b1b1e82186 100644 --- a/internal/component/pyroscope/receive_http/receive_http.go +++ b/internal/component/pyroscope/receive_http/receive_http.go @@ -43,7 +43,7 @@ type Arguments struct { // SetToDefault implements syntax.Defaulter. func (a *Arguments) SetToDefault() { serverConfig := fnet.DefaultServerConfig() - if serverConfig.HTTP.ConnLimit > defaultMaxConnLimit { + if serverConfig.HTTP.ConnLimit == 0 { serverConfig.HTTP.ConnLimit = defaultMaxConnLimit } *a = Arguments{ @@ -93,7 +93,12 @@ func (c *Component) Update(args component.Arguments) error { // if no server config provided, we'll use defaults if newArgs.Server == nil { - newArgs.Server = &fnet.ServerConfig{} + newArgs.Server = fnet.DefaultServerConfig() + } + + // Only apply default max connections limit if using default config + if newArgs.Server.HTTP.ConnLimit == 0 { + newArgs.Server.HTTP.ConnLimit = defaultMaxConnLimit } if newArgs.Server.HTTP == nil { From 6970721fade2dd1b0e04bf949357af7bbe5fb2dc Mon Sep 17 00:00:00 2001 From: Marc Sanmiquel Date: Tue, 29 Oct 2024 11:13:05 +0100 Subject: [PATCH 6/7] Avoid reading request body in memory for write component --- .../pyroscope/receive_http/receive_http.go | 7 +- internal/component/pyroscope/write/write.go | 98 +++++++++++-------- 2 files changed, 63 insertions(+), 42 deletions(-) diff --git a/internal/component/pyroscope/receive_http/receive_http.go b/internal/component/pyroscope/receive_http/receive_http.go index b1b1e82186..35a1240273 100644 --- a/internal/component/pyroscope/receive_http/receive_http.go +++ b/internal/component/pyroscope/receive_http/receive_http.go @@ -4,17 +4,18 @@ import ( "context" "errors" "fmt" - "github.com/gorilla/mux" - "github.com/grafana/alloy/internal/component/pyroscope/write" - "golang.org/x/sync/errgroup" "io" "net/http" "reflect" "sync" + "github.com/gorilla/mux" + "golang.org/x/sync/errgroup" + "github.com/grafana/alloy/internal/component" fnet "github.com/grafana/alloy/internal/component/common/net" "github.com/grafana/alloy/internal/component/pyroscope" + "github.com/grafana/alloy/internal/component/pyroscope/write" "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/runtime/logging/level" ) diff --git a/internal/component/pyroscope/write/write.go b/internal/component/pyroscope/write/write.go index 45d228c180..5a2ade8306 100644 --- a/internal/component/pyroscope/write/write.go +++ b/internal/component/pyroscope/write/write.go @@ -1,7 +1,6 @@ package write import ( - "bytes" "context" "errors" "fmt" @@ -13,19 +12,20 @@ import ( "time" "connectrpc.com/connect" - "github.com/grafana/alloy/internal/alloyseed" - "github.com/grafana/alloy/internal/component/pyroscope" - "github.com/grafana/alloy/internal/featuregate" - "github.com/grafana/alloy/internal/runtime/logging/level" - "github.com/grafana/alloy/internal/useragent" "github.com/oklog/run" commonconfig "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "go.uber.org/multierr" + "golang.org/x/sync/errgroup" + "github.com/grafana/alloy/internal/alloyseed" "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/component/common/config" + "github.com/grafana/alloy/internal/component/pyroscope" + "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/alloy/internal/runtime/logging/level" + "github.com/grafana/alloy/internal/useragent" "github.com/grafana/dskit/backoff" pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1" "github.com/grafana/pyroscope/api/gen/proto/go/push/v1/pushv1connect" @@ -340,48 +340,68 @@ func (e *PyroscopeWriteError) Error() string { // AppendIngest implements the pyroscope.Appender interface. func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.IncomingProfile) error { - var buf bytes.Buffer - tee := io.TeeReader(profile.Body, &buf) + pipeWriters := make([]io.Writer, len(f.config.Endpoints)) + pipeReaders := make([]io.Reader, len(f.config.Endpoints)) + for i := range f.config.Endpoints { + pr, pw := io.Pipe() + pipeReaders[i] = pr + pipeWriters[i] = pw + } + mw := io.MultiWriter(pipeWriters...) + + g, ctx := errgroup.WithContext(ctx) + // Start copying the profile body to all pipes + g.Go(func() error { + defer func() { + for _, pw := range pipeWriters { + pw.(io.WriteCloser).Close() + } + }() + _, err := io.Copy(mw, profile.Body) + return err + }) + + // Send to each endpoint concurrently for i, endpoint := range f.config.Endpoints { - u, err := url.Parse(endpoint.URL) - if err != nil { - return fmt.Errorf("parse endpoint URL: %w", err) - } + g.Go(func() error { + defer pipeReaders[i].(io.ReadCloser).Close() - u.Path = path.Join(u.Path, profile.URL.Path) - u.RawQuery = profile.URL.RawQuery + u, err := url.Parse(endpoint.URL) + if err != nil { + return fmt.Errorf("parse endpoint URL: %w", err) + } - var bodyReader io.Reader - if i == 0 { - bodyReader = tee - } else { - bodyReader = bytes.NewReader(buf.Bytes()) - } + u.Path = path.Join(u.Path, profile.URL.Path) + u.RawQuery = profile.URL.RawQuery - req, err := http.NewRequestWithContext(ctx, "POST", u.String(), bodyReader) - if err != nil { - return fmt.Errorf("create request: %w", err) - } + req, err := http.NewRequestWithContext(ctx, "POST", u.String(), pipeReaders[i]) + if err != nil { + return fmt.Errorf("create request: %w", err) + } - for k, v := range endpoint.Headers { - req.Header.Set(k, v) - } - for k, v := range profile.Headers { - req.Header[k] = v - } + // Headers are still set the same way + for k, v := range endpoint.Headers { + req.Header.Set(k, v) + } + for k, v := range profile.Headers { + req.Header[k] = v + } - resp, err := f.httpClient.Do(req) - if err != nil { - return fmt.Errorf("do request: %w", err) - } - defer resp.Body.Close() + resp, err := f.httpClient.Do(req) + if err != nil { + return fmt.Errorf("do request: %w", err) + } + defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - return &PyroscopeWriteError{StatusCode: resp.StatusCode} - } + if resp.StatusCode != http.StatusOK { + return &PyroscopeWriteError{StatusCode: resp.StatusCode} + } + return nil + }) } - return nil + + return g.Wait() } // WithUserAgent returns a `connect.ClientOption` that sets the User-Agent header on. From 590fb9cd084b0b0329aa74c6047342df76dcbdbd Mon Sep 17 00:00:00 2001 From: Marc Sanmiquel Date: Tue, 29 Oct 2024 13:13:17 +0100 Subject: [PATCH 7/7] override profile headers with endpoint ones --- internal/component/pyroscope/write/write.go | 10 ++++++---- internal/component/pyroscope/write/write_test.go | 14 +++++++++----- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/internal/component/pyroscope/write/write.go b/internal/component/pyroscope/write/write.go index 5a2ade8306..d268edb28c 100644 --- a/internal/component/pyroscope/write/write.go +++ b/internal/component/pyroscope/write/write.go @@ -380,14 +380,16 @@ func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.Inco return fmt.Errorf("create request: %w", err) } - // Headers are still set the same way - for k, v := range endpoint.Headers { - req.Header.Set(k, v) - } + // First set profile headers as defaults for k, v := range profile.Headers { req.Header[k] = v } + // Override any profile duplicated header + for k, v := range endpoint.Headers { + req.Header.Set(k, v) + } + resp, err := f.httpClient.Do(req) if err != nil { return fmt.Errorf("do request: %w", err) diff --git a/internal/component/pyroscope/write/write_test.go b/internal/component/pyroscope/write/write_test.go index 70e74651fa..74fa3d5612 100644 --- a/internal/component/pyroscope/write/write_test.go +++ b/internal/component/pyroscope/write/write_test.go @@ -274,7 +274,8 @@ func Test_Write_AppendIngest(t *testing.T) { appendCount.Inc() require.Equal(t, expectedPath, r.URL.Path, "Unexpected path") require.Equal(t, expectedQuery, r.URL.RawQuery, "Unexpected query") - require.Equal(t, "test-value", r.Header.Get("X-Test-Header"), "Unexpected header value") + require.Equal(t, "endpoint-value", r.Header.Get("X-Test-Header")) + require.Equal(t, []string{"profile-value1", "profile-value2"}, r.Header["X-Profile-Header"]) body, err := io.ReadAll(r.Body) require.NoError(t, err, "Failed to read request body") require.Equal(t, testData, body, "Unexpected body content") @@ -288,7 +289,7 @@ func Test_Write_AppendIngest(t *testing.T) { URL: servers[i].URL, RemoteTimeout: GetDefaultEndpointOptions().RemoteTimeout, Headers: map[string]string{ - "X-Test-Header": "test-value", + "X-Test-Header": "endpoint-value", }, }) } @@ -321,9 +322,12 @@ func Test_Write_AppendIngest(t *testing.T) { require.NotNil(t, export.Receiver, "Receiver is nil") incomingProfile := &pyroscope.IncomingProfile{ - Body: io.NopCloser(bytes.NewReader(testData)), - Headers: http.Header{"Content-Type": []string{"application/octet-stream"}}, - URL: &url.URL{Path: "/ingest", RawQuery: "key=value"}, + Body: io.NopCloser(bytes.NewReader(testData)), + Headers: http.Header{ + "X-Test-Header": []string{"profile-value"}, // This should be overridden by endpoint + "X-Profile-Header": []string{"profile-value1", "profile-value2"}, // This should be preserved + }, + URL: &url.URL{Path: "/ingest", RawQuery: "key=value"}, } err = export.Receiver.Appender().AppendIngest(context.Background(), incomingProfile)