Skip to content

Commit

Permalink
Merge pull request kubernetes#7530 from towca/jtuznik/dra-actual
Browse files Browse the repository at this point in the history
CA: DRA integration MVP
  • Loading branch information
k8s-ci-robot authored Dec 20, 2024
2 parents 10df587 + 4a89524 commit 50c6590
Show file tree
Hide file tree
Showing 42 changed files with 6,780 additions and 274 deletions.
5 changes: 5 additions & 0 deletions cluster-autoscaler/cloudprovider/test/test_cloud_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,11 @@ func (tcp *TestCloudProvider) SetResourceLimiter(resourceLimiter *cloudprovider.
tcp.resourceLimiter = resourceLimiter
}

// SetMachineTemplates sets template NodeInfos per-machine-type.
func (tcp *TestCloudProvider) SetMachineTemplates(machineTemplates map[string]*framework.NodeInfo) {
tcp.machineTemplates = machineTemplates
}

// Cleanup this is a function to close resources associated with the cloud provider
func (tcp *TestCloudProvider) Cleanup() error {
return nil
Expand Down
6 changes: 6 additions & 0 deletions cluster-autoscaler/core/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/predicate"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
draprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/provider"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
Expand Down Expand Up @@ -63,6 +64,7 @@ type AutoscalerOptions struct {
ScaleUpOrchestrator scaleup.Orchestrator
DeleteOptions options.NodeDeleteOptions
DrainabilityRules rules.Rules
DraProvider *draprovider.Provider
}

// Autoscaler is the main component of CA which scales up/down node groups according to its configuration
Expand Down Expand Up @@ -102,6 +104,7 @@ func NewAutoscaler(opts AutoscalerOptions, informerFactory informers.SharedInfor
opts.ScaleUpOrchestrator,
opts.DeleteOptions,
opts.DrainabilityRules,
opts.DraProvider,
), nil
}

Expand Down Expand Up @@ -165,6 +168,9 @@ func initializeDefaultOptions(opts *AutoscalerOptions, informerFactory informers
if opts.DrainabilityRules == nil {
opts.DrainabilityRules = rules.Default(opts.DeleteOptions)
}
if opts.DraProvider == nil && opts.DynamicResourceAllocationEnabled {
opts.DraProvider = draprovider.NewProviderFromInformers(informerFactory)
}

return nil
}
17 changes: 14 additions & 3 deletions cluster-autoscaler/core/scaledown/actuation/actuator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/predicate"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
draprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/provider"
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
Expand Down Expand Up @@ -68,6 +69,7 @@ type Actuator struct {
configGetter actuatorNodeGroupConfigGetter
nodeDeleteDelayAfterTaint time.Duration
pastLatencies *expiring.List
draProvider *draprovider.Provider
}

// actuatorNodeGroupConfigGetter is an interface to limit the functions that can be used
Expand All @@ -78,7 +80,7 @@ type actuatorNodeGroupConfigGetter interface {
}

// NewActuator returns a new instance of Actuator.
func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupchange.NodeGroupChangeObserver, ndt *deletiontracker.NodeDeletionTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, configGetter actuatorNodeGroupConfigGetter) *Actuator {
func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupchange.NodeGroupChangeObserver, ndt *deletiontracker.NodeDeletionTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, configGetter actuatorNodeGroupConfigGetter, draProvider *draprovider.Provider) *Actuator {
ndb := NewNodeDeletionBatcher(ctx, scaleStateNotifier, ndt, ctx.NodeDeletionBatcherInterval)
legacyFlagDrainConfig := SingleRuleDrainConfig(ctx.MaxGracefulTerminationSec)
var evictor Evictor
Expand All @@ -97,6 +99,7 @@ func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupch
configGetter: configGetter,
nodeDeleteDelayAfterTaint: ctx.NodeDeleteDelayAfterTaint,
pastLatencies: expiring.NewList(),
draProvider: draProvider,
}
}

Expand Down Expand Up @@ -357,7 +360,7 @@ func (a *Actuator) scaleDownNodeToReport(node *apiv1.Node, drain bool) (*status.
}

gpuConfig := a.ctx.CloudProvider.GetNodeGpuConfig(node)
utilInfo, err := utilization.Calculate(nodeInfo, ignoreDaemonSetsUtilization, a.ctx.IgnoreMirrorPodsUtilization, gpuConfig, time.Now())
utilInfo, err := utilization.Calculate(nodeInfo, ignoreDaemonSetsUtilization, a.ctx.IgnoreMirrorPodsUtilization, a.ctx.DynamicResourceAllocationEnabled, gpuConfig, time.Now())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -394,7 +397,15 @@ func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterS
scheduledPods := kube_util.ScheduledPods(pods)
nonExpendableScheduledPods := utils.FilterOutExpendablePods(scheduledPods, a.ctx.ExpendablePodsPriorityCutoff)

