Skip to content

Commit

Permalink
Add tests to beater memlimit code.
Browse files Browse the repository at this point in the history
  • Loading branch information
1pkg committed Aug 13, 2024
1 parent a3940ee commit c914b9a
Show file tree
Hide file tree
Showing 6 changed files with 278 additions and 46 deletions.
14 changes: 13 additions & 1 deletion changelogs/8.15.asciidoc
Original file line number Diff line number Diff line change
@@ -1,7 +1,20 @@
[[apm-release-notes-8.15]]
== APM version 8.15

* <<apm-release-notes-8.15.1>>
* <<apm-release-notes-8.15.0>>

[float]
[[apm-release-notes-8.15.1]]
=== APM version 8.15.1

https://github.com/elastic/apm-server/compare/v8.15.0\...v8.15.1[View commits]

[float]
==== Bug fixes

- Fix memory limit fallback check bytes to GB conversion {pull}13838[13838]

[float]
[[apm-release-notes-8.15.0]]
=== APM version 8.15.0
Expand Down Expand Up @@ -37,4 +50,3 @@ https://github.com/elastic/apm-server/compare/v8.14.3\...v8.15.0[View commits]
- Support self-instrumentation when in managed mode by getting tracing configs via reloader {pull}13514[13514] {pull}13653[13653] {pull}13691[13691] {pull}13790[13790]
- Add mapping for OpenTelemetry attribute `messaging.destination.name` to derive `service.target` correctly {pull}13472[13472]
- APM Server now automatically retries document-level 429s from Elasticsearch to avoid dropping data. `output.elasticsearch.max_retries` now controls both request-level and document-level retries, and defaults to `3`. {pull}13620[13620]
- Fix memory limit fallback check bytes to GB conversion {pull}13838[13838]
88 changes: 49 additions & 39 deletions internal/beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,45 +179,12 @@ func (s *Runner) Run(ctx context.Context) error {
}
}

// Obtain the memory limit for the APM Server process. Certain config
// values will be sized according to the maximum memory set for the server.
var memLimit uint64
if cgroupReader := newCgroupReader(); cgroupReader != nil {
if limit, err := cgroupMemoryLimit(cgroupReader); err != nil {
s.logger.Warn(err)
} else {
memLimit = limit
}
}
if limit, err := systemMemoryLimit(); err != nil {
s.logger.Warn(err)
} else {
var fallback bool
if memLimit <= 0 {
s.logger.Info("no cgroups detected, falling back to total system memory")
fallback = true
}
if memLimit > limit {
s.logger.Info("cgroup memory limit exceed available memory, falling back to the total system memory")
fallback = true
}
if fallback {
// If no cgroup limit is set, return a fraction of the total memory
// to have a margin of safety for other processes. The fraction value
// of 0.625 is used to keep the 80% of the total system memory limit
// to be 50% of the total for calculating the number of decoders.
memLimit = uint64(float64(limit) * 0.625)
}
}
// Convert the memory limit to gigabytes to calculate the config values.
var memLimitGB = float64(memLimit) / 1073741824
if memLimitGB <= 0 {
memLimitGB = 1
s.logger.Infof(
"failed to discover memory limit, default to %0.1fgb of memory",
memLimitGB,
)
}
memLimitGB := processMemoryLimit(
newCgroupReader(),
sysMemoryReaderFunc(systemMemoryLimit),
s.logger,
)

