add retries for OTLP backend #717

Merged: 1 commit, merged Aug 5, 2024
36 changes: 28 additions & 8 deletions pkg/backends/otlp/backend.go
@@ -29,11 +29,12 @@ const (
// to export values as OTLP metrics.
// The zero value is not safe to use.
type Backend struct {
-   batchesCreated uint64 // Accumulated number of batches created
-   batchesDropped uint64 // Accumulated number of batches aborted (data loss)
-   batchesSent    uint64 // Accumulated number of batches successfully sent
-   seriesSent     uint64 // Accumulated number of series successfully sent
-   seriesDropped  uint64 // Accumulated number of series aborted (data loss)
+   batchesCreated uint64            // Accumulated number of batches created
+   batchesDropped uint64            // Accumulated number of batches aborted (data loss)
+   batchesSent    uint64            // Accumulated number of batches successfully sent
+   batchesRetried stats.ChangeGauge // Accumulated number of batches retried (first send is not a retry)
+   seriesSent     uint64            // Accumulated number of series successfully sent
+   seriesDropped  uint64            // Accumulated number of series aborted (data loss)

    eventsSent    uint64 // Accumulated number of events successfully sent
    eventsDropped uint64 // Accumulated number of events aborted (data loss)
@@ -48,6 +49,7 @@ type Backend struct {
    logger            logrus.FieldLogger
    client            *http.Client
    requestsBufferSem chan struct{}
+   maxRetries        int

    // metricsPerBatch is the maximum number of metrics to send in a single batch.
    metricsPerBatch int
@@ -81,6 +83,7 @@ func NewClientFromViper(v *viper.Viper, logger logrus.FieldLogger, pool *transpo
        client:            tc.Client,
        logger:            logger,
        requestsBufferSem: make(chan struct{}, cfg.MaxRequests),
+       maxRetries:        cfg.MaxRetries,
        metricsPerBatch:   cfg.MetricsPerBatch,
    }, nil
}
@@ -133,6 +136,7 @@ func (bd *Backend) SendMetricsAsync(ctx context.Context, mm *gostatsd.MetricMap,
        statser.Gauge("backend.sent", float64(atomic.LoadUint64(&bd.batchesSent)), nil)
        statser.Gauge("backend.series.sent", float64(atomic.LoadUint64(&bd.seriesSent)), nil)
        statser.Gauge("backend.series.dropped", float64(atomic.LoadUint64(&bd.seriesDropped)), nil)
+       bd.batchesRetried.SendIfChanged(statser, "backend.retried", nil)
    }()

    group := newGroups(bd.metricsPerBatch)
@@ -311,10 +315,26 @@ func (c *Backend) postMetrics(ctx context.Context, batch group) error {
    c.requestsBufferSem <- struct{}{}
    resp, err := c.client.Do(req)
    <-c.requestsBufferSem
-   if err != nil {
-       atomic.AddUint64(&c.batchesDropped, 1)
-       return err
+   // OTLP standard specifies 400 will be returned if the request is non-retryable, so we don't retry on 400
+   if err != nil && resp.StatusCode != http.StatusBadRequest {
+       for i := 0; i < c.maxRetries; i++ {
+           atomic.AddUint64(&c.batchesRetried.Cur, 1)
+           if resp != nil {
+               resp.Body.Close()
+           }
+           c.requestsBufferSem <- struct{}{}
+           resp, err = c.client.Do(req)
+           <-c.requestsBufferSem
+           if err == nil {
+               break
+           } else {
+               c.logger.WithError(err).WithFields(logrus.Fields{
+                   "endpoint": c.metricsEndpoint,
+               }).Error("failed while retrying")
+           }
+       }
    }
    defer resp.Body.Close()
    dropped, err := data.ProcessMetricResponse(resp)
    atomic.AddUint64(&c.seriesDropped, uint64(dropped))

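The retry logic above boils down to: post the batch once, and if the attempt fails with anything other than an HTTP 400 (which, per the comment in the diff, the OTLP standard uses for non-retryable requests), re-send it up to maxRetries more times, counting each retry in batchesRetried and logging the failure together with the metrics endpoint. For reference, here is a minimal self-contained sketch of that post-with-retry pattern; the postWithRetry helper, the placeholder endpoint, and the treatment of 5xx responses are illustrative assumptions, not code from this PR.

package main

import (
    "bytes"
    "fmt"
    "log"
    "net/http"
)

// postWithRetry sends payload once, then retries up to maxRetries more times
// on a transport error or a 5xx response. A 400 is returned as-is and never
// retried. This is a generic sketch, not the exact logic merged above.
func postWithRetry(client *http.Client, url string, payload []byte, maxRetries int) (*http.Response, error) {
    resp, err := client.Post(url, "application/x-protobuf", bytes.NewReader(payload))
    for attempt := 1; attempt <= maxRetries; attempt++ {
        if err == nil && resp.StatusCode < 500 {
            return resp, nil // success, or a permanent failure such as 400
        }
        if resp != nil {
            resp.Body.Close() // discard the failed response before re-sending
        }
        log.Printf("retrying %d/%d after err=%v", attempt, maxRetries, err)
        // A fresh reader is needed on every attempt; a shared one would be drained.
        resp, err = client.Post(url, "application/x-protobuf", bytes.NewReader(payload))
    }
    return resp, err
}

func main() {
    // Placeholder endpoint; an OTLP/HTTP collector commonly listens on :4318.
    resp, err := postWithRetry(http.DefaultClient, "http://localhost:4318/v1/metrics", []byte("{}"), 3)
    if err != nil {
        log.Fatalf("all attempts failed: %v", err)
    }
    defer resp.Body.Close()
    fmt.Println("final status:", resp.Status)
}
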
6 changes: 6 additions & 0 deletions pkg/backends/otlp/config.go
@@ -27,6 +27,8 @@ type Config struct {
    LogsEndpoint string `mapstructure:"logs_endpoint"`
    // MaxRequests (Optional, default: cpu.count * 2) is the upper limit on the number of inflight requests
    MaxRequests int `mapstructure:"max_requests"`
+   // MaxRetries (Optional, default: 3) is the maximum number of retries to send a batch
+   MaxRetries int `mapstructure:"max_retries"`
    // MetricsPerBatch (Optional, default: 1000) is the maximum number of metrics to send in a single batch.
    MetricsPerBatch int `mapstructure:"metrics_per_batch"`
    // ResourceKeys (Optional) is used to extract values from provided tags
@@ -59,6 +61,7 @@ func newDefaultConfig() *Config {
    return &Config{
        Transport:       "default",
        MaxRequests:     runtime.NumCPU() * 2,
+       MaxRetries:      3,
        MetricsPerBatch: defaultMetricsPerBatch,
        Conversion:      ConversionAsGauge,
        UserAgent:       "gostatsd",
@@ -88,6 +91,9 @@ func (c *Config) Validate() (errs error) {
    if c.MaxRequests <= 0 {
        errs = multierr.Append(errs, errors.New("max request must be a positive value"))
    }
+   if c.MaxRetries <= 0 {
+       errs = multierr.Append(errs, errors.New("max retries must be a positive value"))
+   }
    if c.MetricsPerBatch <= 0 {
        errs = multierr.Append(errs, errors.New("metrics per batch must be a positive value"))
    }
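The new setting surfaces to operators as max_retries under the otlp backend configuration (see the mapstructure tag above); it defaults to 3 and must be a positive value to pass Validate. A small sketch of overriding it through viper, in the same style as the test below; the endpoint URLs and the value 5 are placeholders, not values required by this change.

package main

import (
    "fmt"

    "github.com/spf13/viper"
)

func main() {
    v := viper.New()
    // Keys mirror the otlp backend's mapstructure tags; max_retries is the one added here.
    v.SetDefault("otlp.metrics_endpoint", "http://localhost:4318/v1/metrics")
    v.SetDefault("otlp.logs_endpoint", "http://localhost:4318/v1/logs")
    v.SetDefault("otlp.max_retries", 5) // must be > 0 or Validate() returns an error

    fmt.Println("otlp.max_retries =", v.GetInt("otlp.max_retries"))
}
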
2 changes: 2 additions & 0 deletions pkg/backends/otlp/config_test.go
@@ -29,13 +29,15 @@ func TestNewConfig(t *testing.T) {
            v.SetDefault("otlp.metrics_endpoint", "http://local/v1/metrics")
            v.SetDefault("otlp.logs_endpoint", "http://local/v1/logs")
            v.SetDefault("otlp.max_requests", 1)
+           v.SetDefault("otlp.max_retries", 3)
            v.SetDefault("otlp.metrics_per_batch", 999)
            return v
        }(),
        expect: &Config{
            MetricsEndpoint: "http://local/v1/metrics",
            LogsEndpoint:    "http://local/v1/logs",
            MaxRequests:     1,
+           MaxRetries:      3,
            MetricsPerBatch: 999,
            Conversion:      "AsGauge",
            Transport:       "default",