Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add pyroscope.receive_http component for profile handling #1886

Merged
merged 7 commits into from
Oct 29, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Main (unreleased)
### Features

- Add the function `path_join` to the stdlib. (@wildum)
- Add `pyroscope.receive_http` component to receive and forward Pyroscope profiles (@marcsanmi)

- Add support to `loki.source.syslog` for the RFC3164 format ("BSD syslog"). (@sushain97)

Expand Down
1 change: 1 addition & 0 deletions docs/sources/reference/compatibility/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ The following components, grouped by namespace, _consume_ Pyroscope `ProfilesRec
{{< collapse title="pyroscope" >}}
- [pyroscope.ebpf](../components/pyroscope/pyroscope.ebpf)
- [pyroscope.java](../components/pyroscope/pyroscope.java)
- [pyroscope.receive_http](../components/pyroscope/pyroscope.receive_http)
- [pyroscope.scrape](../components/pyroscope/pyroscope.scrape)
{{< /collapse >}}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
---
canonical: https://grafana.com/docs/alloy/latest/reference/components/pyroscope/pyroscope.receive_http/
aliases:
- ../pyroscope.receive_http/ # /docs/alloy/latest/reference/components/pyroscope.receive_http/
marcsanmi marked this conversation as resolved.
Show resolved Hide resolved
description: Learn about pyroscope.receive_http
title: pyroscope.receive_http
---

# pyroscope.receive_http

`pyroscope.receive_http` listens for HTTP requests containing profiles and forwards them to other components capable of receiving profiles.

The HTTP API exposed is compatible with Pyroscope's [HTTP ingest API](https://grafana.com/docs/pyroscope/latest/configure-server/about-server-api/). This allows `pyroscope.receive_http to act as a proxy for Pyroscope profiles, enabling flexible routing and distribution of profile data.
marcsanmi marked this conversation as resolved.
Show resolved Hide resolved

## Usage

```alloy
pyroscope.receive_http "LABEL" {
http {
listen_address = "LISTEN_ADDRESS"
listen_port = PORT
}
forward_to = RECEIVER_LIST
}
```
The component will start an HTTP server supporting the following endpoint:

`POST /ingest` - send profiles to the component, which in turn will be forwarded to the receivers as configured in the `forward_to argument`. The request format must match that of Pyroscope's ingest API.
marcsanmi marked this conversation as resolved.
Show resolved Hide resolved

## Arguments

The following arguments are supported:

Name | Type | Description | Default | Required
------------------|---------------|-------------------------------------------------|---------|---------
`forward_to` | `list(ProfilesReceiver)` | List of receivers to send profiles to. | | yes

## Blocks

The following blocks are supported inside the definition of `pyroscope.receive_http`:

Hierarchy | Name | Description | Required
----------|------|----------------------------------------------------|---------
`http` | `http` | Configures the HTTP server that receives requests. | no

### http

The `http` block configures the HTTP server.

You can use the following arguments to configure the `http` block. Any omitted fields take their default values.

Name | Type | Description | Default | Required
-----------------------|------------|------------------------------------------------------------------------------------------------------------------|----------|---------
`conn_limit` | `int` | Maximum number of simultaneous HTTP connections. Defaults to no limit. | `0` | no
marcsanmi marked this conversation as resolved.
Show resolved Hide resolved
`listen_address` | `string` | Network address on which the server listens for new connections. Defaults to accepting all incoming connections. | `""` | no
mattdurham marked this conversation as resolved.
Show resolved Hide resolved
`listen_port` | `int` | Port number on which the server listens for new connections. | `8080` | no
`server_idle_timeout` | `duration` | Idle timeout for HTTP server. | `"120s"` | no
`server_read_timeout` | `duration` | Read timeout for HTTP server. | `"30s"` | no
`server_write_timeout` | `duration` | Write timeout for HTTP server. | `"30s"` | no
marcsanmi marked this conversation as resolved.
Show resolved Hide resolved

## Exported fields

`pyroscope.receive_http` does not export any fields.

## Component health

`pyroscope.receive_http` is reported as unhealthy if it is given an invalid configuration.

## Example
This example creates a `pyroscope.receive_http` component which starts an HTTP server listening on `0.0.0.0` and port `9999`. The server receives profiles and forwards them to multiple `pyroscope.write` components, which write these profiles to different HTTP endpoints.
marcsanmi marked this conversation as resolved.
Show resolved Hide resolved
```alloy
// Receives profiles over HTTP
pyroscope.receive_http "default" {
http {
listen_address = "0.0.0.0"
listen_port = 9999
}
forward_to = [pyroscope.write.staging.receiver, pyroscope.write.production.receiver]
}

// Send profiles to a staging Pyroscope instance
pyroscope.write "staging" {
endpoint {
url = "http://pyroscope-staging:4040"
}
}

// Send profiles to a production Pyroscope instance
pyroscope.write "production" {
endpoint {
url = "http://pyroscope-production:4040"
}
}
```