if s.config.MaxConcurrentDecoders == 0 {
s.config.MaxConcurrentDecoders = maxConcurrentDecoders(memLimitGB)
s.logger.Infof("MaxConcurrentDecoders set to %d based on 80 percent of %0.1fgb of memory",
Expand Down Expand Up @@ -1037,6 +1004,49 @@ func queryClusterUUID(ctx context.Context, esClient *elasticsearch.Client) error
return nil
}

// processMemoryLimit obtains the memory limit for the APM Server process. Certain config
// values will be sized according to the maximum memory set for the server.
func processMemoryLimit(cgroups cgroupReader, sys sysMemoryReader, logger *logp.Logger) (memLimitGB float64) {
var memLimit uint64
if cgroups != nil {
if limit, err := cgroupMemoryLimit(cgroups); err != nil {
logger.Warn(err)
} else {
memLimit = limit
}
}
if limit, err := sys.Limit(); err != nil {
logger.Warn(err)
} else {
var fallback bool
if memLimit <= 0 {
logger.Info("no cgroups detected, falling back to total system memory")
fallback = true
}
if memLimit > limit {
logger.Info("cgroup memory limit exceed available memory, falling back to the total system memory")
fallback = true
}
if fallback {
// If no cgroup limit is set, return a fraction of the total memory
// to have a margin of safety for other processes. The fraction value
// of 0.625 is used to keep the 80% of the total system memory limit
// to be 50% of the total for calculating the number of decoders.
memLimit = uint64(float64(limit) * 0.625)
}
}
// Convert the memory limit to gigabytes to calculate the config values.
memLimitGB = float64(memLimit) / (1 << 30)
if memLimitGB <= 0 {
memLimitGB = 1
logger.Infof(
"failed to discover memory limit, default to %0.1fgb of memory",
memLimitGB,
)
}
return
}

type nopProcessingSupporter struct {
}

Expand Down
92 changes: 92 additions & 0 deletions internal/beater/beater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"context"
"encoding/json"
"encoding/pem"
"errors"
"fmt"
"net/http"
"net/http/httptest"
Expand All @@ -40,6 +41,10 @@ import (
agentconfig "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-libs/opt"
"github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup"
"github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup/cgv1"
"github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup/cgv2"
"github.com/elastic/go-docappender/v2"
)

Expand Down Expand Up @@ -279,3 +284,90 @@ func TestNewInstrumentation(t *testing.T) {
assert.Equal(t, map[string]string{"k1": "val", "k2": "new val"}, <-labels)
assert.Equal(t, "Bearer secret", auth)
}

func TestProcessMemoryLimit(t *testing.T) {
l := logp.NewLogger("test")
const gb = 1 << 30
for name, testCase := range map[string]struct {
cgroups cgroupReader
sys sysMemoryReader
wantMemLimitGB float64
}{
"LimitErrShouldResultInDefaultLimit": {
sys: sysMemoryReaderFunc(func() (uint64, error) {
return 0, errors.New("test")
}),
wantMemLimitGB: 1,
},
"NilCgroupsShouldResultInScaledSysLimit": {
sys: sysMemoryReaderFunc(func() (uint64, error) {
return 10 * gb, nil
}),
wantMemLimitGB: 6.25,
},
"CgroupsErrShouldResultInScaledSysLimit": {
cgroups: mockCgroupReader{errv: errors.New("test")},
sys: sysMemoryReaderFunc(func() (uint64, error) {
return 10 * gb, nil
}),
wantMemLimitGB: 6.25,
},
"CgroupsV1OkLimitShouldResultInCgroupsV1OkLimit": {
cgroups: mockCgroupReader{v: cgroup.CgroupsV1, v1: &cgroup.StatsV1{
Memory: &cgv1.MemorySubsystem{
Mem: cgv1.MemoryData{
Limit: opt.Bytes{Bytes: gb},
},
},
}},
sys: sysMemoryReaderFunc(func() (uint64, error) {
return 10 * gb, nil
}),
wantMemLimitGB: 1,
},
"CgroupsV1OverMaxLimitShouldResultInScaledSysLimit": {
cgroups: mockCgroupReader{v: cgroup.CgroupsV1, v1: &cgroup.StatsV1{
Memory: &cgv1.MemorySubsystem{
Mem: cgv1.MemoryData{
Limit: opt.Bytes{Bytes: 15 * gb},
},
},
}},
sys: sysMemoryReaderFunc(func() (uint64, error) {
return 10 * gb, nil
}),
wantMemLimitGB: 6.25,
},
"CgroupsV2OkLimitShouldResultInCgroupsV1OkLimit": {
cgroups: mockCgroupReader{v: cgroup.CgroupsV2, v2: &cgroup.StatsV2{
Memory: &cgv2.MemorySubsystem{
Mem: cgv2.MemoryData{
Max: opt.BytesOpt{Bytes: opt.UintWith(gb)},
},
},
}},
sys: sysMemoryReaderFunc(func() (uint64, error) {
return 10 * gb, nil
}),
wantMemLimitGB: 1,
},
"CgroupsV2OverMaxLimitShouldResultInScaledSysLimit": {
cgroups: mockCgroupReader{v: cgroup.CgroupsV2, v2: &cgroup.StatsV2{
Memory: &cgv2.MemorySubsystem{
Mem: cgv2.MemoryData{
Max: opt.BytesOpt{Bytes: opt.UintWith(15 * gb)},
},
},
}},
sys: sysMemoryReaderFunc(func() (uint64, error) {
return 10 * gb, nil
}),
wantMemLimitGB: 6.25,
},
} {
t.Run(name, func(t *testing.T) {
memLimitGB := processMemoryLimit(testCase.cgroups, testCase.sys, l)
assert.Equal(t, testCase.wantMemLimitGB, memLimitGB)
})
}
}
18 changes: 15 additions & 3 deletions internal/beater/memlimit_cgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,16 @@ import (
"github.com/elastic/elastic-agent-system-metrics/metric/system/resolve"
)

func newCgroupReader() *cgroup.Reader {
// cgroupReader defines a short interface useful for testing purposes
// that provides a way to obtain cgroups process memory limit.
// Implemented by github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup Reader.
type cgroupReader interface {
CgroupsVersion(int) (cgroup.CgroupsVersion, error)
GetV1StatsForProcess(int) (*cgroup.StatsV1, error)
GetV2StatsForProcess(int) (*cgroup.StatsV2, error)
}

func newCgroupReader() cgroupReader {
cgroupOpts := cgroup.ReaderOptions{
RootfsMountpoint: resolve.NewTestResolver(""),
IgnoreRootCgroups: true,
Expand All @@ -37,13 +46,16 @@ func newCgroupReader() *cgroup.Reader {
if isset {
cgroupOpts.CgroupsHierarchyOverride = override
}
reader, _ := cgroup.NewReaderOptions(cgroupOpts)
reader, err := cgroup.NewReaderOptions(cgroupOpts)
if err != nil {
return nil
}
return reader
}

// Returns the cgroup maximum memory if running within a cgroup in GigaBytes,
// otherwise, it returns 0 and an error.
func cgroupMemoryLimit(rdr *cgroup.Reader) (uint64, error) {
func cgroupMemoryLimit(rdr cgroupReader) (uint64, error) {
pid := os.Getpid()
vers, err := rdr.CgroupsVersion(pid)
if err != nil {
Expand Down
95 changes: 95 additions & 0 deletions internal/beater/memlimit_cgroup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package beater

import (
"errors"
"testing"

"github.com/elastic/elastic-agent-libs/opt"
"github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup"
"github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup/cgv1"
"github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup/cgv2"
"github.com/stretchr/testify/assert"
)

func TestCgroupMemoryLimit(t *testing.T) {
err := errors.New("test")
for name, testCase := range map[string]struct {
cgroups cgroupReader
wantErr bool
wantLimit uint64
}{
"CgroupsVersionErrShouldResultInError": {
cgroups: mockCgroupReader{errv: err},
wantErr: true,
},
"CgroupsInvalidVersionShouldResultInError": {
cgroups: mockCgroupReader{v: -1},
wantErr: true,
},
"CgroupsV1ErrShouldResultInError": {
cgroups: mockCgroupReader{v: cgroup.CgroupsV1, errv1: err},
wantErr: true,
},
"CgroupsV1NilLimitShouldResultInError": {
cgroups: mockCgroupReader{v: cgroup.CgroupsV1, v1: &cgroup.StatsV1{}},
wantErr: true,
},
"CgroupsV1OkLimitShouldResultInOkLimit": {
cgroups: mockCgroupReader{v: cgroup.CgroupsV1, v1: &cgroup.StatsV1{
Memory: &cgv1.MemorySubsystem{
Mem: cgv1.MemoryData{
Limit: opt.Bytes{Bytes: 1000},
},
},
}},
wantLimit: 1000,
},
"CgroupsV2ErrShouldResultInError": {
cgroups: mockCgroupReader{v: cgroup.CgroupsV2, errv2: err},
wantErr: true,
},
"CgroupsV2NilLimitShouldResultInError": {
cgroups: mockCgroupReader{v: cgroup.CgroupsV2, v2: &cgroup.StatsV2{}},
wantErr: true,
},
"CgroupsV2OkLimitShouldResultInOkLimit": {
cgroups: mockCgroupReader{v: cgroup.CgroupsV2, v2: &cgroup.StatsV2{
Memory: &cgv2.MemorySubsystem{
Mem: cgv2.MemoryData{
Max: opt.BytesOpt{Bytes: opt.UintWith(1000)},
},
},
}},
wantLimit: 1000,
},
} {
t.Run(name, func(t *testing.T) {
limit, err := cgroupMemoryLimit(testCase.cgroups)
if testCase.wantErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
assert.Equal(t, testCase.wantLimit, limit)
})
}
}

type mockCgroupReader struct {
v cgroup.CgroupsVersion
v1 *cgroup.StatsV1
v2 *cgroup.StatsV2
errv, errv1, errv2 error
}

func (r mockCgroupReader) CgroupsVersion(int) (cgroup.CgroupsVersion, error) {
return r.v, r.errv
}

func (r mockCgroupReader) GetV1StatsForProcess(int) (*cgroup.StatsV1, error) {
return r.v1, r.errv1
}

func (r mockCgroupReader) GetV2StatsForProcess(int) (*cgroup.StatsV2, error) {
return r.v2, r.errv2
}
17 changes: 14 additions & 3 deletions internal/beater/memlimit_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,20 @@

package beater

import (
"github.com/elastic/go-sysinfo"
)
import "github.com/elastic/go-sysinfo"

// sysMemoryReader defines an interface useful for testing purposes
// that provides a way to obtain the total system memory limit.
type sysMemoryReader interface {
Limit() (uint64, error)
}

// sysMemoryReaderFunc func implementation of sysMemoryReader.
type sysMemoryReaderFunc func() (uint64, error)

func (f sysMemoryReaderFunc) Limit() (uint64, error) {
return f()
}

// systemMemoryLimit returns the total system memory.
func systemMemoryLimit() (uint64, error) {
Expand Down

0 comments on commit c914b9a

Please sign in to comment.