Skip to content

Commit

Permalink
Merge pull request #719 from atlassian/htan/retry-test
Browse files Browse the repository at this point in the history
add tests and refactor retry sending
  • Loading branch information
hstan authored Aug 5, 2024
2 parents 640077e + 7360256 commit da2dba9
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 24 deletions.
51 changes: 28 additions & 23 deletions pkg/backends/otlp/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,39 +304,44 @@ func (bd *Backend) SendMetricsAsync(ctx context.Context, mm *gostatsd.MetricMap,
}

func (c *Backend) postMetrics(ctx context.Context, batch group) error {
resourceMetrics := batch.values()
var (
retries int
req *http.Request
resp *http.Response
err error
)

req, err := data.NewMetricsRequest(ctx, c.metricsEndpoint, resourceMetrics)
resourceMetrics := batch.values()
req, err = data.NewMetricsRequest(ctx, c.metricsEndpoint, resourceMetrics)
if err != nil {
atomic.AddUint64(&c.batchesDropped, 1)
return err
}

c.requestsBufferSem <- struct{}{}
resp, err := c.client.Do(req)
<-c.requestsBufferSem
// OTLP standard specifies 400 will be returned if the request is non-retryable, so we don't retry on 400
if err != nil && resp.StatusCode != http.StatusBadRequest {
for i := 0; i < c.maxRetries; i++ {
atomic.AddUint64(&c.batchesRetried.Cur, 1)
if resp != nil {
resp.Body.Close()
}
c.requestsBufferSem <- struct{}{}
resp, err = c.client.Do(req)
<-c.requestsBufferSem
for {
var dropped int64
c.requestsBufferSem <- struct{}{}
resp, err = c.client.Do(req)
<-c.requestsBufferSem
if err == nil {
dropped, err = data.ProcessMetricResponse(resp)
if err == nil {
break
} else {
c.logger.WithError(err).WithFields(logrus.Fields{
"endpoint": c.metricsEndpoint,
}).Error("failed while retrying")
return nil
}
if dropped > 0 {
// If partial data points were dropped, it shouldn't retry
atomic.AddUint64(&c.seriesDropped, uint64(dropped))
return err
}
}

if retries >= c.maxRetries {
break
}

retries++
atomic.AddUint64(&c.batchesRetried.Cur, 1)
}
defer resp.Body.Close()
dropped, err := data.ProcessMetricResponse(resp)
atomic.AddUint64(&c.seriesDropped, uint64(dropped))

return err
}
72 changes: 72 additions & 0 deletions pkg/backends/otlp/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,78 @@ func TestBackendSendAsyncMetrics(t *testing.T) {
}
}

// TestRetrySendMetrics verifies how many HTTP attempts the backend makes when
// the metrics endpoint fails. The stub server answers 503 until the request
// counter reaches numUntilSuccess and 200 from then on; each case then checks
// the number of attempts observed by the server and the number of errors
// handed to the SendMetricsAsync callback.
func TestRetrySendMetrics(t *testing.T) {
	t.Parallel()

	type scenario struct {
		name            string
		numUntilSuccess int // 1-based index of the first request answered with 200
		maxRetries      int // value written to otlp.max_retries
		wantAttempts    int // requests the server is expected to receive
		numErrs         int // errors expected in the callback
	}

	scenarios := []scenario{
		{
			name:            "should retry sending metrics if it fails for the first time",
			numUntilSuccess: 2,
			maxRetries:      3,
			wantAttempts:    2,
			numErrs:         0,
		},
		{
			name:            "should give up if it still fails when reach the maximum number of retries",
			numUntilSuccess: 5,
			maxRetries:      3,
			wantAttempts:    4,
			numErrs:         1,
		},
		{
			name:            "should not retry if it succeeds at the first time",
			numUntilSuccess: 1,
			maxRetries:      3,
			wantAttempts:    1,
			numErrs:         0,
		},
		{
			name:            "should not retry if maxRetries is 0",
			numUntilSuccess: 5,
			maxRetries:      0,
			wantAttempts:    1,
			numErrs:         1,
		},
	}

	for _, sc := range scenarios {
		t.Run(sc.name, func(t *testing.T) {
			var attempts int

			// Count every request; fail with 503 until the configured
			// attempt number is reached.
			handler := func(w http.ResponseWriter, _ *http.Request) {
				attempts++
				if attempts >= sc.numUntilSuccess {
					w.WriteHeader(http.StatusOK)
					return
				}
				http.Error(w, "im dead", http.StatusServiceUnavailable)
			}
			srv := httptest.NewServer(http.HandlerFunc(handler))
			t.Cleanup(srv.Close)

			cfg := viper.New()
			cfg.Set("otlp.metrics_endpoint", fmt.Sprintf("%s/v1/metrics", srv.URL))
			cfg.Set("otlp.logs_endpoint", fmt.Sprintf("%s/v1/logs", srv.URL))
			cfg.Set("otlp.max_retries", sc.maxRetries)

			log := fixtures.NewTestLogger(t)
			pool := transport.NewTransportPool(log, cfg)

			backend, err := NewClientFromViper(cfg, log, pool)
			require.NoError(t, err, "Must not error creating backend")

			onDone := func(errs []error) {
				assert.Equal(t, sc.numErrs, len(errs))
				assert.Equal(t, sc.wantAttempts, attempts, "Must retry sending metrics")
			}
			backend.SendMetricsAsync(context.Background(), gostatsd.NewMetricMap(false), onDone)
		})
	}
}

func TestSendEvent(t *testing.T) {
t.Parallel()
for _, tc := range []struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/backends/otlp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (c *Config) Validate() (errs error) {
if c.MaxRequests <= 0 {
errs = multierr.Append(errs, errors.New("max request must be a positive value"))
}
if c.MaxRetries <= 0 {
if c.MaxRetries < 0 {
errs = multierr.Append(errs, errors.New("max retries must be a positive value"))
}
if c.MetricsPerBatch <= 0 {
Expand Down
5 changes: 5 additions & 0 deletions pkg/backends/otlp/internal/data/metric_response.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package data

import (
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -11,6 +12,10 @@ import (
)

func ProcessMetricResponse(resp *http.Response) (dropped int64, errs error) {
if resp == nil {
return 0, errors.New("empty response")
}

buf, err := io.ReadAll(resp.Body)
if err != nil {
return 0, err
Expand Down

0 comments on commit da2dba9

Please sign in to comment.