err = snapshot.SetClusterState(nodes, nonExpendableScheduledPods, drasnapshot.Snapshot{})
var draSnapshot drasnapshot.Snapshot
if a.ctx.DynamicResourceAllocationEnabled && a.draProvider != nil {
draSnapshot, err = a.draProvider.Snapshot()
if err != nil {
return nil, err
}
}

err = snapshot.SetClusterState(nodes, nonExpendableScheduledPods, draSnapshot)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion cluster-autoscaler/core/scaledown/eligibility/eligibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,10 @@ func (c *Checker) unremovableReasonAndNodeUtilization(context *context.Autoscali
}

gpuConfig := context.CloudProvider.GetNodeGpuConfig(node)
utilInfo, err := utilization.Calculate(nodeInfo, ignoreDaemonSetsUtilization, context.IgnoreMirrorPodsUtilization, gpuConfig, timestamp)
utilInfo, err := utilization.Calculate(nodeInfo, ignoreDaemonSetsUtilization, context.IgnoreMirrorPodsUtilization, context.DynamicResourceAllocationEnabled, gpuConfig, timestamp)
if err != nil {
klog.Warningf("Failed to calculate utilization for %s: %v", node.Name, err)
return simulator.UnexpectedError, nil
}

// If scale down of unready nodes is disabled, skip the node if it is unready
Expand Down
99 changes: 81 additions & 18 deletions cluster-autoscaler/core/scaledown/eligibility/eligibility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,18 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/google/go-cmp/cmp"

apiv1 "k8s.io/api/core/v1"
resourceapi "k8s.io/api/resource/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable"
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
"k8s.io/client-go/kubernetes/fake"
Expand All @@ -39,13 +42,15 @@ type testCase struct {
desc string
nodes []*apiv1.Node
pods []*apiv1.Pod
want []string
draSnapshot drasnapshot.Snapshot
draEnabled bool
wantUnneeded []string
wantUnremovable []*simulator.UnremovableNode
scaleDownUnready bool
ignoreDaemonSetsUtilization bool
}

