Skip to content

Commit

Permalink
output: Retry document-level 429s by default (#13620)
Browse files Browse the repository at this point in the history
Updates the APM Server to automatically retry document-level `429`s from
Elasticsearch to avoid dropping data. The retry count can be configured or
overridden via `output.elasticsearch.max_retries`, and defaults to `3`.

It uses the default backoff configuration, which could wait up to 1m if
enough retries are configured, but this can be overridden as well.

---------

Signed-off-by: Marc Lopez Rubio <[email protected]>
Co-authored-by: Carson Ip <[email protected]>
(cherry picked from commit 28436dc)

# Conflicts:
#	changelogs/head.asciidoc
  • Loading branch information
marclop authored and mergify[bot] committed Jul 8, 2024
1 parent 4ab802c commit e419b95
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 19 deletions.
4 changes: 4 additions & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,13 @@ https://github.com/elastic/apm-server/compare/8.14\...main[View commits]
[float]
==== Added

- Add `elasticsearch.flushed.uncompressed.bytes` metric {pull}13155[13155]
- APM Server now relies on the Elasticsearch apm-data plugin's index templates, removing the requirement to install the APM integration package {pull}12066[12066]
- Upgraded bundled APM Java agent attacher CLI to version 1.50.0 {pull}13326[13326]
- Enable Kibana curated UIs to work with hostmetrics from OpenTelemetry's https://pkg.go.dev/go.opentelemetry.io/collector/receiver/hostmetricsreceiver[hostmetricsreceiver] {pull}13196[13196]
- Add require data stream to bulk index requests {pull}13398[13398]
- Support self-instrumentation when in managed mode by getting tracing configs via reloader {pull}13514[13514]
- APM Server now automatically retries document-level 429s from Elasticsearch to avoid dropping data. `output.elasticsearch.max_retries` now controls both request-level and document-level retries, and defaults to `3`. {pull}13620[13620]
15 changes: 10 additions & 5 deletions internal/beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,19 +723,20 @@ func (s *Runner) newFinalBatchProcessor(
func (s *Runner) newDocappenderConfig(tracer *apm.Tracer, memLimit float64) (
docappender.Config, *elasticsearch.Config, error,
) {
var esConfig struct {
esConfig := struct {
*elasticsearch.Config `config:",inline"`
FlushBytes string `config:"flush_bytes"`
FlushInterval time.Duration `config:"flush_interval"`
MaxRequests int `config:"max_requests"`
Scaling struct {
Enabled *bool `config:"enabled"`
} `config:"autoscaling"`
}{
// Default to 1mib flushes, which is the default for go-docappender.
FlushBytes: "1 mib",
FlushInterval: time.Second,
Config: elasticsearch.DefaultConfig(),
}
// Default to 1mib flushes, which is the default for go-docappender.
esConfig.FlushBytes = "1 mib"
esConfig.FlushInterval = time.Second
esConfig.Config = elasticsearch.DefaultConfig()
esConfig.MaxIdleConnsPerHost = 10

if err := s.elasticsearchOutputConfig.Unpack(&esConfig); err != nil {
Expand Down Expand Up @@ -768,6 +769,10 @@ func (s *Runner) newDocappenderConfig(tracer *apm.Tracer, memLimit float64) (
Scaling: scalingCfg,
Logger: zap.New(s.logger.Core(), zap.WithCaller(true)),
RequireDataStream: true,
// Use the output's max_retries to configure the go-docappender's
// document level retries.
MaxDocumentRetries: esConfig.MaxRetries,
RetryOnDocumentStatus: []int{429}, // Only retry "safe" 429 responses.
}, memLimit, s.logger)
if cfg.MaxRequests != 0 {
esConfig.MaxIdleConnsPerHost = cfg.MaxRequests
Expand Down
32 changes: 18 additions & 14 deletions internal/beater/beater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,15 @@ func TestRunnerNewDocappenderConfig(t *testing.T) {
docCfg, esCfg, err := r.newDocappenderConfig(nil, c.memSize)
require.NoError(t, err)
assert.Equal(t, docappender.Config{
Logger: zap.New(r.logger.Core(), zap.WithCaller(true)),
CompressionLevel: 5,
RequireDataStream: true,
FlushInterval: time.Second,
FlushBytes: 1024 * 1024,
MaxRequests: c.wantMaxRequests,
DocumentBufferSize: c.wantDocBufSize,
Logger: zap.New(r.logger.Core(), zap.WithCaller(true)),
CompressionLevel: 5,
RequireDataStream: true,
FlushInterval: time.Second,
FlushBytes: 1024 * 1024,
MaxRequests: c.wantMaxRequests,
DocumentBufferSize: c.wantDocBufSize,
MaxDocumentRetries: 3,
RetryOnDocumentStatus: []int{429},
}, docCfg)
assert.Equal(t, &elasticsearch.Config{
Hosts: elasticsearch.Hosts{"localhost:9200"},
Expand All @@ -207,13 +209,15 @@ func TestRunnerNewDocappenderConfig(t *testing.T) {
docCfg, esCfg, err := r.newDocappenderConfig(nil, c.memSize)
require.NoError(t, err)
assert.Equal(t, docappender.Config{
Logger: zap.New(r.logger.Core(), zap.WithCaller(true)),
CompressionLevel: 5,
RequireDataStream: true,
FlushInterval: 2 * time.Second,
FlushBytes: 500 * 1024,
MaxRequests: 50,
DocumentBufferSize: c.wantDocBufSize,
Logger: zap.New(r.logger.Core(), zap.WithCaller(true)),
CompressionLevel: 5,
RequireDataStream: true,
FlushInterval: 2 * time.Second,
FlushBytes: 500 * 1024,
MaxRequests: 50,
DocumentBufferSize: c.wantDocBufSize,
MaxDocumentRetries: 3,
RetryOnDocumentStatus: []int{429},
}, docCfg)
assert.Equal(t, &elasticsearch.Config{
Hosts: elasticsearch.Hosts{"localhost:9200"},
Expand Down

0 comments on commit e419b95

Please sign in to comment.