Skip to content

Commit

Permalink
fix: Update FlushBytes parsing/defaults (#13576) (#13595)
Browse files Browse the repository at this point in the history
Updates the `FlushBytes` setting to default to 1 mib and only override
to 24kb if the user has explicitly set it to 24kb.

Fixes #13024
---------

Signed-off-by: Marc Lopez Rubio <[email protected]>
(cherry picked from commit a453a88)

Co-authored-by: Marc Lopez Rubio <[email protected]>
  • Loading branch information
mergify[bot] and marclop authored Jul 5, 2024
1 parent 13fde89 commit 2def5b2
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 49 deletions.
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ https://github.com/elastic/apm-server/compare/8.14\...main[View commits]
- Avoid data race due to reuse of `bytes.Buffer` in ES bulk requests {pull}13155[13155]
- APM Server now relies on the Elasticsearch apm-data plugin's index templates, which reverts some unsafe uses of `flattened` field types {pull}12066[12066]
- Add `error.id` to jaeger errors {pull}13196[13196]
- Fix a performance regression for apm servers which don't specify `output.elasticsearch.flush_bytes` {pull}13576[13576]

[float]
==== Breaking Changes
Expand Down
110 changes: 61 additions & 49 deletions internal/beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,6 @@ func (s *Runner) newFinalBatchProcessor(
newElasticsearchClient func(cfg *elasticsearch.Config) (*elasticsearch.Client, error),
memLimit float64,
) (modelpb.BatchProcessor, func(context.Context) error, error) {

monitoring.Default.Remove("libbeat")
libbeatMonitoringRegistry := monitoring.Default.NewRegistry("libbeat")
if s.elasticsearchOutputConfig == nil {
Expand All @@ -649,59 +648,16 @@ func (s *Runner) newFinalBatchProcessor(
}
monitoring.NewString(outputRegistry, "name").Set("elasticsearch")

var esConfig struct {
*elasticsearch.Config `config:",inline"`
FlushBytes string `config:"flush_bytes"`
FlushInterval time.Duration `config:"flush_interval"`
MaxRequests int `config:"max_requests"`
Scaling struct {
Enabled *bool `config:"enabled"`
} `config:"autoscaling"`
}
esConfig.FlushInterval = time.Second
esConfig.Config = elasticsearch.DefaultConfig()
esConfig.MaxIdleConnsPerHost = 10
if err := s.elasticsearchOutputConfig.Unpack(&esConfig); err != nil {
// Create the docappender and Elasticsearch config
appenderCfg, esCfg, err := s.newDocappenderConfig(tracer, memLimit)
if err != nil {
return nil, nil, err
}

if esConfig.MaxRequests != 0 {
esConfig.MaxIdleConnsPerHost = esConfig.MaxRequests
}

var flushBytes int
if esConfig.FlushBytes != "" {
b, err := humanize.ParseBytes(esConfig.FlushBytes)
if err != nil {
return nil, nil, fmt.Errorf("failed to parse flush_bytes: %w", err)
}
flushBytes = int(b)
}
minFlush := 24 * 1024
if esConfig.CompressionLevel != 0 && flushBytes < minFlush {
s.logger.Warnf("flush_bytes config value is too small (%d) and might be ignored by the indexer, increasing value to %d", flushBytes, minFlush)
flushBytes = minFlush
}
client, err := newElasticsearchClient(esConfig.Config)
client, err := newElasticsearchClient(esCfg)
if err != nil {
return nil, nil, err
}
var scalingCfg docappender.ScalingConfig
if enabled := esConfig.Scaling.Enabled; enabled != nil {
scalingCfg.Disabled = !*enabled
}
opts := docappender.Config{
CompressionLevel: esConfig.CompressionLevel,
FlushBytes: flushBytes,
FlushInterval: esConfig.FlushInterval,
Tracer: tracer,
MaxRequests: esConfig.MaxRequests,
Scaling: scalingCfg,
Logger: zap.New(s.logger.Core(), zap.WithCaller(true)),
RequireDataStream: true,
}
opts = docappenderConfig(opts, memLimit, s.logger)
appender, err := docappender.New(client, opts)
appender, err := docappender.New(client, appenderCfg)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -764,6 +720,62 @@ func (s *Runner) newFinalBatchProcessor(
return newDocappenderBatchProcessor(appender), appender.Close, nil
}

func (s *Runner) newDocappenderConfig(tracer *apm.Tracer, memLimit float64) (
docappender.Config, *elasticsearch.Config, error,
) {
var esConfig struct {
*elasticsearch.Config `config:",inline"`
FlushBytes string `config:"flush_bytes"`
FlushInterval time.Duration `config:"flush_interval"`
MaxRequests int `config:"max_requests"`
Scaling struct {
Enabled *bool `config:"enabled"`
} `config:"autoscaling"`
}
// Default to 1mib flushes, which is the default for go-docappender.
esConfig.FlushBytes = "1 mib"
esConfig.FlushInterval = time.Second
esConfig.Config = elasticsearch.DefaultConfig()
esConfig.MaxIdleConnsPerHost = 10

if err := s.elasticsearchOutputConfig.Unpack(&esConfig); err != nil {
return docappender.Config{}, nil, err
}

var flushBytes int
if esConfig.FlushBytes != "" {
b, err := humanize.ParseBytes(esConfig.FlushBytes)
if err != nil {
return docappender.Config{}, nil, fmt.Errorf("failed to parse flush_bytes: %w", err)
}
flushBytes = int(b)
}
minFlush := 24 * 1024
if esConfig.CompressionLevel != 0 && flushBytes < minFlush {
s.logger.Warnf("flush_bytes config value is too small (%d) and might be ignored by the indexer, increasing value to %d", flushBytes, minFlush)
flushBytes = minFlush
}
var scalingCfg docappender.ScalingConfig
if enabled := esConfig.Scaling.Enabled; enabled != nil {
scalingCfg.Disabled = !*enabled
}
cfg := docappenderConfig(docappender.Config{
CompressionLevel: esConfig.CompressionLevel,
FlushBytes: flushBytes,
FlushInterval: esConfig.FlushInterval,
Tracer: tracer,
MaxRequests: esConfig.MaxRequests,
Scaling: scalingCfg,
Logger: zap.New(s.logger.Core(), zap.WithCaller(true)),
RequireDataStream: true,
}, memLimit, s.logger)
if cfg.MaxRequests != 0 {
esConfig.MaxIdleConnsPerHost = cfg.MaxRequests
}

return cfg, esConfig.Config, nil
}

func docappenderConfig(
opts docappender.Config, memLimit float64, logger *logp.Logger,
) docappender.Config {
Expand Down
75 changes: 75 additions & 0 deletions internal/beater/beater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,14 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.elastic.co/apm/v2/apmtest"
"go.uber.org/zap"

"github.com/elastic/apm-server/internal/beater/config"
"github.com/elastic/apm-server/internal/elasticsearch"
agentconfig "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/go-docappender/v2"
)

func TestStoreUsesRUMElasticsearchConfig(t *testing.T) {
Expand Down Expand Up @@ -152,3 +156,74 @@ func newMockClusterUUIDClient(t testing.TB, clusterUUID string) *elasticsearch.C
require.NoError(t, err)
return client
}

func TestRunnerNewDocappenderConfig(t *testing.T) {
var tc = []struct {
memSize float64
wantMaxRequests int
wantDocBufSize int
}{
{memSize: 1, wantMaxRequests: 11, wantDocBufSize: 819},
{memSize: 2, wantMaxRequests: 13, wantDocBufSize: 1638},
{memSize: 4, wantMaxRequests: 16, wantDocBufSize: 3276},
{memSize: 8, wantMaxRequests: 22, wantDocBufSize: 6553},
}
for _, c := range tc {
t.Run(fmt.Sprintf("default/%vgb", c.memSize), func(t *testing.T) {
r := Runner{
elasticsearchOutputConfig: agentconfig.NewConfig(),
logger: logp.NewLogger("test"),
}
docCfg, esCfg, err := r.newDocappenderConfig(nil, c.memSize)
require.NoError(t, err)
assert.Equal(t, docappender.Config{
Logger: zap.New(r.logger.Core(), zap.WithCaller(true)),
CompressionLevel: 5,
RequireDataStream: true,
FlushInterval: time.Second,
FlushBytes: 1024 * 1024,
MaxRequests: c.wantMaxRequests,
DocumentBufferSize: c.wantDocBufSize,
}, docCfg)
assert.Equal(t, &elasticsearch.Config{
Hosts: elasticsearch.Hosts{"localhost:9200"},
Backoff: elasticsearch.DefaultBackoffConfig,
Protocol: "http",
CompressionLevel: 5,
Timeout: 5 * time.Second,
MaxRetries: 3,
MaxIdleConnsPerHost: c.wantMaxRequests,
}, esCfg)
})
t.Run(fmt.Sprintf("override/%vgb", c.memSize), func(t *testing.T) {
r := Runner{
elasticsearchOutputConfig: agentconfig.MustNewConfigFrom(map[string]interface{}{
"flush_bytes": "500 kib",
"flush_interval": "2s",
"max_requests": 50,
}),
logger: logp.NewLogger("test"),
}
docCfg, esCfg, err := r.newDocappenderConfig(nil, c.memSize)
require.NoError(t, err)
assert.Equal(t, docappender.Config{
Logger: zap.New(r.logger.Core(), zap.WithCaller(true)),
CompressionLevel: 5,
RequireDataStream: true,
FlushInterval: 2 * time.Second,
FlushBytes: 500 * 1024,
MaxRequests: 50,
DocumentBufferSize: c.wantDocBufSize,
}, docCfg)
assert.Equal(t, &elasticsearch.Config{
Hosts: elasticsearch.Hosts{"localhost:9200"},
Backoff: elasticsearch.DefaultBackoffConfig,
Protocol: "http",
CompressionLevel: 5,
Timeout: 5 * time.Second,
MaxRetries: 3,
MaxIdleConnsPerHost: 50,
}, esCfg)
})
}
}

0 comments on commit 2def5b2

Please sign in to comment.