diff --git a/pkg/backends/otlp/backend.go b/pkg/backends/otlp/backend.go
index 7c803b50..283434e0 100644
--- a/pkg/backends/otlp/backend.go
+++ b/pkg/backends/otlp/backend.go
@@ -29,11 +29,12 @@ const (
 // to export values as OTLP metrics.
 // The zero value is not safe to use.
 type Backend struct {
-	batchesCreated uint64 // Accumulated number of batches created
-	batchesDropped uint64 // Accumulated number of batches aborted (data loss)
-	batchesSent    uint64 // Accumulated number of batches successfully sent
-	seriesSent     uint64 // Accumulated number of series successfully sent
-	seriesDropped  uint64 // Accumulated number of series aborted (data loss)
+	batchesCreated uint64            // Accumulated number of batches created
+	batchesDropped uint64            // Accumulated number of batches aborted (data loss)
+	batchesSent    uint64            // Accumulated number of batches successfully sent
+	batchesRetried stats.ChangeGauge // Accumulated number of batches retried (first send is not a retry)
+	seriesSent     uint64            // Accumulated number of series successfully sent
+	seriesDropped  uint64            // Accumulated number of series aborted (data loss)
 
 	eventsSent    uint64 // Accumulated number of events successfully sent
 	eventsDropped uint64 // Accumulated number of events aborted (data loss)
@@ -48,6 +49,7 @@ type Backend struct {
 	logger            logrus.FieldLogger
 	client            *http.Client
 	requestsBufferSem chan struct{}
+	maxRetries        int
 
 	// metricsPerBatch is the maximum number of metrics to send in a single batch.
 	metricsPerBatch int
@@ -81,6 +83,7 @@ func NewClientFromViper(v *viper.Viper, logger logrus.FieldLogger, pool *transpo
 		client:            tc.Client,
 		logger:            logger,
 		requestsBufferSem: make(chan struct{}, cfg.MaxRequests),
+		maxRetries:        cfg.MaxRetries,
 		metricsPerBatch:   cfg.MetricsPerBatch,
 	}, nil
 }
@@ -133,6 +136,7 @@ func (bd *Backend) SendMetricsAsync(ctx context.Context, mm *gostatsd.MetricMap,
 		statser.Gauge("backend.sent", float64(atomic.LoadUint64(&bd.batchesSent)), nil)
 		statser.Gauge("backend.series.sent", float64(atomic.LoadUint64(&bd.seriesSent)), nil)
 		statser.Gauge("backend.series.dropped", float64(atomic.LoadUint64(&bd.seriesDropped)), nil)
+		bd.batchesRetried.SendIfChanged(statser, "backend.retried", nil)
 	}()
 
 	group := newGroups(bd.metricsPerBatch)
@@ -311,10 +315,35 @@ func (c *Backend) postMetrics(ctx context.Context, batch group) error {
 	c.requestsBufferSem <- struct{}{}
 	resp, err := c.client.Do(req)
 	<-c.requestsBufferSem
-	if err != nil {
-		atomic.AddUint64(&c.batchesDropped, 1)
-		return err
+	// The OTLP spec specifies that 400 is returned when the request is non-retryable, so we don't retry on 400
+	if err != nil || (resp.StatusCode >= 400 && resp.StatusCode != http.StatusBadRequest) {
+		for i := 0; i < c.maxRetries; i++ {
+			atomic.AddUint64(&c.batchesRetried.Cur, 1)
+			if resp != nil {
+				resp.Body.Close()
+			}
+			if req.GetBody != nil {
+				// Rewind the request body so the retry resends the full payload
+				if body, bodyErr := req.GetBody(); bodyErr == nil {
+					req.Body = body
+				}
+			}
+			c.requestsBufferSem <- struct{}{}
+			resp, err = c.client.Do(req)
+			<-c.requestsBufferSem
+			if err == nil && (resp.StatusCode < 400 || resp.StatusCode == http.StatusBadRequest) {
+				break
+			}
+			c.logger.WithError(err).WithFields(logrus.Fields{
+				"endpoint": c.metricsEndpoint,
+			}).Error("failed while retrying")
+		}
 	}
+	if err != nil {
+		atomic.AddUint64(&c.batchesDropped, 1)
+		return err
+	}
+	defer resp.Body.Close()
 
 	dropped, err := data.ProcessMetricResponse(resp)
 	atomic.AddUint64(&c.seriesDropped, uint64(dropped))
diff --git a/pkg/backends/otlp/config.go b/pkg/backends/otlp/config.go
index ca720b1c..f0194762 100644
--- a/pkg/backends/otlp/config.go
+++ b/pkg/backends/otlp/config.go
@@ -27,6 +27,8 @@ type Config struct {
 	LogsEndpoint string `mapstructure:"logs_endpoint"`
 	// MaxRequests (Optional, default: cpu.count * 2) is the upper limit on the number of inflight requests
 	MaxRequests int `mapstructure:"max_requests"`
`mapstructure:"max_requests"` + // MaxRetries (Optional, default: 3) is the maximum number of retries to send a batch + MaxRetries int `mapstructure:"max_retries"` // MetricsPerBatch (Optional, default: 1000) is the maximum number of metrics to send in a single batch. MetricsPerBatch int `mapstructure:"metrics_per_batch"` // ResourceKeys (Optional) is used to extract values from provided tags @@ -59,6 +61,7 @@ func newDefaultConfig() *Config { return &Config{ Transport: "default", MaxRequests: runtime.NumCPU() * 2, + MaxRetries: 3, MetricsPerBatch: defaultMetricsPerBatch, Conversion: ConversionAsGauge, UserAgent: "gostatsd", @@ -88,6 +91,9 @@ func (c *Config) Validate() (errs error) { if c.MaxRequests <= 0 { errs = multierr.Append(errs, errors.New("max request must be a positive value")) } + if c.MaxRetries <= 0 { + errs = multierr.Append(errs, errors.New("max retries must be a positive value")) + } if c.MetricsPerBatch <= 0 { errs = multierr.Append(errs, errors.New("metrics per batch must be a positive value")) } diff --git a/pkg/backends/otlp/config_test.go b/pkg/backends/otlp/config_test.go index 9fc93f3c..3287e289 100644 --- a/pkg/backends/otlp/config_test.go +++ b/pkg/backends/otlp/config_test.go @@ -29,6 +29,7 @@ func TestNewConfig(t *testing.T) { v.SetDefault("otlp.metrics_endpoint", "http://local/v1/metrics") v.SetDefault("otlp.logs_endpoint", "http://local/v1/logs") v.SetDefault("otlp.max_requests", 1) + v.SetDefault("otlp.max_retries", 3) v.SetDefault("otlp.metrics_per_batch", 999) return v }(), @@ -36,6 +37,7 @@ func TestNewConfig(t *testing.T) { MetricsEndpoint: "http://local/v1/metrics", LogsEndpoint: "http://local/v1/logs", MaxRequests: 1, + MaxRetries: 3, MetricsPerBatch: 999, Conversion: "AsGauge", Transport: "default",