Skip to content

Commit

Permalink
fix: improve AppendIngest with TeeReader and update AppendableFunc in…
Browse files Browse the repository at this point in the history
…terface
  • Loading branch information
marcsanmi committed Oct 15, 2024
1 parent 0b4d39d commit 8181a7f
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ You can also create multiple `pyroscope.receive_http` components with different

- Components that export [Pyroscope `ProfilesReceiver`](../../../compatibility/#pyroscope-profilesreceiver-exporters)


{{< admonition type="note" >}}
Connecting some components may not be sensible or components may require further configuration to make the connection work correctly.
Refer to the linked documentation for more details.
Expand Down
21 changes: 15 additions & 6 deletions internal/component/pyroscope/appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,6 @@ func (a *appender) Append(ctx context.Context, labels labels.Labels, samples []*
return multiErr
}

type AppendableFunc func(ctx context.Context, labels labels.Labels, samples []*RawSample) error

func (f AppendableFunc) Append(ctx context.Context, labels labels.Labels, samples []*RawSample) error {
return f(ctx, labels, samples)
}

// AppendIngest satisfies the AppenderIngest interface.
func (a *appender) AppendIngest(ctx context.Context, profile *IncomingProfile) error {
now := time.Now()
Expand All @@ -143,3 +137,18 @@ func (a *appender) AppendIngest(ctx context.Context, profile *IncomingProfile) e
}
return multiErr
}

type AppendableFunc func(ctx context.Context, labels labels.Labels, samples []*RawSample) error

func (f AppendableFunc) Appender() Appender {
return f
}

func (f AppendableFunc) Append(ctx context.Context, labels labels.Labels, samples []*RawSample) error {
return f(ctx, labels, samples)
}

func (f AppendableFunc) AppendIngest(ctx context.Context, profile *IncomingProfile) error {
// This is a no-op implementation or you can return an error if this operation is not supported
return nil
}
16 changes: 14 additions & 2 deletions internal/component/pyroscope/write/write.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package write

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"path"
Expand Down Expand Up @@ -338,7 +340,10 @@ func (e *PyroscopeWriteError) Error() string {

// AppendIngest implements the pyroscope.Appender interface.
func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.IncomingProfile) error {
for _, endpoint := range f.config.Endpoints {
var buf bytes.Buffer
tee := io.TeeReader(profile.Body, &buf)

for i, endpoint := range f.config.Endpoints {
u, err := url.Parse(endpoint.URL)
if err != nil {
return fmt.Errorf("parse endpoint URL: %w", err)
Expand All @@ -347,7 +352,14 @@ func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.Inco
u.Path = path.Join(u.Path, profile.URL.Path)
u.RawQuery = profile.URL.RawQuery

req, err := http.NewRequestWithContext(ctx, "POST", u.String(), profile.Body)
var bodyReader io.Reader
if i == 0 {
bodyReader = tee
} else {
bodyReader = bytes.NewReader(buf.Bytes())
}

req, err := http.NewRequestWithContext(ctx, "POST", u.String(), bodyReader)
if err != nil {
return fmt.Errorf("create request: %w", err)
}
Expand Down
77 changes: 77 additions & 0 deletions internal/component/pyroscope/write/write_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package write

import (
"bytes"
"context"
"errors"
"io"
"net/http"
"net/http/httptest"
"net/url"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -253,3 +256,77 @@ func TestBadAlloyConfig(t *testing.T) {
err := syntax.Unmarshal([]byte(exampleAlloyConfig), &args)
require.ErrorContains(t, err, "at most one of basic_auth, authorization, oauth2, bearer_token & bearer_token_file must be configured")
}

func Test_Write_AppendIngest(t *testing.T) {
var (
export Exports
argument = DefaultArguments()
appendCount = atomic.NewInt32(0)
serverCount = int32(3)
servers = make([]*httptest.Server, serverCount)
endpoints = make([]*EndpointOptions, 0, serverCount)
)

testData := []byte("test-profile-data")

handlerFn := func(expectedPath, expectedQuery string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
appendCount.Inc()
require.Equal(t, expectedPath, r.URL.Path, "Unexpected path")
require.Equal(t, expectedQuery, r.URL.RawQuery, "Unexpected query")
require.Equal(t, "test-value", r.Header.Get("X-Test-Header"), "Unexpected header value")
body, err := io.ReadAll(r.Body)
require.NoError(t, err, "Failed to read request body")
require.Equal(t, testData, body, "Unexpected body content")
w.WriteHeader(http.StatusOK)
}
}

for i := int32(0); i < serverCount; i++ {
servers[i] = httptest.NewServer(handlerFn("/ingest", "key=value"))
endpoints = append(endpoints, &EndpointOptions{
URL: servers[i].URL,
RemoteTimeout: GetDefaultEndpointOptions().RemoteTimeout,
Headers: map[string]string{
"X-Test-Header": "test-value",
},
})
}
defer func() {
for _, s := range servers {
s.Close()
}
}()

argument.Endpoints = endpoints

// Create the receiver
var wg sync.WaitGroup
wg.Add(1)
c, err := New(component.Options{
ID: "test-write",
Logger: util.TestAlloyLogger(t),
Registerer: prometheus.NewRegistry(),
OnStateChange: func(e component.Exports) {
defer wg.Done()
export = e.(Exports)
},
}, argument)
require.NoError(t, err, "Failed to create component")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go c.Run(ctx)
wg.Wait() // wait for the state change to happen
require.NotNil(t, export.Receiver, "Receiver is nil")

incomingProfile := &pyroscope.IncomingProfile{
Body: io.NopCloser(bytes.NewReader(testData)),
Headers: http.Header{"Content-Type": []string{"application/octet-stream"}},
URL: &url.URL{Path: "/ingest", RawQuery: "key=value"},
}

err = export.Receiver.Appender().AppendIngest(context.Background(), incomingProfile)
require.NoError(t, err)
require.Equal(t, serverCount, appendCount.Load())
}

0 comments on commit 8181a7f

Please sign in to comment.