Skip to content

Commit

Permalink
Merge pull request #3176 from wangyang0616/feature_usage_for_custom_m…
Browse files Browse the repository at this point in the history
…etrics

Obtains the actual load data of a node from the custom metrics API.
  • Loading branch information
volcano-sh-bot authored Nov 7, 2023
2 parents ffa5261 + f1a6d19 commit ea04e65
Show file tree
Hide file tree
Showing 13 changed files with 1,012 additions and 181 deletions.
161 changes: 132 additions & 29 deletions docs/design/usage-based-scheduling.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Currently the pod is scheduled based on the resource request and node allocatabl
A separated goroutine is created in scheduler cache to talk with Metrics source(like prometheus, elasticsearch) which is used to collect and aggregate node usage metrics. The node usage data in cache is consumed by usage based scheduling plugin and other plugins like rescheduling plugin. The struct is as below.
```
type NodeUsage struct {
MetricsTime time.Time
cpuUsageAvg map[string]float64
memUsageAvg map[string]float64
}
Expand All @@ -45,10 +46,14 @@ tiers:
- name: gang
- name: conformance
- name: usage # usage based scheduling plugin
enablePredicate: false # If the value is false, new pod scheduling is not disabled when the node load reaches the threshold. If the value is true or left blank, new pod scheduling is disabled.
arguments:
usage.weight: 5
cpu.weight: 1
memory.weight: 1
thresholds:
CPUUsageAvg.5m: 90 # The node whose average usage in 5 minute is higher than 90% will be filtered in predicating stage
MEMUsageAvg.5m: 80 # The node whose average usage in 5 minute is higher than 80% will be filtered in predicating stage
cpu: 80 # The actual CPU load of a node reaches 80%, and the node cannot schedule new pods.
mem: 70 # The actual Memory load of a node reaches 70%, and the node cannot schedule new pods.
- plugins:
- name: overcommit
- name: drf
Expand All @@ -57,9 +62,9 @@ tiers:
- name: nodeorder
- name: binpack
metrics: # metrics server related configuration
type: prometheus # Optional, The metrics source type, prometheus by default, support prometheus and elasticsearch
type: prometheus # Optional, The metrics source type, prometheus by default, support "prometheus", "prometheus_adapt" and "elasticsearch"
address: http://192.168.0.10:9090 # Mandatory, The metrics source address
interval: 30s # Optional, The scheduler pull metrics from Prometheus with this interval, 5s by default
interval: 30s # Optional, The scheduler pull metrics from Prometheus with this interval, 30s by default
tls: # Optional, The tls configuration
insecureSkipVerify: "false" # Optional, Skip the certificate verification, false by default
elasticsearch: # Optional, The elasticsearch configuration
Expand Down Expand Up @@ -92,31 +97,129 @@ Finally, there should a model to balance multiple factors with weight and calcul
| ... | ... | ... |
| | | |

### Configuration and usage of different monitoring systems
The monitoring data of Volcano usage can be obtained from "Prometheus", "Custom Metrics API" and "Eleasticsearch", where the corresponding type of "Custom Metrics Api" is "prometheus_adapt".

### Prometheus rule configuration
The node-exporter is used to monitor the node real-time usage, from which the Prometheus collect the data and aggregate according to the rules. Following Prometheus rules are needed to configured as a example in order to get cpu_usage_avg_5m,cpu_usage_max_avg_1h,cpu_usage_max_avg_1d,mem_usage_avg_5m,mem_usage_max _avg_1h,mem_usage_max_avg_1d etc.
**It is recommended to use the Custom Metrics API mode, and the monitoring indicators come from Prometheus Adapt.**

#### Custom Metrics API
Ensure that Prometheus Adaptor is properly installed in the cluster and the custom metrics API is available.
Set the user-defined indicator information. The rules to be added are as follows. For details, see [Metrics Discovery and Presentation Configuration](https://github.com/kubernetes-sigs/prometheus-adapter/blob/master/docs/config.md#metrics-discovery-and-presentation-configuration)
```
rules:
- seriesQuery: '{__name__=~"node_cpu_seconds_total"}'
resources:
overrides:
instance:
resource: node
name:
matches: "node_cpu_seconds_total"
as: "node_cpu_usage_avg"
metricsQuery: avg_over_time((1 - avg (irate(<<.Series>>{mode="idle"}[5m])) by (instance))[10m:30s])
- seriesQuery: '{__name__=~"node_memory_MemTotal_bytes"}'
resources:
overrides:
instance:
resource: node
name:
matches: "node_memory_MemTotal_bytes"
as: "node_memory_usage_avg"
metricsQuery: avg_over_time(((1-node_memory_MemAvailable_bytes/<<.Series>>))[10m:30s])
```
Scheduler Configuration:
```
actions: "enqueue, allocate, backfill"
tiers:
- plugins:
- name: priority
- name: gang
- name: conformance
- name: usage # usage based scheduling plugin
enablePredicate: false # If the value is false, new pod scheduling is not disabled when the node load reaches the threshold. If the value is true or left blank, new pod scheduling is disabled.
arguments:
usage.weight: 5
cpu.weight: 1
memory.weight: 1
thresholds:
cpu: 80 # The actual CPU load of a node reaches 80%, and the node cannot schedule new pods.
mem: 70 # The actual Memory load of a node reaches 70%, and the node cannot schedule new pods.
- plugins:
- name: overcommit
- name: drf
- name: predicates
- name: proportion
- name: nodeorder
- name: binpack
metrics: # metrics server related configuration
type: prometheus_adaptor # Optional, The metrics source type, prometheus by default, support "prometheus", "prometheus_adaptor" and "elasticsearch"
interval: 30s # Optional, The scheduler pull metrics from Prometheus with this interval, 30s by default
```

#### Prometheus
Scheduler Configuration:
```
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
name: example-record
spec:
groups:
- name: cpu_mem_usage_active
interval: 30s
rules:
- record: cpu_usage_active
expr: 100 - (avg by (instance) (irate(node_cpu_seconds_total{mode="idle"}[30s])) * 100)
- record: mem_usage_active
expr: 100*(1-node_memory_MemAvailable_bytes/node_memory_MemTotal_bytes)
- name: cpu-usage-1m
interval: 1m
rules:
- record: cpu_usage_avg_5m
expr: avg_over_time(cpu_usage_active[5m])
- name: mem-usage-1m
interval: 1m
rules:
- record: mem_usage_avg_5m
expr: avg_over_time(mem_usage_active[5m])
actions: "enqueue, allocate, backfill"
tiers:
- plugins:
- name: priority
- name: gang
- name: conformance
- name: usage # usage based scheduling plugin
enablePredicate: false # If the value is false, new pod scheduling is not disabled when the node load reaches the threshold. If the value is true or left blank, new pod scheduling is disabled.
arguments:
usage.weight: 5
cpu.weight: 1
memory.weight: 1
thresholds:
cpu: 80 # The actual CPU load of a node reaches 80%, and the node cannot schedule new pods.
mem: 70 # The actual Memory load of a node reaches 70%, and the node cannot schedule new pods.
- plugins:
- name: overcommit
- name: drf
- name: predicates
- name: proportion
- name: nodeorder
- name: binpack
metrics: # metrics server related configuration
type: prometheus # Optional, The metrics source type, prometheus by default, support "prometheus", "prometheus_adaptor" and "elasticsearch"
address: http://192.168.0.10:9090 # Mandatory, The metrics source address
interval: 30s # Optional, The scheduler pull metrics from Prometheus with this interval, 30s by default
```

### Elesticsearch
Scheduler Configuration
```
actions: "enqueue, allocate, backfill"
tiers:
- plugins:
- name: priority
- name: gang
- name: conformance
- name: usage # usage based scheduling plugin
enablePredicate: false # If the value is false, new pod scheduling is not disabled when the node load reaches the threshold. If the value is true or left blank, new pod scheduling is disabled.
arguments:
usage.weight: 5
cpu.weight: 1
memory.weight: 1
thresholds:
cpu: 80 # The actual CPU load of a node reaches 80%, and the node cannot schedule new pods.
mem: 70 # The actual Memory load of a node reaches 70%, and the node cannot schedule new pods.
- plugins:
- name: overcommit
- name: drf
- name: predicates
- name: proportion
- name: nodeorder
- name: binpack
metrics: # metrics server related configuration
type: elasticsearch # Optional, The metrics source type, prometheus by default, support "prometheus", "prometheus_adaptor" and "elasticsearch"
address: http://192.168.0.10:9090 # Mandatory, The metrics source address
interval: 30s # Optional, The scheduler pull metrics from Prometheus with this interval, 30s by default
tls: # Optional, The tls configuration
insecureSkipVerify: "false" # Optional, Skip the certificate verification, false by default
elasticsearch: # Optional, The elasticsearch configuration
index: "custom-index-name" # Optional, The elasticsearch index name, "metricbeat-*" by default
username: "" # Optional, The elasticsearch username
password: "" # Optional, The elasticsearch password
hostnameFieldName: "host.hostname" # Optional, The elasticsearch hostname field name, "host.hostname" by default
```
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ require (
k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f // indirect
k8s.io/kube-scheduler v0.0.0 // indirect
k8s.io/kubelet v0.0.0 // indirect
k8s.io/metrics v0.27.2
k8s.io/mount-utils v0.25.0 // indirect
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.1.2 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1812,6 +1812,8 @@ k8s.io/kubelet v0.27.2 h1:vpJnBkqQjxItEhehKG0toXoZ+G+tf4UXAOqtMJy6qgc=
k8s.io/kubelet v0.27.2/go.mod h1:1SVrHaLnuw53nQJx8036k9HjE0teDXZtbN51cYC0HSc=
k8s.io/kubernetes v1.27.2 h1:g4v9oY6u7vBUDEuq4FvC50Bbw2K7GZuvM00IIESWVf4=
k8s.io/kubernetes v1.27.2/go.mod h1:U8ZXeKBAPxeb4J4/HOaxjw1A9K6WfSH+fY2SS7CR6IM=
k8s.io/metrics v0.27.2 h1:TD6z3dhhN9bgg5YkbTh72bPiC1BsxipBLPBWyC3VQAU=
k8s.io/metrics v0.27.2/go.mod h1:v3OT7U0DBvoAzWVzGZWQhdV4qsRJWchzs/LeVN8bhW4=
k8s.io/mount-utils v0.27.2 h1:fEqtBdAv88xpoPr3nR0MgYs6P+2PjXyUTwd4NmqSBjY=
k8s.io/mount-utils v0.27.2/go.mod h1:vmcjYdi2Vg1VTWY7KkhvwJVY6WDHxb/QQhiQKkR8iNs=
k8s.io/utils v0.0.0-20211116205334-6203023598ed/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
Expand Down
3 changes: 3 additions & 0 deletions pkg/scheduler/api/node_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package api
import (
"fmt"
"strconv"
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -109,6 +110,7 @@ type NodeState struct {

// NodeUsage defines the real load usage of node
type NodeUsage struct {
MetricsTime time.Time
CPUUsageAvg map[string]float64
MEMUsageAvg map[string]float64
}
Expand All @@ -118,6 +120,7 @@ func (nu *NodeUsage) DeepCopy() *NodeUsage {
CPUUsageAvg: make(map[string]float64),
MEMUsageAvg: make(map[string]float64),
}
newUsage.MetricsTime = nu.MetricsTime
for k, v := range nu.CPUUsageAvg {
newUsage.CPUUsageAvg[k] = v
}
Expand Down
77 changes: 42 additions & 35 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ import (
)

const (
// default interval for sync data from metrics server, the value is 5s
defaultMetricsInternal = 5000000000
// default interval for sync data from metrics server, the value is 30s
defaultMetricsInternal = 30 * time.Second
)

func init() {
Expand Down Expand Up @@ -709,14 +709,13 @@ func (sc *SchedulerCache) Run(stopCh <-chan struct{}) {
go wait.Until(sc.processBindTask, time.Millisecond*20, stopCh)

// Get metrics data
address := sc.metricsConf["address"]
if len(address) > 0 {
interval, err := time.ParseDuration(sc.metricsConf["interval"])
if err != nil || interval <= 0 {
interval = time.Duration(defaultMetricsInternal)
}
go wait.Until(sc.GetMetricsData, interval, stopCh)
klog.V(3).Infof("Start metrics collection, metricsConf is %v", sc.metricsConf)
interval, err := time.ParseDuration(sc.metricsConf["interval"])
if err != nil || interval <= 0 {
interval = defaultMetricsInternal
}
klog.V(3).Infof("The interval for querying metrics data is %v", interval)
go wait.Until(sc.GetMetricsData, interval, stopCh)
}

// WaitForCacheSync sync the cache with the api server
Expand Down Expand Up @@ -1311,49 +1310,57 @@ func (sc *SchedulerCache) SetMetricsConf(conf map[string]string) {
}

func (sc *SchedulerCache) GetMetricsData() {
client, err := source.NewMetricsClient(sc.metricsConf)
metricsType := sc.metricsConf["type"]
if len(metricsType) == 0 {
klog.V(3).Infof("The metrics type is not set in the volcano scheduler configmap file. " +
"As a result, the CPU and memory load information of the node is not collected.")
return
}

client, err := source.NewMetricsClient(sc.restConfig, sc.metricsConf)
if err != nil {
klog.Errorf("Error creating client: %v\n", err)
return
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
defer cancel()
nodeUsageMap := make(map[string]*schedulingapi.NodeUsage)
nodeMetricsMap := make(map[string]*source.NodeMetrics, len(sc.NodeList))
sc.Mutex.Lock()
for k := range sc.Nodes {
nodeUsageMap[k] = &schedulingapi.NodeUsage{
CPUUsageAvg: make(map[string]float64),
MEMUsageAvg: make(map[string]float64),
}

for _, nodeName := range sc.NodeList {
nodeMetricsMap[nodeName] = &source.NodeMetrics{}
}
sc.Mutex.Unlock()

supportedPeriods := []string{"5m"}
for node := range nodeUsageMap {
for _, period := range supportedPeriods {
nodeMetrics, err := client.NodeMetricsAvg(ctx, node, period)
if err != nil {
klog.Errorf("Error getting node metrics: %v\n", err)
continue
}
klog.V(4).Infof("node: %v, CpuUsageAvg: %v, MemUsageAvg: %v, period:%v", node, nodeMetrics.CPU, nodeMetrics.Memory, period)
nodeUsageMap[node].CPUUsageAvg[period] = nodeMetrics.CPU
nodeUsageMap[node].MEMUsageAvg[period] = nodeMetrics.Memory
}
err = client.NodesMetricsAvg(ctx, nodeMetricsMap)
if err != nil {
klog.Errorf("Error getting node metrics: %v\n", err)
return
}
sc.setMetricsData(nodeUsageMap)

sc.setMetricsData(nodeMetricsMap)
}

func (sc *SchedulerCache) setMetricsData(usageInfo map[string]*schedulingapi.NodeUsage) {
func (sc *SchedulerCache) setMetricsData(usageInfo map[string]*source.NodeMetrics) {
sc.Mutex.Lock()
defer sc.Mutex.Unlock()

for k := range usageInfo {
nodeInfo, ok := sc.Nodes[k]
if ok {
klog.V(3).Infof("node: %s, ResourceUsage: %+v => %+v", k, *nodeInfo.ResourceUsage, *usageInfo[k])
nodeInfo.ResourceUsage = usageInfo[k]
for nodeName, nodeMetric := range usageInfo {
nodeUsage := &schedulingapi.NodeUsage{
CPUUsageAvg: make(map[string]float64),
MEMUsageAvg: make(map[string]float64),
}
nodeUsage.MetricsTime = nodeMetric.MetricsTime
nodeUsage.CPUUsageAvg[source.NODE_METRICS_PERIOD] = nodeMetric.CPU
nodeUsage.MEMUsageAvg[source.NODE_METRICS_PERIOD] = nodeMetric.Memory

nodeInfo, ok := sc.Nodes[nodeName]
if !ok {
klog.Errorf("The information about node %s cannot be found in the cache.", nodeName)
continue
}
klog.V(5).Infof("node: %s, ResourceUsage: %+v => %+v", nodeName, *nodeInfo.ResourceUsage, nodeUsage)
nodeInfo.ResourceUsage = nodeUsage
}
}

Expand Down
Loading

0 comments on commit ea04e65

Please sign in to comment.