Note: This example demonstrates forwarding to multiple `pyroscope.write` components. Be aware that this configuration will duplicate the received profiles, sending a copy to each configured `pyroscope.write` component.
marcsanmi marked this conversation as resolved.
Show resolved Hide resolved

You can also create multiple `pyroscope.receive_http` components with different configurations to listen on different addresses or ports as needed. This flexibility allows you to design a setup that best fits your infrastructure and profile routing requirements.

<!-- START GENERATED COMPATIBLE COMPONENTS -->

## Compatible components

`pyroscope.receive_http` can accept arguments from the following components:

- Components that export [Pyroscope `ProfilesReceiver`](../../../compatibility/#pyroscope-profilesreceiver-exporters)


{{< admonition type="note" >}}
Connecting some components may not be sensible or components may require further configuration to make the connection work correctly.
Refer to the linked documentation for more details.
{{< /admonition >}}

<!-- END GENERATED COMPATIBLE COMPONENTS -->
1 change: 1 addition & 0 deletions internal/component/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ import (
_ "github.com/grafana/alloy/internal/component/prometheus/scrape" // Import prometheus.scrape
_ "github.com/grafana/alloy/internal/component/pyroscope/ebpf" // Import pyroscope.ebpf
_ "github.com/grafana/alloy/internal/component/pyroscope/java" // Import pyroscope.java
_ "github.com/grafana/alloy/internal/component/pyroscope/receive_http" // Import pyroscope.receive_http
_ "github.com/grafana/alloy/internal/component/pyroscope/scrape" // Import pyroscope.scrape
_ "github.com/grafana/alloy/internal/component/pyroscope/write" // Import pyroscope.write
_ "github.com/grafana/alloy/internal/component/remote/http" // Import remote.http
Expand Down
35 changes: 33 additions & 2 deletions internal/component/pyroscope/appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package pyroscope

import (
"context"
"io"
"net/http"
"net/url"
"sync"
"time"

Expand All @@ -22,13 +25,20 @@ type Appendable interface {

type Appender interface {
Append(ctx context.Context, labels labels.Labels, samples []*RawSample) error
AppendIngest(ctx context.Context, profile *IncomingProfile) error
}

type RawSample struct {
// raw_profile is the set of bytes of the pprof profile
RawProfile []byte
}

type IncomingProfile struct {
Body io.ReadCloser
Headers http.Header
URL *url.URL
}

var _ Appendable = (*Fanout)(nil)

// Fanout supports the default Alloy style of appendables since it can go to multiple outputs. It also allows the intercepting of appends.
Expand Down Expand Up @@ -112,12 +122,33 @@ func (a *appender) Append(ctx context.Context, labels labels.Labels, samples []*
return multiErr
}

// AppendIngest satisfies the AppenderIngest interface.
func (a *appender) AppendIngest(ctx context.Context, profile *IncomingProfile) error {
now := time.Now()
defer func() {
a.writeLatency.Observe(time.Since(now).Seconds())
}()
var multiErr error
for _, x := range a.children {
err := x.AppendIngest(ctx, profile)
if err != nil {
multiErr = multierror.Append(multiErr, err)
}
}
return multiErr
}

type AppendableFunc func(ctx context.Context, labels labels.Labels, samples []*RawSample) error

func (f AppendableFunc) Appender() Appender {
return f
}

func (f AppendableFunc) Append(ctx context.Context, labels labels.Labels, samples []*RawSample) error {
return f(ctx, labels, samples)
}

func (f AppendableFunc) Appender() Appender {
return f
func (f AppendableFunc) AppendIngest(_ context.Context, _ *IncomingProfile) error {
// This is a no-op implementation
return nil
}
182 changes: 182 additions & 0 deletions internal/component/pyroscope/receive_http/receive_http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package receive_http

import (
"context"
"errors"
"fmt"
"github.com/gorilla/mux"
"github.com/grafana/alloy/internal/component/pyroscope/write"
"golang.org/x/sync/errgroup"
"io"
"net/http"
"reflect"
"sync"

"github.com/grafana/alloy/internal/component"
fnet "github.com/grafana/alloy/internal/component/common/net"
"github.com/grafana/alloy/internal/component/pyroscope"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/runtime/logging/level"
)

func init() {
component.Register(component.Registration{
Name: "pyroscope.receive_http",
Stability: featuregate.StabilityGenerallyAvailable,
marcsanmi marked this conversation as resolved.
Show resolved Hide resolved
Args: Arguments{},
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
return New(opts, args.(Arguments))
},
})
}

type Arguments struct {
Server *fnet.ServerConfig `alloy:",squash"`
ForwardTo []pyroscope.Appendable `alloy:"forward_to,attr"`
}

// SetToDefault implements syntax.Defaulter.
func (a *Arguments) SetToDefault() {
*a = Arguments{
Server: fnet.DefaultServerConfig(),
}
}

type Component struct {
opts component.Options
server *fnet.TargetServer
appendables []pyroscope.Appendable
mut sync.Mutex
}

func New(opts component.Options, args Arguments) (*Component, error) {
c := &Component{
opts: opts,
appendables: args.ForwardTo,
}

if err := c.Update(args); err != nil {
return nil, err
}

return c, nil
}

func (c *Component) Run(ctx context.Context) error {
defer func() {
c.mut.Lock()
defer c.mut.Unlock()
c.shutdownServer()
}()

<-ctx.Done()
level.Info(c.opts.Logger).Log("msg", "terminating due to context done")
return nil
}

func (c *Component) Update(args component.Arguments) error {
newArgs := args.(Arguments)

c.mut.Lock()
defer c.mut.Unlock()

c.appendables = newArgs.ForwardTo

if newArgs.Server == nil {
newArgs.Server = fnet.DefaultServerConfig()
mattdurham marked this conversation as resolved.
Show resolved Hide resolved
}
if newArgs.Server.HTTP == nil {
newArgs.Server.HTTP = &fnet.HTTPConfig{
ListenPort: 0,
ListenAddress: "127.0.0.1",
}
}

serverNeedsRestarting := c.server == nil || !reflect.DeepEqual(c.server, *newArgs.Server.HTTP)
if !serverNeedsRestarting {
return nil
}

c.shutdownServer()

srv, err := fnet.NewTargetServer(c.opts.Logger, "pyroscope_receive_http", c.opts.Registerer, newArgs.Server)
if err != nil {
return fmt.Errorf("failed to create server: %w", err)
}
c.server = srv

return c.server.MountAndRun(func(router *mux.Router) {
router.HandleFunc("/ingest", c.handleIngest).Methods(http.MethodPost)
})
}

func (c *Component) handleIngest(w http.ResponseWriter, r *http.Request) {
c.mut.Lock()
appendables := c.appendables
c.mut.Unlock()

// Create a pipe for each appendable
pipeWriters := make([]io.Writer, len(appendables))
pipeReaders := make([]io.Reader, len(appendables))
for i := range appendables {
pr, pw := io.Pipe()
pipeReaders[i] = pr
pipeWriters[i] = pw
}
mw := io.MultiWriter(pipeWriters...)

// Create an errgroup with the timeout context
g, ctx := errgroup.WithContext(r.Context())

// Start copying the request body to all pipes
g.Go(func() error {
defer func() {
for _, pw := range pipeWriters {
pw.(io.WriteCloser).Close()
}
}()
_, err := io.Copy(mw, r.Body)
return err
})

// Process each appendable
for i, appendable := range appendables {
g.Go(func() error {
defer pipeReaders[i].(io.ReadCloser).Close()

profile := &pyroscope.IncomingProfile{
Body: io.NopCloser(pipeReaders[i]),
Headers: r.Header.Clone(),
URL: r.URL,
}

err := appendable.Appender().AppendIngest(ctx, profile)
if err != nil {
level.Error(c.opts.Logger).Log("msg", "Failed to append profile", "appendable", i, "err", err)
return err
}
level.Debug(c.opts.Logger).Log("msg", "Profile appended successfully", "appendable", i)
return nil
})
}

err := g.Wait()
if err != nil {
var writeErr *write.PyroscopeWriteError
if errors.As(err, &writeErr) {
http.Error(w, http.StatusText(writeErr.StatusCode), writeErr.StatusCode)
} else {
level.Error(c.opts.Logger).Log("msg", "Failed to process request", "err", err)
http.Error(w, "Failed to process request", http.StatusInternalServerError)
}
return
}
w.WriteHeader(http.StatusOK)
}

func (c *Component) shutdownServer() {
if c.server != nil {
c.server.StopAndShutdown()
c.server = nil
}
}
Loading
Loading