func getTestCases(ignoreDaemonSetsUtilization bool, suffix string, now time.Time) []testCase {

regularNode := BuildTestNode("regular", 1000, 10)
SetNodeReadyState(regularNode, true, time.Time{})

Expand All @@ -69,51 +74,99 @@ func getTestCases(ignoreDaemonSetsUtilization bool, suffix string, now time.Time
dsPod := BuildTestPod("dsPod", 500, 0, WithDSController())
dsPod.Spec.NodeName = "regular"

brokenUtilNode := BuildTestNode("regular", 0, 0)
regularNodeIncompleteResourceSlice := &resourceapi.ResourceSlice{
ObjectMeta: metav1.ObjectMeta{Name: "regularNodeIncompleteResourceSlice", UID: "regularNodeIncompleteResourceSlice"},
Spec: resourceapi.ResourceSliceSpec{
Driver: "driver.foo.com",
NodeName: "regular",
Pool: resourceapi.ResourcePool{
Name: "regular-pool",
ResourceSliceCount: 999,
},
Devices: []resourceapi.Device{{Name: "dev1"}},
},
}
testCases := []testCase{
{
desc: "regular node stays",
nodes: []*apiv1.Node{regularNode},
want: []string{"regular"},
wantUnneeded: []string{"regular"},
wantUnremovable: []*simulator.UnremovableNode{},
scaleDownUnready: true,
},
{
desc: "recently deleted node is filtered out",
nodes: []*apiv1.Node{regularNode, justDeletedNode},
want: []string{"regular"},
wantUnneeded: []string{"regular"},
wantUnremovable: []*simulator.UnremovableNode{{Node: justDeletedNode, Reason: simulator.CurrentlyBeingDeleted}},
scaleDownUnready: true,
},
{
desc: "marked no scale down is filtered out",
nodes: []*apiv1.Node{noScaleDownNode, regularNode},
want: []string{"regular"},
wantUnneeded: []string{"regular"},
wantUnremovable: []*simulator.UnremovableNode{{Node: noScaleDownNode, Reason: simulator.ScaleDownDisabledAnnotation}},
scaleDownUnready: true,
},
{
desc: "highly utilized node is filtered out",
nodes: []*apiv1.Node{regularNode},
pods: []*apiv1.Pod{bigPod},
want: []string{},
wantUnneeded: []string{},
wantUnremovable: []*simulator.UnremovableNode{{Node: regularNode, Reason: simulator.NotUnderutilized}},
scaleDownUnready: true,
},
{
desc: "underutilized node stays",
nodes: []*apiv1.Node{regularNode},
pods: []*apiv1.Pod{smallPod},
want: []string{"regular"},
wantUnneeded: []string{"regular"},
wantUnremovable: []*simulator.UnremovableNode{},
scaleDownUnready: true,
},
{
desc: "node is filtered out if utilization can't be calculated",
nodes: []*apiv1.Node{brokenUtilNode},
pods: []*apiv1.Pod{smallPod},
wantUnneeded: []string{},
wantUnremovable: []*simulator.UnremovableNode{{Node: brokenUtilNode, Reason: simulator.UnexpectedError}},
scaleDownUnready: true,
},
{
desc: "unready node stays",
nodes: []*apiv1.Node{unreadyNode},
want: []string{"unready"},
wantUnneeded: []string{"unready"},
wantUnremovable: []*simulator.UnremovableNode{},
scaleDownUnready: true,
},
{
desc: "unready node is filtered oud when scale-down of unready is disabled",
nodes: []*apiv1.Node{unreadyNode},
want: []string{},
wantUnneeded: []string{},
wantUnremovable: []*simulator.UnremovableNode{{Node: unreadyNode, Reason: simulator.ScaleDownUnreadyDisabled}},
scaleDownUnready: false,
},
{
desc: "Node is not filtered out because of DRA issues if DRA is disabled",
nodes: []*apiv1.Node{regularNode},
pods: []*apiv1.Pod{smallPod},
draSnapshot: drasnapshot.NewSnapshot(nil, map[string][]*resourceapi.ResourceSlice{"regular": {regularNodeIncompleteResourceSlice}}, nil, nil),
draEnabled: false,
wantUnneeded: []string{"regular"},
wantUnremovable: []*simulator.UnremovableNode{},
scaleDownUnready: true,
},
{
desc: "Node is filtered out because of DRA issues if DRA is enabled",
nodes: []*apiv1.Node{regularNode},
pods: []*apiv1.Pod{smallPod},
draSnapshot: drasnapshot.NewSnapshot(nil, map[string][]*resourceapi.ResourceSlice{"regular": {regularNodeIncompleteResourceSlice}}, nil, nil),
draEnabled: true,
wantUnneeded: []string{},
wantUnremovable: []*simulator.UnremovableNode{{Node: regularNode, Reason: simulator.UnexpectedError}},
scaleDownUnready: true,
},
}

finalTestCases := []testCase{}
Expand All @@ -130,15 +183,17 @@ func getTestCases(ignoreDaemonSetsUtilization bool, suffix string, now time.Time
desc: "high utilization daemonsets node is filtered out",
nodes: []*apiv1.Node{regularNode},
pods: []*apiv1.Pod{smallPod, dsPod},
want: []string{},
wantUnneeded: []string{},
wantUnremovable: []*simulator.UnremovableNode{{Node: regularNode, Reason: simulator.NotUnderutilized}},
scaleDownUnready: true,
ignoreDaemonSetsUtilization: false,
},
testCase{
desc: "high utilization daemonsets node stays",
nodes: []*apiv1.Node{regularNode},
pods: []*apiv1.Pod{smallPod, dsPod},
want: []string{"regular"},
wantUnneeded: []string{"regular"},
wantUnremovable: []*simulator.UnremovableNode{},
scaleDownUnready: true,
ignoreDaemonSetsUtilization: true,
})
Expand All @@ -155,8 +210,9 @@ func TestFilterOutUnremovable(t *testing.T) {
t.Run(tc.desc, func(t *testing.T) {
t.Parallel()
options := config.AutoscalingOptions{
UnremovableNodeRecheckTimeout: 5 * time.Minute,
ScaleDownUnreadyEnabled: tc.scaleDownUnready,
DynamicResourceAllocationEnabled: tc.draEnabled,
UnremovableNodeRecheckTimeout: 5 * time.Minute,
ScaleDownUnreadyEnabled: tc.scaleDownUnready,
NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
ScaleDownUtilizationThreshold: config.DefaultScaleDownUtilizationThreshold,
ScaleDownGpuUtilizationThreshold: config.DefaultScaleDownGpuUtilizationThreshold,
Expand All @@ -173,13 +229,20 @@ func TestFilterOutUnremovable(t *testing.T) {
provider.AddNode("ng1", n)
}
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, nil, nil)
clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, tc.nodes, tc.pods)
if err != nil {
t.Fatalf("Could not create autoscaling context: %v", err)
}
if err := context.ClusterSnapshot.SetClusterState(tc.nodes, tc.pods, tc.draSnapshot); err != nil {
t.Fatalf("Could not SetClusterState: %v", err)
}
unremovableNodes := unremovable.NewNodes()
got, _, _ := c.FilterOutUnremovable(&context, tc.nodes, now, unremovableNodes)
assert.Equal(t, tc.want, got)
gotUnneeded, _, gotUnremovable := c.FilterOutUnremovable(&context, tc.nodes, now, unremovableNodes)
if diff := cmp.Diff(tc.wantUnneeded, gotUnneeded); diff != "" {
t.Errorf("FilterOutUnremovable(): unexpected unneeded (-want +got): %s", diff)
}
if diff := cmp.Diff(tc.wantUnremovable, gotUnremovable); diff != "" {
t.Errorf("FilterOutUnremovable(): unexpected unremovable (-want +got): %s", diff)
}
})
}
}
Loading

0 comments on commit 50c6590

Please sign in to comment.