Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix memory limit check bytes to GB conversion #13838

Merged
merged 7 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 49 additions & 37 deletions internal/beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,43 +179,12 @@ func (s *Runner) Run(ctx context.Context) error {
}
}

// Obtain the memory limit for the APM Server process. Certain config
// values will be sized according to the maximum memory set for the server.
var memLimitGB float64
if cgroupReader := newCgroupReader(); cgroupReader != nil {
if limit, err := cgroupMemoryLimit(cgroupReader); err != nil {
s.logger.Warn(err)
} else {
memLimitGB = float64(limit) / 1024 / 1024 / 1024
}
}
if limit, err := systemMemoryLimit(); err != nil {
s.logger.Warn(err)
} else {
var fallback bool
if memLimitGB <= 0 {
s.logger.Info("no cgroups detected, falling back to total system memory")
fallback = true
}
if memLimitGB > float64(limit) {
s.logger.Info("cgroup memory limit exceed available memory, falling back to the total system memory")
fallback = true
}
if fallback {
// If no cgroup limit is set, return a fraction of the total memory
// to have a margin of safety for other processes. The fraction value
// of 0.625 is used to keep the 80% of the total system memory limit
// to be 50% of the total for calculating the number of decoders.
memLimitGB = float64(limit) / 1024 / 1024 / 1024 * 0.625
}
}
if memLimitGB <= 0 {
memLimitGB = 1
s.logger.Infof(
"failed to discover memory limit, default to %0.1fgb of memory",
memLimitGB,
)
}
memLimitGB := processMemoryLimit(
newCgroupReader(),
sysMemoryReaderFunc(systemMemoryLimit),
s.logger,
)

