Skip to content

Commit

Permalink
Merge pull request #717 from atlassian/htan/otlp-retry
Browse files Browse the repository at this point in the history
add retries for OTLP backend
  • Loading branch information
hstan authored Aug 5, 2024
2 parents ee976cf + ff4f573 commit febe11c
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 8 deletions.
36 changes: 28 additions & 8 deletions pkg/backends/otlp/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ const (
// to export values as OTLP metrics.
// The zero value is not safe to use.
type Backend struct {
batchesCreated uint64 // Accumulated number of batches created
batchesDropped uint64 // Accumulated number of batches aborted (data loss)
batchesSent uint64 // Accumulated number of batches successfully sent
seriesSent uint64 // Accumulated number of series successfully sent
seriesDropped uint64 // Accumulated number of series aborted (data loss)
batchesCreated uint64 // Accumulated number of batches created
batchesDropped uint64 // Accumulated number of batches aborted (data loss)
batchesSent uint64 // Accumulated number of batches successfully sent
batchesRetried stats.ChangeGauge // Accumulated number of batches retried (first send is not a retry)
seriesSent uint64 // Accumulated number of series successfully sent
seriesDropped uint64 // Accumulated number of series aborted (data loss)

eventsSent uint64 // Accumulated number of events successfully sent
eventsDropped uint64 // Accumulated number of events aborted (data loss)
Expand All @@ -48,6 +49,7 @@ type Backend struct {
logger logrus.FieldLogger
client *http.Client
requestsBufferSem chan struct{}
maxRetries int

// metricsPerBatch is the maximum number of metrics to send in a single batch.
metricsPerBatch int
Expand Down Expand Up @@ -81,6 +83,7 @@ func NewClientFromViper(v *viper.Viper, logger logrus.FieldLogger, pool *transpo
client: tc.Client,
logger: logger,
requestsBufferSem: make(chan struct{}, cfg.MaxRequests),
maxRetries: cfg.MaxRetries,
metricsPerBatch: cfg.MetricsPerBatch,
}, nil
}
Expand Down Expand Up @@ -133,6 +136,7 @@ func (bd *Backend) SendMetricsAsync(ctx context.Context, mm *gostatsd.MetricMap,
statser.Gauge("backend.sent", float64(atomic.LoadUint64(&bd.batchesSent)), nil)
statser.Gauge("backend.series.sent", float64(atomic.LoadUint64(&bd.seriesSent)), nil)
statser.Gauge("backend.series.dropped", float64(atomic.LoadUint64(&bd.seriesDropped)), nil)
bd.batchesRetried.SendIfChanged(statser, "backend.retried", nil)
}()

group := newGroups(bd.metricsPerBatch)
Expand Down Expand Up @@ -311,10 +315,26 @@ func (c *Backend) postMetrics(ctx context.Context, batch group) error {
c.requestsBufferSem <- struct{}{}
resp, err := c.client.Do(req)
<-c.requestsBufferSem
if err != nil {
atomic.AddUint64(&c.batchesDropped, 1)
return err
// OTLP standard specifies 400 will be returned if the request is non-retryable, so we don't retry on 400
if err != nil && resp.StatusCode != http.StatusBadRequest {
for i := 0; i < c.maxRetries; i++ {
atomic.AddUint64(&c.batchesRetried.Cur, 1)
if resp != nil {
resp.Body.Close()
}
c.requestsBufferSem <- struct{}{}
resp, err = c.client.Do(req)
<-c.requestsBufferSem
if err == nil {
break
} else {
c.logger.WithError(err).WithFields(logrus.Fields{
"endpoint": c.metricsEndpoint,
}).Error("failed while retrying")
}
}
}
defer resp.Body.Close()
dropped, err := data.ProcessMetricResponse(resp)
atomic.AddUint64(&c.seriesDropped, uint64(dropped))

Expand Down
6 changes: 6 additions & 0 deletions pkg/backends/otlp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type Config struct {
LogsEndpoint string `mapstructure:"logs_endpoint"`
// MaxRequests (Optional, default: cpu.count * 2) is the upper limit on the number of inflight requests
MaxRequests int `mapstructure:"max_requests"`
// MaxRetries (Optional, default: 3) is the maximum number of retries to send a batch
MaxRetries int `mapstructure:"max_retries"`
// MetricsPerBatch (Optional, default: 1000) is the maximum number of metrics to send in a single batch.
MetricsPerBatch int `mapstructure:"metrics_per_batch"`
// ResourceKeys (Optional) is used to extract values from provided tags
Expand Down Expand Up @@ -59,6 +61,7 @@ func newDefaultConfig() *Config {
return &Config{
Transport: "default",
MaxRequests: runtime.NumCPU() * 2,
MaxRetries: 3,
MetricsPerBatch: defaultMetricsPerBatch,
Conversion: ConversionAsGauge,
UserAgent: "gostatsd",
Expand Down Expand Up @@ -88,6 +91,9 @@ func (c *Config) Validate() (errs error) {
if c.MaxRequests <= 0 {
errs = multierr.Append(errs, errors.New("max request must be a positive value"))
}
if c.MaxRetries <= 0 {
errs = multierr.Append(errs, errors.New("max retries must be a positive value"))
}
if c.MetricsPerBatch <= 0 {
errs = multierr.Append(errs, errors.New("metrics per batch must be a positive value"))
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/backends/otlp/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@ func TestNewConfig(t *testing.T) {
v.SetDefault("otlp.metrics_endpoint", "http://local/v1/metrics")
v.SetDefault("otlp.logs_endpoint", "http://local/v1/logs")
v.SetDefault("otlp.max_requests", 1)
v.SetDefault("otlp.max_retries", 3)
v.SetDefault("otlp.metrics_per_batch", 999)
return v
}(),
expect: &Config{
MetricsEndpoint: "http://local/v1/metrics",
LogsEndpoint: "http://local/v1/logs",
MaxRequests: 1,
MaxRetries: 3,
MetricsPerBatch: 999,
Conversion: "AsGauge",
Transport: "default",
Expand Down

0 comments on commit febe11c

Please sign in to comment.