agent: Add scaling event reporting
This is part 2 of 2; see #1078 for the groundwork.

In short, this commit:

* Adds a new package: 'pkg/agent/scalingevents'
* Adds new callbacks to core.State to allow it to report scaling events
  and changes in desired CU.
sharnoff committed Oct 17, 2024
1 parent df54b37 commit d2b4d45
Showing 14 changed files with 480 additions and 19 deletions.
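
For orientation before the per-file diffs: the core of the change is a pair of callback types added to pkg/agent/core (see the state.go diff below). The sketch restates them next to a guessed definition of scalingevents.GoalCUComponents, which is not part of this excerpt; its *float64 fields are inferred from the lo.ToPtr(...) assignments in goalcu.go, with nil presumably meaning "that metric was unavailable".

package sketch

import "time"

// Restated from pkg/agent/core/state.go below, with GoalCUComponents
// inlined so the sketch is self-contained.
type ReportScalingEventCallback func(timestamp time.Time, current uint32, target uint32)
type ReportDesiredScalingCallback func(timestamp time.Time, current uint32, target uint32, parts GoalCUComponents)

// Assumed shape; the real type lives in the new pkg/agent/scalingevents
// package, which this page does not show.
type GoalCUComponents struct {
	CPU *float64
	Mem *float64
	LFC *float64
}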
autoscaler-agent/config_map.yaml (7 additions & 0 deletions)
@@ -26,6 +26,13 @@ data:
"accumulateEverySeconds": 24,
"clients": {}
},
"scalingEvents": {
"cuMultiplier": 0.25,
"rereportThreshold": 0.25,
"clusterName": "replaceme",
"regionName": "replaceme",
"clients": {}
},
"monitor": {
"serverPort": 10301,
"responseTimeoutSeconds": 5,
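The new "scalingEvents" block above maps onto the validation added in pkg/agent/config.go below. As a reading aid, a hypothetical reconstruction of scalingevents.Config: the field names come from that validation, while the types, struct tags, and the Clients shape are assumptions (the real definitions live in the new pkg/agent/scalingevents package, not shown on this page).

package sketch

type Config struct {
	CUMultiplier      float64       `json:"cuMultiplier"`      // presumably scales raw CU into reported units
	RereportThreshold float64       `json:"rereportThreshold"` // presumably the minimum change worth re-reporting
	ClusterName       string        `json:"clusterName"`
	RegionName        string        `json:"regionName"`
	Clients           ClientsConfig `json:"clients"`
}

type ClientsConfig struct {
	S3 *S3ClientConfig `json:"s3"` // optional; validated only when present
}

// Per the validation below, the real S3 config also embeds a
// reporting.BaseClientConfig; omitted here for brevity.
type S3ClientConfig struct {
	Bucket         string `json:"bucket"`
	Region         string `json:"region"`
	PrefixInBucket string `json:"prefixInBucket"`
}
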
pkg/agent/config.go (16 additions & 1 deletion)
@@ -8,19 +8,22 @@ import (
"github.com/tychoish/fun/erc"

"github.com/neondatabase/autoscaling/pkg/agent/billing"
"github.com/neondatabase/autoscaling/pkg/agent/scalingevents"
"github.com/neondatabase/autoscaling/pkg/api"
"github.com/neondatabase/autoscaling/pkg/reporting"
)

type Config struct {
RefreshStateIntervalSeconds uint `json:"refereshStateIntervalSeconds"`

Billing billing.Config `json:"billing"`
ScalingEvents scalingevents.Config `json:"scalingEvents"`

Scaling ScalingConfig `json:"scaling"`
Metrics MetricsConfig `json:"metrics"`
Scheduler SchedulerConfig `json:"scheduler"`
Monitor MonitorConfig `json:"monitor"`
NeonVM NeonVMConfig `json:"neonvm"`
Billing billing.Config `json:"billing"`
DumpState *DumpStateConfig `json:"dumpState"`
}

@@ -193,6 +196,18 @@ func (c *Config) validate() error {
erc.Whenf(ec, c.Billing.Clients.S3.Region == "", emptyTmpl, ".billing.clients.s3.region")
erc.Whenf(ec, c.Billing.Clients.S3.PrefixInBucket == "", emptyTmpl, ".billing.clients.s3.prefixInBucket")
}

erc.Whenf(ec, c.ScalingEvents.CUMultiplier == 0, zeroTmpl, ".scalingEvents.cuMultiplier")
erc.Whenf(ec, c.ScalingEvents.RereportThreshold == 0, zeroTmpl, ".scalingEvents.rereportThreshold")
erc.Whenf(ec, c.ScalingEvents.ClusterName == "", emptyTmpl, ".scalingEvents.clusterName")
erc.Whenf(ec, c.ScalingEvents.RegionName == "", emptyTmpl, ".scalingEvents.regionName")
if c.ScalingEvents.Clients.S3 != nil {
validateBaseReportingConfig(&c.ScalingEvents.Clients.S3.BaseClientConfig, "scalingEvents.clients.s3")
erc.Whenf(ec, c.ScalingEvents.Clients.S3.Bucket == "", emptyTmpl, ".scalingEvents.clients.s3.bucket")
erc.Whenf(ec, c.ScalingEvents.Clients.S3.Region == "", emptyTmpl, ".scalingEvents.clients.s3.region")
erc.Whenf(ec, c.ScalingEvents.Clients.S3.PrefixInBucket == "", emptyTmpl, ".scalingEvents.clients.s3.prefixInBucket")
}

erc.Whenf(ec, c.DumpState != nil && c.DumpState.Port == 0, zeroTmpl, ".dumpState.port")
erc.Whenf(ec, c.DumpState != nil && c.DumpState.TimeoutSeconds == 0, zeroTmpl, ".dumpState.timeoutSeconds")

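The added checks follow the file's existing collect-then-resolve pattern: each erc.Whenf records a formatted error into the collector when its condition holds, and validate() ultimately resolves the collector into a single error. A minimal self-contained sketch of that pattern, assuming erc.Collector's zero value is usable and that zeroTmpl/emptyTmpl are printf-style templates defined earlier in validate():

package main

import (
	"fmt"

	"github.com/tychoish/fun/erc"
)

type scalingEventsConfig struct {
	CUMultiplier float64
	ClusterName  string
}

func validate(cfg scalingEventsConfig) error {
	ec := &erc.Collector{}
	// Assumed stand-ins for the zeroTmpl/emptyTmpl constants used above.
	const zeroTmpl = "field %q cannot be zero"
	const emptyTmpl = "field %q cannot be empty"

	erc.Whenf(ec, cfg.CUMultiplier == 0, zeroTmpl, ".scalingEvents.cuMultiplier")
	erc.Whenf(ec, cfg.ClusterName == "", emptyTmpl, ".scalingEvents.clusterName")

	return ec.Resolve() // nil if no condition fired
}

func main() {
	fmt.Println(validate(scalingEventsConfig{})) // both checks fire
}
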
pkg/agent/core/goalcu.go (28 additions & 17 deletions)
@@ -9,6 +9,7 @@ import (
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/neondatabase/autoscaling/pkg/agent/scalingevents"
"github.com/neondatabase/autoscaling/pkg/api"
)

@@ -19,6 +20,7 @@ type scalingGoal struct {

func calculateGoalCU(
warn func(string),
report func(goalCU uint32, parts scalingevents.GoalCUComponents),
cfg api.ScalingConfig,
computeUnit api.Resources,
systemMetrics *SystemMetrics,
@@ -29,30 +31,44 @@
warn("Making scaling decision without all required metrics available")
}

var lfcGoalCU, cpuGoalCU, memGoalCU, memTotalGoalCU uint32
var lfcGoalCU, cpuGoalCU, memGoalCU, memTotalGoalCU float64
var logFields []zap.Field
var reportedGoals scalingevents.GoalCUComponents

var wss *api.Bytes // estimated working set size

if lfcMetrics != nil {
var lfcLogFunc func(zapcore.ObjectEncoder) error
lfcGoalCU, wss, lfcLogFunc = calculateLFCGoalCU(warn, cfg, computeUnit, *lfcMetrics)
reportedGoals.LFC = lo.ToPtr(lfcGoalCU)
if lfcLogFunc != nil {
logFields = append(logFields, zap.Object("lfc", zapcore.ObjectMarshalerFunc(lfcLogFunc)))
}
}

if systemMetrics != nil {
cpuGoalCU = calculateCPUGoalCU(cfg, computeUnit, *systemMetrics)
reportedGoals.CPU = lo.ToPtr(cpuGoalCU)

memGoalCU = calculateMemGoalCU(cfg, computeUnit, *systemMetrics)
reportedGoals.Mem = lo.ToPtr(memGoalCU)
}

if systemMetrics != nil && wss != nil {
memTotalGoalCU = calculateMemTotalGoalCU(cfg, computeUnit, *systemMetrics, *wss)
reportedGoals.Mem = lo.ToPtr(max(*reportedGoals.Mem, memTotalGoalCU))
}

goalCU := max(cpuGoalCU, memGoalCU, memTotalGoalCU, lfcGoalCU)
goalCU := uint32(math.Ceil(max(
math.Round(cpuGoalCU), // for historical compatibility, use round() instead of ceil()
memGoalCU,
memTotalGoalCU,
lfcGoalCU,
)))
if hasAllMetrics {
// Report this information, for scaling metrics.
report(goalCU, reportedGoals)
}

return scalingGoal{hasAllMetrics: hasAllMetrics, goalCU: goalCU}, logFields
}
@@ -64,10 +80,9 @@ func calculateCPUGoalCU(
cfg api.ScalingConfig,
computeUnit api.Resources,
systemMetrics SystemMetrics,
) uint32 {
) float64 {
goalCPUs := systemMetrics.LoadAverage1Min / *cfg.LoadAverageFractionTarget
cpuGoalCU := uint32(math.Round(goalCPUs / computeUnit.VCPU.AsFloat64()))
return cpuGoalCU
return goalCPUs / computeUnit.VCPU.AsFloat64()
}

// For Mem:
@@ -78,13 +93,11 @@ func calculateMemGoalCU(
cfg api.ScalingConfig,
computeUnit api.Resources,
systemMetrics SystemMetrics,
) uint32 {
) float64 {
// goal memory size, just looking at allocated memory (not including page cache...)
memGoalBytes := api.Bytes(math.Round(systemMetrics.MemoryUsageBytes / *cfg.MemoryUsageFractionTarget))
memGoalBytes := math.Round(systemMetrics.MemoryUsageBytes / *cfg.MemoryUsageFractionTarget)

// note: this is equal to ceil(memGoalBytes / computeUnit.Mem), because ceil(X/M) == floor((X+M-1)/M)
memGoalCU := uint32((memGoalBytes + computeUnit.Mem - 1) / computeUnit.Mem)
return memGoalCU
return memGoalBytes / float64(computeUnit.Mem)
}

// goal memory size, looking at allocated memory and min(page cache usage, LFC working set size)
@@ -93,20 +106,19 @@ func calculateMemTotalGoalCU(
computeUnit api.Resources,
systemMetrics SystemMetrics,
wss api.Bytes,
) uint32 {
) float64 {
lfcCached := min(float64(wss), systemMetrics.MemoryCachedBytes)
totalGoalBytes := api.Bytes((lfcCached + systemMetrics.MemoryUsageBytes) / *cfg.MemoryTotalFractionTarget)
totalGoalBytes := (lfcCached + systemMetrics.MemoryUsageBytes) / *cfg.MemoryTotalFractionTarget

memTotalGoalCU := uint32((totalGoalBytes + computeUnit.Mem - 1) / computeUnit.Mem)
return memTotalGoalCU
return totalGoalBytes / float64(computeUnit.Mem)
}

func calculateLFCGoalCU(
warn func(string),
cfg api.ScalingConfig,
computeUnit api.Resources,
lfcMetrics LFCMetrics,
) (uint32, *api.Bytes, func(zapcore.ObjectEncoder) error) {
) (float64, *api.Bytes, func(zapcore.ObjectEncoder) error) {
wssValues := lfcMetrics.ApproximateworkingSetSizeBuckets
// At this point, we can assume that the values are equally spaced at 1 minute apart,
// starting at 1 minute.
@@ -135,7 +147,6 @@ func calculateLFCGoalCU(
requiredMem := estimateWssMem / *cfg.LFCToMemoryRatio
// ... and then convert that into the actual CU required to fit the working set:
requiredCU := requiredMem / computeUnit.Mem.AsFloat64()
lfcGoalCU := uint32(math.Ceil(requiredCU))

lfcLogFields := func(obj zapcore.ObjectEncoder) error {
obj.AddFloat64("estimateWssPages", estimateWss)
@@ -144,6 +155,6 @@
return nil
}

return lfcGoalCU, lo.ToPtr(api.Bytes(estimateWssMem)), lfcLogFields
return requiredCU, lo.ToPtr(api.Bytes(estimateWssMem)), lfcLogFields
}
}
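
One subtlety worth noting in the goalcu.go diff above: the per-component goals move from uint32 to float64 so their fractional values can be passed to the new report callback, and the integer rounding now happens once, as ceil() around the max(). Since ceil(max(x, y)) == max(ceil(x), ceil(y)) and the CPU component keeps its historical round() (which already yields an integer), the final goalCU should be unchanged by the refactor. A standalone check of that claim under those assumptions (requires Go 1.21+ for the built-in max):

package main

import (
	"fmt"
	"math"
)

func main() {
	cpu, mem, lfc := 1.4, 1.2, 0.9 // example fractional per-component goals

	// Old shape: each component rounded separately (round() for CPU; the
	// memory goals used a ceil-style integer division; LFC used math.Ceil).
	oldGoal := max(
		uint32(math.Round(cpu)),
		uint32(math.Ceil(mem)),
		uint32(math.Ceil(lfc)),
	)

	// New shape: components stay fractional; round once at the end.
	newGoal := uint32(math.Ceil(max(math.Round(cpu), mem, lfc)))

	fmt.Println(oldGoal, newGoal) // 2 2 -- same result
}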
pkg/agent/core/state.go (28 additions & 0 deletions)
@@ -31,15 +31,22 @@ import (

vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
"github.com/neondatabase/autoscaling/pkg/agent/core/revsource"
"github.com/neondatabase/autoscaling/pkg/agent/scalingevents"
"github.com/neondatabase/autoscaling/pkg/api"
)

type ObservabilityCallbacks struct {
PluginLatency revsource.ObserveCallback
MonitorLatency revsource.ObserveCallback
NeonVMLatency revsource.ObserveCallback

ScalingEvent ReportScalingEventCallback
DesiredScaling ReportDesiredScalingCallback
}

type ReportScalingEventCallback func(timestamp time.Time, current uint32, target uint32)
type ReportDesiredScalingCallback func(timestamp time.Time, current uint32, target uint32, parts scalingevents.GoalCUComponents)

type RevisionSource interface {
Next(ts time.Time, flags vmv1.Flag) vmv1.Revision
Observe(moment time.Time, rev vmv1.Revision) error
@@ -727,8 +734,20 @@ func (s *state) desiredResourcesFromMetricsOrRequestedUpscaling(now time.Time) (
// 2. Cap the goal CU by min/max, etc
// 3. that's it!

reportGoals := func(goalCU uint32, parts scalingevents.GoalCUComponents) {
currentCU, ok := s.VM.Using().DivResources(s.Config.ComputeUnit)
if !ok {
return // skip reporting if the current CU is not right.
}

if report := s.Config.ObservabilityCallbacks.DesiredScaling; report != nil {
report(now, uint32(currentCU), goalCU, parts)
}
}

sg, goalCULogFields := calculateGoalCU(
s.warn,
reportGoals,
s.scalingConfig(),
s.Config.ComputeUnit,
s.Metrics,
@@ -1220,6 +1239,15 @@ func (s *State) NeonVM() NeonVMHandle {
}

func (h NeonVMHandle) StartingRequest(now time.Time, resources api.Resources) {
if report := h.s.Config.ObservabilityCallbacks.ScalingEvent; report != nil {
currentCU, currentOk := h.s.VM.Using().DivResources(h.s.Config.ComputeUnit)
targetCU, targetOk := resources.DivResources(h.s.Config.ComputeUnit)

if currentOk && targetOk {
report(now, uint32(currentCU), uint32(targetCU))
}
}

// FIXME: add time to ongoing request info (or maybe only in RequestFailed?)
h.s.NeonVM.OngoingRequested = &resources
}
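Note that both hooks are optional: desiredResourcesFromMetricsOrRequestedUpscaling and StartingRequest check for nil before calling, and both convert resources to CU via DivResources, silently skipping the report when the division isn't exact. A hedged sketch of how a caller might populate the new fields (the logging bodies here are invented for illustration; the real agent presumably forwards to the scalingevents.Reporter constructed in entrypoint.go below):

package main

import (
	"time"

	"go.uber.org/zap"

	"github.com/neondatabase/autoscaling/pkg/agent/core"
	"github.com/neondatabase/autoscaling/pkg/agent/scalingevents"
)

func exampleCallbacks(logger *zap.Logger) core.ObservabilityCallbacks {
	return core.ObservabilityCallbacks{
		PluginLatency:  nil,
		MonitorLatency: nil,
		NeonVMLatency:  nil,
		ScalingEvent: func(ts time.Time, current, target uint32) {
			logger.Info("scaling event",
				zap.Time("timestamp", ts),
				zap.Uint32("currentCU", current),
				zap.Uint32("targetCU", target))
		},
		// parts carries the per-component goal CUs (CPU/Mem/LFC), any of
		// which may be nil if the corresponding metrics were missing.
		DesiredScaling: func(ts time.Time, current, target uint32, parts scalingevents.GoalCUComponents) {
			logger.Info("desired scaling changed",
				zap.Time("timestamp", ts),
				zap.Uint32("currentCU", current),
				zap.Uint32("targetCU", target))
		},
	}
}

func main() {
	_ = exampleCallbacks(zap.NewNop())
}
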
pkg/agent/core/state_test.go (6 additions & 0 deletions)
@@ -222,6 +222,8 @@ func Test_DesiredResourcesFromMetricsOrRequestedUpscaling(t *testing.T) {
AlwaysMigrate: false,
ScalingEnabled: true,
ScalingConfig: nil,
ReportScalingEvents: false,
ReportDesiredScaling: false,
},
CurrentRevision: nil,
}
@@ -257,6 +259,8 @@
PluginLatency: nil,
MonitorLatency: nil,
NeonVMLatency: nil,
ScalingEvent: nil,
DesiredScaling: nil,
},
}
}
@@ -342,6 +346,8 @@ var DefaultInitialStateConfig = helpers.InitialStateConfig{
PluginLatency: nil,
MonitorLatency: nil,
NeonVMLatency: nil,
ScalingEvent: nil,
DesiredScaling: nil,
},
},
}
pkg/agent/core/testhelpers/construct.go (2 additions & 0 deletions)
@@ -85,6 +85,8 @@ func CreateVmInfo(config InitialVmInfoConfig, opts ...VmInfoOpt) api.VmInfo {
AlwaysMigrate: false,
ScalingConfig: nil,
ScalingEnabled: true,
ReportScalingEvents: false,
ReportDesiredScaling: false,
},
CurrentRevision: nil,
}
pkg/agent/entrypoint.go (8 additions & 1 deletion)
@@ -11,6 +11,7 @@ import (

vmclient "github.com/neondatabase/autoscaling/neonvm/client/clientset/versioned"
"github.com/neondatabase/autoscaling/pkg/agent/billing"
"github.com/neondatabase/autoscaling/pkg/agent/scalingevents"
"github.com/neondatabase/autoscaling/pkg/agent/schedwatch"
"github.com/neondatabase/autoscaling/pkg/util"
"github.com/neondatabase/autoscaling/pkg/util/taskgroup"
@@ -51,7 +52,13 @@ func (r MainRunner) Run(logger *zap.Logger, ctx context.Context) error {
}
defer schedTracker.Stop()

globalState, globalPromReg := r.newAgentState(logger, r.EnvArgs.K8sPodIP, schedTracker)
scalingEventsMetrics := scalingevents.NewPromMetrics()
scalingReporter, err := scalingevents.NewReporter(ctx, logger, &r.Config.ScalingEvents, scalingEventsMetrics)
if err != nil {
return fmt.Errorf("Error creating scaling events reporter: %w", err)
}

globalState, globalPromReg := r.newAgentState(logger, r.EnvArgs.K8sPodIP, schedTracker, scalingReporter)
watchMetrics.MustRegister(globalPromReg)

logger.Info("Starting billing metrics collector")
pkg/agent/globalstate.go (6 additions & 0 deletions)
@@ -17,6 +17,7 @@ import (

vmapi "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
vmclient "github.com/neondatabase/autoscaling/neonvm/client/clientset/versioned"
"github.com/neondatabase/autoscaling/pkg/agent/scalingevents"
"github.com/neondatabase/autoscaling/pkg/agent/schedwatch"
"github.com/neondatabase/autoscaling/pkg/api"
"github.com/neondatabase/autoscaling/pkg/util"
@@ -40,12 +41,15 @@
vmClient *vmclient.Clientset
schedTracker *schedwatch.SchedulerTracker
metrics GlobalMetrics

scalingReporter *scalingevents.Reporter
}

func (r MainRunner) newAgentState(
baseLogger *zap.Logger,
podIP string,
schedTracker *schedwatch.SchedulerTracker,
scalingReporter *scalingevents.Reporter,
) (*agentState, *prometheus.Registry) {
metrics, promReg := makeGlobalMetrics()

@@ -59,6 +63,8 @@
podIP: podIP,
schedTracker: schedTracker,
metrics: metrics,

scalingReporter: scalingReporter,
}

return state, promReg
(Diffs for the remaining 6 of the 14 changed files did not load on this page.)