if s.config.MaxConcurrentDecoders == 0 {
s.config.MaxConcurrentDecoders = maxConcurrentDecoders(memLimitGB)
s.logger.Infof("MaxConcurrentDecoders set to %d based on 80 percent of %0.1fgb of memory",
Expand Down Expand Up @@ -1035,6 +1004,49 @@ func queryClusterUUID(ctx context.Context, esClient *elasticsearch.Client) error
return nil
}

// processMemoryLimit obtains the memory limit for the APM Server process. Certain config
// values will be sized according to the maximum memory set for the server.
func processMemoryLimit(cgroups cgroupReader, sys sysMemoryReader, logger *logp.Logger) (memLimitGB float64) {
var memLimit uint64
if cgroups != nil {
if limit, err := cgroupMemoryLimit(cgroups); err != nil {
logger.Warn(err)
} else {
memLimit = limit
}
}
if limit, err := sys.Limit(); err != nil {
logger.Warn(err)
} else {
var fallback bool
if memLimit <= 0 {
logger.Info("no cgroups detected, falling back to total system memory")
fallback = true
}
if memLimit > limit {
logger.Info("cgroup memory limit exceed available memory, falling back to the total system memory")
fallback = true
}
if fallback {
// If no cgroup limit is set, return a fraction of the total memory
// to have a margin of safety for other processes. The fraction value
// of 0.625 is used to keep the 80% of the total system memory limit
// to be 50% of the total for calculating the number of decoders.
memLimit = uint64(float64(limit) * 0.625)
}
}
// Convert the memory limit to gigabytes to calculate the config values.
memLimitGB = float64(memLimit) / (1 << 30)
if memLimitGB <= 0 {
memLimitGB = 1
logger.Infof(
"failed to discover memory limit, default to %0.1fgb of memory",
memLimitGB,
)
}
return
}

type nopProcessingSupporter struct {
}

Expand Down
92 changes: 92 additions & 0 deletions internal/beater/beater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"context"
"encoding/json"
"encoding/pem"
"errors"
"fmt"
"net/http"
"net/http/httptest"
Expand All @@ -40,6 +41,10 @@ import (
agentconfig "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-libs/opt"
"github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup"
"github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup/cgv1"
"github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup/cgv2"
"github.com/elastic/go-docappender/v2"
)

Expand Down Expand Up @@ -279,3 +284,90 @@ func TestNewInstrumentation(t *testing.T) {
assert.Equal(t, map[string]string{"k1": "val", "k2": "new val"}, <-labels)
assert.Equal(t, "Bearer secret", auth)
}

func TestProcessMemoryLimit(t *testing.T) {
l := logp.NewLogger("test")
const gb = 1 << 30
for name, testCase := range map[string]struct {
cgroups cgroupReader
sys sysMemoryReader
wantMemLimitGB float64
}{
"LimitErrShouldResultInDefaultLimit": {
sys: sysMemoryReaderFunc(func() (uint64, error) {
return 0, errors.New("test")
}),
wantMemLimitGB: 1,
},
"NilCgroupsShouldResultInScaledSysLimit": {
sys: sysMemoryReaderFunc(func() (uint64, error) {
return 10 * gb, nil
}),
wantMemLimitGB: 6.25,
},
"CgroupsErrShouldResultInScaledSysLimit": {
cgroups: mockCgroupReader{errv: errors.New("test")},
sys: sysMemoryReaderFunc(func() (uint64, error) {
return 10 * gb, nil
}),
wantMemLimitGB: 6.25,
},
"CgroupsV1OkLimitShouldResultInCgroupsV1OkLimit": {
cgroups: mockCgroupReader{v: cgroup.CgroupsV1, v1: &cgroup.StatsV1{
Memory: &cgv1.MemorySubsystem{
Mem: cgv1.MemoryData{
Limit: opt.Bytes{Bytes: gb},
},
},
}},
sys: sysMemoryReaderFunc(func() (uint64, error) {
return 10 * gb, nil
}),
wantMemLimitGB: 1,
},
"CgroupsV1OverMaxLimitShouldResultInScaledSysLimit": {
cgroups: mockCgroupReader{v: cgroup.CgroupsV1, v1: &cgroup.StatsV1{
Memory: &cgv1.MemorySubsystem{
Mem: cgv1.MemoryData{
Limit: opt.Bytes{Bytes: 15 * gb},
},
},
}},
sys: sysMemoryReaderFunc(func() (uint64, error) {
return 10 * gb, nil
}),
wantMemLimitGB: 6.25,
},
"CgroupsV2OkLimitShouldResultInCgroupsV1OkLimit": {
cgroups: mockCgroupReader{v: cgroup.CgroupsV2, v2: &cgroup.StatsV2{
Memory: &cgv2.MemorySubsystem{
Mem: cgv2.MemoryData{
Max: opt.BytesOpt{Bytes: opt.UintWith(gb)},
},
},
}},
sys: sysMemoryReaderFunc(func() (uint64, error) {
return 10 * gb, nil
}),
wantMemLimitGB: 1,
},
"CgroupsV2OverMaxLimitShouldResultInScaledSysLimit": {
cgroups: mockCgroupReader{v: cgroup.CgroupsV2, v2: &cgroup.StatsV2{
Memory: &cgv2.MemorySubsystem{
Mem: cgv2.MemoryData{
Max: opt.BytesOpt{Bytes: opt.UintWith(15 * gb)},
},
},
}},
sys: sysMemoryReaderFunc(func() (uint64, error) {
return 10 * gb, nil
}),
wantMemLimitGB: 6.25,
},
} {
t.Run(name, func(t *testing.T) {
memLimitGB := processMemoryLimit(testCase.cgroups, testCase.sys, l)
assert.Equal(t, testCase.wantMemLimitGB, memLimitGB)
})
}
}
18 changes: 15 additions & 3 deletions internal/beater/memlimit_cgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,16 @@ import (
"github.com/elastic/elastic-agent-system-metrics/metric/system/resolve"
)

func newCgroupReader() *cgroup.Reader {
// cgroupReader defines a short interface useful for testing purposes
// that provides a way to obtain cgroups process memory limit.
// Implemented by github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup Reader.
type cgroupReader interface {
CgroupsVersion(int) (cgroup.CgroupsVersion, error)
GetV1StatsForProcess(int) (*cgroup.StatsV1, error)
GetV2StatsForProcess(int) (*cgroup.StatsV2, error)
}

func newCgroupReader() cgroupReader {
cgroupOpts := cgroup.ReaderOptions{
RootfsMountpoint: resolve.NewTestResolver(""),
IgnoreRootCgroups: true,
Expand All @@ -37,13 +46,16 @@ func newCgroupReader() *cgroup.Reader {
if isset {
cgroupOpts.CgroupsHierarchyOverride = override
}
reader, _ := cgroup.NewReaderOptions(cgroupOpts)
reader, err := cgroup.NewReaderOptions(cgroupOpts)
if err != nil {
return nil
}
return reader
}

// Returns the cgroup maximum memory if running within a cgroup in GigaBytes,
// otherwise, it returns 0 and an error.
func cgroupMemoryLimit(rdr *cgroup.Reader) (uint64, error) {
func cgroupMemoryLimit(rdr cgroupReader) (uint64, error) {
pid := os.Getpid()
vers, err := rdr.CgroupsVersion(pid)
if err != nil {
Expand Down
113 changes: 113 additions & 0 deletions internal/beater/memlimit_cgroup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package beater

import (
"errors"
"testing"

"github.com/elastic/elastic-agent-libs/opt"
"github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup"
"github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup/cgv1"
"github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup/cgv2"

"github.com/stretchr/testify/assert"
)

func TestCgroupMemoryLimit(t *testing.T) {
err := errors.New("test")
for name, testCase := range map[string]struct {
cgroups cgroupReader
wantErr bool
wantLimit uint64
}{
"CgroupsVersionErrShouldResultInError": {
cgroups: mockCgroupReader{errv: err},
wantErr: true,
},
"CgroupsInvalidVersionShouldResultInError": {
cgroups: mockCgroupReader{v: -1},
wantErr: true,
},
"CgroupsV1ErrShouldResultInError": {
cgroups: mockCgroupReader{v: cgroup.CgroupsV1, errv1: err},
wantErr: true,
},
"CgroupsV1NilLimitShouldResultInError": {
cgroups: mockCgroupReader{v: cgroup.CgroupsV1, v1: &cgroup.StatsV1{}},
wantErr: true,
},
"CgroupsV1OkLimitShouldResultInOkLimit": {
cgroups: mockCgroupReader{v: cgroup.CgroupsV1, v1: &cgroup.StatsV1{
Memory: &cgv1.MemorySubsystem{
Mem: cgv1.MemoryData{
Limit: opt.Bytes{Bytes: 1000},
},
},
}},
wantLimit: 1000,
},
"CgroupsV2ErrShouldResultInError": {
cgroups: mockCgroupReader{v: cgroup.CgroupsV2, errv2: err},
wantErr: true,
},
"CgroupsV2NilLimitShouldResultInError": {
cgroups: mockCgroupReader{v: cgroup.CgroupsV2, v2: &cgroup.StatsV2{}},
wantErr: true,
},
"CgroupsV2OkLimitShouldResultInOkLimit": {
cgroups: mockCgroupReader{v: cgroup.CgroupsV2, v2: &cgroup.StatsV2{
Memory: &cgv2.MemorySubsystem{
Mem: cgv2.MemoryData{
Max: opt.BytesOpt{Bytes: opt.UintWith(1000)},
},
},
}},
wantLimit: 1000,
},
} {
t.Run(name, func(t *testing.T) {
limit, err := cgroupMemoryLimit(testCase.cgroups)
if testCase.wantErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
assert.Equal(t, testCase.wantLimit, limit)
})
}
}

type mockCgroupReader struct {
v cgroup.CgroupsVersion
v1 *cgroup.StatsV1
v2 *cgroup.StatsV2
errv, errv1, errv2 error
}

func (r mockCgroupReader) CgroupsVersion(int) (cgroup.CgroupsVersion, error) {
return r.v, r.errv
}

func (r mockCgroupReader) GetV1StatsForProcess(int) (*cgroup.StatsV1, error) {
return r.v1, r.errv1
}

func (r mockCgroupReader) GetV2StatsForProcess(int) (*cgroup.StatsV2, error) {
return r.v2, r.errv2
}
17 changes: 14 additions & 3 deletions internal/beater/memlimit_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,20 @@

package beater

import (
"github.com/elastic/go-sysinfo"
)
import "github.com/elastic/go-sysinfo"

// sysMemoryReader defines an interface useful for testing purposes
// that provides a way to obtain the total system memory limit.
type sysMemoryReader interface {
Limit() (uint64, error)
}

// sysMemoryReaderFunc func implementation of sysMemoryReader.
type sysMemoryReaderFunc func() (uint64, error)

func (f sysMemoryReaderFunc) Limit() (uint64, error) {
return f()
}

// systemMemoryLimit returns the total system memory.
func systemMemoryLimit() (uint64, error) {
Expand Down