From c914b9aa967469547c8537767716c99a9fd9bbf3 Mon Sep 17 00:00:00 2001 From: Kostiantyn Masliuk <1pkg@protonmail.com> Date: Mon, 12 Aug 2024 17:29:06 -0700 Subject: [PATCH] Add tests to beater memlimit code. --- changelogs/8.15.asciidoc | 14 +++- internal/beater/beater.go | 88 +++++++++++++---------- internal/beater/beater_test.go | 92 ++++++++++++++++++++++++ internal/beater/memlimit_cgroup.go | 18 ++++- internal/beater/memlimit_cgroup_test.go | 95 +++++++++++++++++++++++++ internal/beater/memlimit_system.go | 17 ++++- 6 files changed, 278 insertions(+), 46 deletions(-) create mode 100644 internal/beater/memlimit_cgroup_test.go diff --git a/changelogs/8.15.asciidoc b/changelogs/8.15.asciidoc index 94105d517b4..93b2fedc5d1 100644 --- a/changelogs/8.15.asciidoc +++ b/changelogs/8.15.asciidoc @@ -1,7 +1,20 @@ [[apm-release-notes-8.15]] == APM version 8.15 + +* <> * <> +[float] +[[apm-release-notes-8.15.1]] +=== APM version 8.15.1 + +https://github.com/elastic/apm-server/compare/v8.15.0\...v8.15.1[View commits] + +[float] +==== Bug fixes + +- Fix memory limit fallback check bytes to GB conversion {pull}13838[13838] + [float] [[apm-release-notes-8.15.0]] === APM version 8.15.0 @@ -37,4 +50,3 @@ https://github.com/elastic/apm-server/compare/v8.14.3\...v8.15.0[View commits] - Support self-instrumentation when in managed mode by getting tracing configs via reloader {pull}13514[13514] {pull}13653[13653] {pull}13691[13691] {pull}13790[13790] - Add mapping for OpenTelemetry attribute `messaging.destination.name` to derive `service.target` correctly {pull}13472[13472] - APM Server now automatically retries document-level 429s from Elasticsearch to avoid dropping data. `output.elasticsearch.max_retries` now controls both request-level and document-level retries, and defaults to `3`. 
{pull}13620[13620] -- Fix memory limit fallback check bytes to GB conversion {pull}13838[13838] diff --git a/internal/beater/beater.go b/internal/beater/beater.go index d5b4abade6a..05549a5a45a 100644 --- a/internal/beater/beater.go +++ b/internal/beater/beater.go @@ -179,45 +179,12 @@ func (s *Runner) Run(ctx context.Context) error { } } - // Obtain the memory limit for the APM Server process. Certain config - // values will be sized according to the maximum memory set for the server. - var memLimit uint64 - if cgroupReader := newCgroupReader(); cgroupReader != nil { - if limit, err := cgroupMemoryLimit(cgroupReader); err != nil { - s.logger.Warn(err) - } else { - memLimit = limit - } - } - if limit, err := systemMemoryLimit(); err != nil { - s.logger.Warn(err) - } else { - var fallback bool - if memLimit <= 0 { - s.logger.Info("no cgroups detected, falling back to total system memory") - fallback = true - } - if memLimit > limit { - s.logger.Info("cgroup memory limit exceed available memory, falling back to the total system memory") - fallback = true - } - if fallback { - // If no cgroup limit is set, return a fraction of the total memory - // to have a margin of safety for other processes. The fraction value - // of 0.625 is used to keep the 80% of the total system memory limit - // to be 50% of the total for calculating the number of decoders. - memLimit = uint64(float64(limit) * 0.625) - } - } - // Convert the memory limit to gigabytes to calculate the config values. 
- var memLimitGB = float64(memLimit) / 1073741824 - if memLimitGB <= 0 { - memLimitGB = 1 - s.logger.Infof( - "failed to discover memory limit, default to %0.1fgb of memory", - memLimitGB, - ) - } + memLimitGB := processMemoryLimit( + newCgroupReader(), + sysMemoryReaderFunc(systemMemoryLimit), + s.logger, + ) + if s.config.MaxConcurrentDecoders == 0 { s.config.MaxConcurrentDecoders = maxConcurrentDecoders(memLimitGB) s.logger.Infof("MaxConcurrentDecoders set to %d based on 80 percent of %0.1fgb of memory", @@ -1037,6 +1004,49 @@ func queryClusterUUID(ctx context.Context, esClient *elasticsearch.Client) error return nil } +// processMemoryLimit obtains the memory limit for the APM Server process. Certain config +// values will be sized according to the maximum memory set for the server. +func processMemoryLimit(cgroups cgroupReader, sys sysMemoryReader, logger *logp.Logger) (memLimitGB float64) { + var memLimit uint64 + if cgroups != nil { + if limit, err := cgroupMemoryLimit(cgroups); err != nil { + logger.Warn(err) + } else { + memLimit = limit + } + } + if limit, err := sys.Limit(); err != nil { + logger.Warn(err) + } else { + var fallback bool + if memLimit <= 0 { + logger.Info("no cgroups detected, falling back to total system memory") + fallback = true + } + if memLimit > limit { + logger.Info("cgroup memory limit exceed available memory, falling back to the total system memory") + fallback = true + } + if fallback { + // If no cgroup limit is set, return a fraction of the total memory + // to have a margin of safety for other processes. The fraction value + // of 0.625 is used to keep the 80% of the total system memory limit + // to be 50% of the total for calculating the number of decoders. + memLimit = uint64(float64(limit) * 0.625) + } + } + // Convert the memory limit to gigabytes to calculate the config values. 
+ memLimitGB = float64(memLimit) / (1 << 30) + if memLimitGB <= 0 { + memLimitGB = 1 + logger.Infof( + "failed to discover memory limit, default to %0.1fgb of memory", + memLimitGB, + ) + } + return +} + type nopProcessingSupporter struct { } diff --git a/internal/beater/beater_test.go b/internal/beater/beater_test.go index 4fec5b897f8..289ef975303 100644 --- a/internal/beater/beater_test.go +++ b/internal/beater/beater_test.go @@ -22,6 +22,7 @@ import ( "context" "encoding/json" "encoding/pem" + "errors" "fmt" "net/http" "net/http/httptest" @@ -40,6 +41,10 @@ import ( agentconfig "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/monitoring" + "github.com/elastic/elastic-agent-libs/opt" + "github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup" + "github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup/cgv1" + "github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup/cgv2" "github.com/elastic/go-docappender/v2" ) @@ -279,3 +284,90 @@ func TestNewInstrumentation(t *testing.T) { assert.Equal(t, map[string]string{"k1": "val", "k2": "new val"}, <-labels) assert.Equal(t, "Bearer secret", auth) } + +func TestProcessMemoryLimit(t *testing.T) { + l := logp.NewLogger("test") + const gb = 1 << 30 + for name, testCase := range map[string]struct { + cgroups cgroupReader + sys sysMemoryReader + wantMemLimitGB float64 + }{ + "LimitErrShouldResultInDefaultLimit": { + sys: sysMemoryReaderFunc(func() (uint64, error) { + return 0, errors.New("test") + }), + wantMemLimitGB: 1, + }, + "NilCgroupsShouldResultInScaledSysLimit": { + sys: sysMemoryReaderFunc(func() (uint64, error) { + return 10 * gb, nil + }), + wantMemLimitGB: 6.25, + }, + "CgroupsErrShouldResultInScaledSysLimit": { + cgroups: mockCgroupReader{errv: errors.New("test")}, + sys: sysMemoryReaderFunc(func() (uint64, error) { + return 10 * gb, nil + }), + wantMemLimitGB: 6.25, + }, + 
"CgroupsV1OkLimitShouldResultInCgroupsV1OkLimit": { + cgroups: mockCgroupReader{v: cgroup.CgroupsV1, v1: &cgroup.StatsV1{ + Memory: &cgv1.MemorySubsystem{ + Mem: cgv1.MemoryData{ + Limit: opt.Bytes{Bytes: gb}, + }, + }, + }}, + sys: sysMemoryReaderFunc(func() (uint64, error) { + return 10 * gb, nil + }), + wantMemLimitGB: 1, + }, + "CgroupsV1OverMaxLimitShouldResultInScaledSysLimit": { + cgroups: mockCgroupReader{v: cgroup.CgroupsV1, v1: &cgroup.StatsV1{ + Memory: &cgv1.MemorySubsystem{ + Mem: cgv1.MemoryData{ + Limit: opt.Bytes{Bytes: 15 * gb}, + }, + }, + }}, + sys: sysMemoryReaderFunc(func() (uint64, error) { + return 10 * gb, nil + }), + wantMemLimitGB: 6.25, + }, + "CgroupsV2OkLimitShouldResultInCgroupsV2OkLimit": { + cgroups: mockCgroupReader{v: cgroup.CgroupsV2, v2: &cgroup.StatsV2{ + Memory: &cgv2.MemorySubsystem{ + Mem: cgv2.MemoryData{ + Max: opt.BytesOpt{Bytes: opt.UintWith(gb)}, + }, + }, + }}, + sys: sysMemoryReaderFunc(func() (uint64, error) { + return 10 * gb, nil + }), + wantMemLimitGB: 1, + }, + "CgroupsV2OverMaxLimitShouldResultInScaledSysLimit": { + cgroups: mockCgroupReader{v: cgroup.CgroupsV2, v2: &cgroup.StatsV2{ + Memory: &cgv2.MemorySubsystem{ + Mem: cgv2.MemoryData{ + Max: opt.BytesOpt{Bytes: opt.UintWith(15 * gb)}, + }, + }, + }}, + sys: sysMemoryReaderFunc(func() (uint64, error) { + return 10 * gb, nil + }), + wantMemLimitGB: 6.25, + }, + } { + t.Run(name, func(t *testing.T) { + memLimitGB := processMemoryLimit(testCase.cgroups, testCase.sys, l) + assert.Equal(t, testCase.wantMemLimitGB, memLimitGB) + }) + } +} diff --git a/internal/beater/memlimit_cgroup.go b/internal/beater/memlimit_cgroup.go index 577f5c24025..08de74363e8 100644 --- a/internal/beater/memlimit_cgroup.go +++ b/internal/beater/memlimit_cgroup.go @@ -27,7 +27,16 @@ import ( "github.com/elastic/elastic-agent-system-metrics/metric/system/resolve" ) -func newCgroupReader() *cgroup.Reader { +// cgroupReader defines a short interface useful for testing purposes +// that 
provides a way to obtain cgroups process memory limit. +// Implemented by github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup Reader. +type cgroupReader interface { + CgroupsVersion(int) (cgroup.CgroupsVersion, error) + GetV1StatsForProcess(int) (*cgroup.StatsV1, error) + GetV2StatsForProcess(int) (*cgroup.StatsV2, error) +} + +func newCgroupReader() cgroupReader { cgroupOpts := cgroup.ReaderOptions{ RootfsMountpoint: resolve.NewTestResolver(""), IgnoreRootCgroups: true, @@ -37,13 +46,16 @@ func newCgroupReader() *cgroup.Reader { if isset { cgroupOpts.CgroupsHierarchyOverride = override } - reader, _ := cgroup.NewReaderOptions(cgroupOpts) + reader, err := cgroup.NewReaderOptions(cgroupOpts) + if err != nil { + return nil + } return reader } // Returns the cgroup maximum memory if running within a cgroup in GigaBytes, // otherwise, it returns 0 and an error. -func cgroupMemoryLimit(rdr *cgroup.Reader) (uint64, error) { +func cgroupMemoryLimit(rdr cgroupReader) (uint64, error) { pid := os.Getpid() vers, err := rdr.CgroupsVersion(pid) if err != nil { diff --git a/internal/beater/memlimit_cgroup_test.go b/internal/beater/memlimit_cgroup_test.go new file mode 100644 index 00000000000..c0819c82a5c --- /dev/null +++ b/internal/beater/memlimit_cgroup_test.go @@ -0,0 +1,95 @@ +package beater + +import ( + "errors" + "testing" + + "github.com/elastic/elastic-agent-libs/opt" + "github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup" + "github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup/cgv1" + "github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup/cgv2" + "github.com/stretchr/testify/assert" +) + +func TestCgroupMemoryLimit(t *testing.T) { + err := errors.New("test") + for name, testCase := range map[string]struct { + cgroups cgroupReader + wantErr bool + wantLimit uint64 + }{ + "CgroupsVersionErrShouldResultInError": { + cgroups: mockCgroupReader{errv: err}, + wantErr: true, + }, + 
"CgroupsInvalidVersionShouldResultInError": { + cgroups: mockCgroupReader{v: -1}, + wantErr: true, + }, + "CgroupsV1ErrShouldResultInError": { + cgroups: mockCgroupReader{v: cgroup.CgroupsV1, errv1: err}, + wantErr: true, + }, + "CgroupsV1NilLimitShouldResultInError": { + cgroups: mockCgroupReader{v: cgroup.CgroupsV1, v1: &cgroup.StatsV1{}}, + wantErr: true, + }, + "CgroupsV1OkLimitShouldResultInOkLimit": { + cgroups: mockCgroupReader{v: cgroup.CgroupsV1, v1: &cgroup.StatsV1{ + Memory: &cgv1.MemorySubsystem{ + Mem: cgv1.MemoryData{ + Limit: opt.Bytes{Bytes: 1000}, + }, + }, + }}, + wantLimit: 1000, + }, + "CgroupsV2ErrShouldResultInError": { + cgroups: mockCgroupReader{v: cgroup.CgroupsV2, errv2: err}, + wantErr: true, + }, + "CgroupsV2NilLimitShouldResultInError": { + cgroups: mockCgroupReader{v: cgroup.CgroupsV2, v2: &cgroup.StatsV2{}}, + wantErr: true, + }, + "CgroupsV2OkLimitShouldResultInOkLimit": { + cgroups: mockCgroupReader{v: cgroup.CgroupsV2, v2: &cgroup.StatsV2{ + Memory: &cgv2.MemorySubsystem{ + Mem: cgv2.MemoryData{ + Max: opt.BytesOpt{Bytes: opt.UintWith(1000)}, + }, + }, + }}, + wantLimit: 1000, + }, + } { + t.Run(name, func(t *testing.T) { + limit, err := cgroupMemoryLimit(testCase.cgroups) + if testCase.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + assert.Equal(t, testCase.wantLimit, limit) + }) + } +} + +type mockCgroupReader struct { + v cgroup.CgroupsVersion + v1 *cgroup.StatsV1 + v2 *cgroup.StatsV2 + errv, errv1, errv2 error +} + +func (r mockCgroupReader) CgroupsVersion(int) (cgroup.CgroupsVersion, error) { + return r.v, r.errv +} + +func (r mockCgroupReader) GetV1StatsForProcess(int) (*cgroup.StatsV1, error) { + return r.v1, r.errv1 +} + +func (r mockCgroupReader) GetV2StatsForProcess(int) (*cgroup.StatsV2, error) { + return r.v2, r.errv2 +} diff --git a/internal/beater/memlimit_system.go b/internal/beater/memlimit_system.go index abb2f8e2036..b2c02d3ba52 100644 --- a/internal/beater/memlimit_system.go +++ 
b/internal/beater/memlimit_system.go @@ -17,9 +17,20 @@ package beater -import ( - "github.com/elastic/go-sysinfo" -) +import "github.com/elastic/go-sysinfo" + +// sysMemoryReader defines an interface useful for testing purposes +// that provides a way to obtain the total system memory limit. +type sysMemoryReader interface { + Limit() (uint64, error) +} + +// sysMemoryReaderFunc is a func implementation of sysMemoryReader. +type sysMemoryReaderFunc func() (uint64, error) + +func (f sysMemoryReaderFunc) Limit() (uint64, error) { + return f() +} // systemMemoryLimit returns the total system memory. func systemMemoryLimit() (uint64, error) {