Skip to content

Commit

Permalink
add support for volumeBinding plugin
Browse files Browse the repository at this point in the history
Signed-off-by: chxk <[email protected]>
  • Loading branch information
chxk authored and chenxukun committed Jul 13, 2023
1 parent 9427174 commit 35305bb
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 1 deletion.
10 changes: 10 additions & 0 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,11 @@ func (dvb *defaultVolumeBinder) BindVolumes(task *schedulingapi.TaskInfo, podVol
return dvb.volumeBinder.BindPodVolumes(context.TODO(), task.Pod, podVolumes)
}

// SchedulerVolumeBinder returns the original binder
func (dvb *defaultVolumeBinder) SchedulerVolumeBinder() volumescheduling.SchedulerVolumeBinder {
return dvb.volumeBinder
}

type podgroupBinder struct {
kubeclient *kubernetes.Clientset
vcclient *vcclient.Clientset
Expand Down Expand Up @@ -836,6 +841,11 @@ func (sc *SchedulerCache) RevertVolumes(task *schedulingapi.TaskInfo, podVolumes
sc.VolumeBinder.RevertVolumes(task, podVolumes)
}

// SchedulerVolumeBinder returns the original binder
func (sc *SchedulerCache) SchedulerVolumeBinder() volumescheduling.SchedulerVolumeBinder {
return sc.VolumeBinder.SchedulerVolumeBinder()
}

// Client returns the kubernetes clientSet
func (sc *SchedulerCache) Client() kubernetes.Interface {
return sc.kubeClient
Expand Down
4 changes: 4 additions & 0 deletions pkg/scheduler/cache/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ type Cache interface {
// RevertVolumes clean cache generated by AllocateVolumes
RevertVolumes(task *api.TaskInfo, podVolumes *volumebinding.PodVolumes)

// SchedulerVolumeBinder returns the original binder
SchedulerVolumeBinder() volumebinding.SchedulerVolumeBinder

// Client returns the kubernetes clientSet, which can be used by plugins
Client() kubernetes.Interface

Expand All @@ -95,6 +98,7 @@ type VolumeBinder interface {
RevertVolumes(task *api.TaskInfo, podVolumes *volumebinding.PodVolumes)
AllocateVolumes(task *api.TaskInfo, hostname string, podVolumes *volumebinding.PodVolumes) error
BindVolumes(task *api.TaskInfo, podVolumes *volumebinding.PodVolumes) error
SchedulerVolumeBinder() volumebinding.SchedulerVolumeBinder
}

// Binder interface for binding task and hostname
Expand Down
33 changes: 33 additions & 0 deletions pkg/scheduler/capabilities/volumebinding/volume_binding.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,3 +404,36 @@ func New(plArgs runtime.Object, fh framework.Handle, fts feature.Features) (fram
fts: fts,
}, nil
}

// NewWithBinder takes binder as parameter.
func NewWithBinder(plArgs runtime.Object, fh framework.Handle, fts feature.Features, binder SchedulerVolumeBinder) (framework.Plugin, error) {
args, ok := plArgs.(*config.VolumeBindingArgs)
if !ok {
return nil, fmt.Errorf("want args to be of type VolumeBindingArgs, got %T", plArgs)
}
if err := validation.ValidateVolumeBindingArgsWithOptions(nil, args, validation.VolumeBindingArgsValidationOptions{
AllowVolumeCapacityPriority: fts.EnableVolumeCapacityPriority,
}); err != nil {
return nil, err
}
pvcInformer := fh.SharedInformerFactory().Core().V1().PersistentVolumeClaims()

// build score function
var scorer volumeCapacityScorer
if fts.EnableVolumeCapacityPriority {
shape := make(helper.FunctionShape, 0, len(args.Shape))
for _, point := range args.Shape {
shape = append(shape, helper.FunctionShapePoint{
Utilization: int64(point.Utilization),
Score: int64(point.Score) * (framework.MaxNodeScore / config.MaxCustomPriorityScore),
})
}
scorer = buildScorerFunction(shape)
}
return &VolumeBinding{
Binder: binder,
PVCLister: pvcInformer.Lister(),
scorer: scorer,
fts: fts,
}, nil
}
6 changes: 6 additions & 0 deletions pkg/scheduler/framework/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
vcv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/cache"
volumeScheduling "volcano.sh/volcano/pkg/scheduler/capabilities/volumebinding"
"volcano.sh/volcano/pkg/scheduler/conf"
"volcano.sh/volcano/pkg/scheduler/metrics"
"volcano.sh/volcano/pkg/scheduler/util"
Expand Down Expand Up @@ -510,6 +511,11 @@ func (ssn *Session) UpdateSchedulerNumaInfo(AllocatedSets map[string]api.ResNuma
ssn.cache.UpdateSchedulerNumaInfo(AllocatedSets)
}

// GetSchedulerVolumeBinder returns the original volume binder
func (ssn *Session) GetSchedulerVolumeBinder() volumeScheduling.SchedulerVolumeBinder {
return ssn.cache.SchedulerVolumeBinder()
}

// KubeClient returns the kubernetes client
func (ssn Session) KubeClient() kubernetes.Interface {
return ssn.kubeClient
Expand Down
30 changes: 30 additions & 0 deletions pkg/scheduler/plugins/nodeorder/nodeorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilFeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
Expand All @@ -37,6 +38,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration"

"volcano.sh/volcano/pkg/scheduler/api"
volumeScheduling "volcano.sh/volcano/pkg/scheduler/capabilities/volumebinding"
"volcano.sh/volcano/pkg/scheduler/framework"
"volcano.sh/volcano/pkg/scheduler/plugins/util/k8s"
)
Expand All @@ -63,6 +65,8 @@ const (
PodTopologySpreadWeight = "podtopologyspread.weight"
// SelectorSpreadWeight is the key for providing Selector Spread Priority Weight in YAML
selectorSpreadWeight = "selectorspread.weight"
// VolumeBindingWeight is the key for providing Volume Binding Priority Weight in YAML
volumeBindingWeight = "volumebinding.weight"
)

type nodeOrderPlugin struct {
Expand All @@ -89,6 +93,7 @@ type priorityWeight struct {
imageLocalityWeight int
podTopologySpreadWeight int
selectorSpreadWeight int
volumeBindingWeight int
}

// calculateWeight from the provided arguments.
Expand Down Expand Up @@ -118,6 +123,7 @@ type priorityWeight struct {
// tainttoleration.weight: 3
// imagelocality.weight: 1
// podtopologyspread.weight: 2
// volumebinding.weight: 2
func calculateWeight(args framework.Arguments) priorityWeight {
// Initial values for weights.
// By default, for backward compatibility and for reasonable scores,
Expand All @@ -132,6 +138,7 @@ func calculateWeight(args framework.Arguments) priorityWeight {
imageLocalityWeight: 1,
podTopologySpreadWeight: 2, // be consistent with kubernetes default setting.
selectorSpreadWeight: 0,
volumeBindingWeight: 0,
}

// Checks whether nodeaffinity.weight is provided or not, if given, modifies the value in weight struct.
Expand Down Expand Up @@ -161,6 +168,9 @@ func calculateWeight(args framework.Arguments) priorityWeight {
// Checks whether selectorspread.weight is provided or not, if given, modifies the value in weight struct.
args.GetInt(&weight.selectorSpreadWeight, selectorSpreadWeight)

// Checks whether volumebinding.weight is provided or not, if given, modifies the value in weight struct.
args.GetInt(&weight.volumeBindingWeight, volumeBindingWeight)

return weight
}

Expand Down Expand Up @@ -221,6 +231,15 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) {
p, _ = imagelocality.New(nil, handle)
imageLocality := p.(*imagelocality.ImageLocality)

// 6. VolumeBinding
volumeBindingArgs := &config.VolumeBindingArgs{
TypeMeta: metav1.TypeMeta{},
BindTimeoutSeconds: volumeScheduling.DefaultBindTimeoutSeconds,
Shape: nil,
}
p, _ = volumeScheduling.NewWithBinder(volumeBindingArgs, handle, fts, ssn.GetSchedulerVolumeBinder())
volumeBinding := p.(*volumeScheduling.VolumeBinding)

nodeOrderFn := func(task *api.TaskInfo, node *api.NodeInfo) (float64, error) {
var nodeScore = 0.0

Expand Down Expand Up @@ -290,6 +309,17 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) {
klog.V(5).Infof("Node: %s, task<%s/%s> Node Affinity weight %d, score: %f", node.Name, task.Namespace, task.Name, weight.nodeAffinityWeight, float64(score)*float64(weight.nodeAffinityWeight))
}

// VolumeBinding
if weight.volumeBindingWeight != 0 {
score, status := volumeBinding.Score(context.TODO(), state, task.Pod, node.Name)
if !status.IsSuccess() {
klog.Warningf("Node: %s, Calculate Volume Binding Priority Failed because of Error: %v", node.Name, status.AsError())
return 0, status.AsError()
}
nodeScore += float64(score) * float64(weight.volumeBindingWeight)
klog.V(5).Infof("Node: %s, task<%s/%s> Volume Binding weight %d, score: %f", node.Name, task.Namespace, task.Name, weight.volumeBindingWeight, float64(score)*float64(weight.volumeBindingWeight))
}

klog.V(4).Infof("Nodeorder Total Score for task<%s/%s> on node %s is: %f", task.Namespace, task.Name, node.Name, nodeScore)
return nodeScore, nil
}
Expand Down
27 changes: 26 additions & 1 deletion pkg/scheduler/plugins/predicates/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"strings"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilFeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
Expand All @@ -40,6 +41,7 @@ import (
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/api/devices/nvidia/gpushare"
"volcano.sh/volcano/pkg/scheduler/api/devices/nvidia/vgpu"
volumeScheduling "volcano.sh/volcano/pkg/scheduler/capabilities/volumebinding"
"volcano.sh/volcano/pkg/scheduler/framework"
"volcano.sh/volcano/pkg/scheduler/plugins/util/k8s"
)
Expand Down Expand Up @@ -113,6 +115,7 @@ type predicateEnable struct {
podAffinityEnable bool
nodeVolumeLimitsEnable bool
volumeZoneEnable bool
volumeBindingEnable bool
podTopologySpreadEnable bool
cacheEnable bool
proportionalEnable bool
Expand Down Expand Up @@ -140,6 +143,7 @@ func enablePredicate(args framework.Arguments) predicateEnable {
predicate.PodAffinityEnable: true
predicate.NodeVolumeLimitsEnable: true
predicate.VolumeZoneEnable: true
predicate.VolumeBindingEnable: true
predicate.PodTopologySpreadEnable: true
predicate.GPUSharingEnable: true
predicate.GPUNumberEnable: true
Expand All @@ -159,6 +163,7 @@ func enablePredicate(args framework.Arguments) predicateEnable {
podAffinityEnable: true,
nodeVolumeLimitsEnable: true,
volumeZoneEnable: true,
volumeBindingEnable: false,
podTopologySpreadEnable: true,
cacheEnable: false,
proportionalEnable: false,
Expand Down Expand Up @@ -343,7 +348,15 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
// 7. VolumeZone
plugin, _ = volumezone.New(nil, handle)
volumeZoneFilter := plugin.(*volumezone.VolumeZone)
// 8. PodTopologySpread
// 8. VolumeBinding
volumeBindingArgs := &config.VolumeBindingArgs{
TypeMeta: metav1.TypeMeta{},
BindTimeoutSeconds: volumeScheduling.DefaultBindTimeoutSeconds,
Shape: nil,
}
plugin, _ = volumeScheduling.NewWithBinder(volumeBindingArgs, handle, features, ssn.GetSchedulerVolumeBinder())
volumeBindingFilter := plugin.(*volumeScheduling.VolumeBinding)
// 9. PodTopologySpread
// Setting cluster level default constraints is not support for now.
ptsArgs := &config.PodTopologySpreadArgs{DefaultingType: config.SystemDefaulting}
plugin, _ = podtopologyspread.New(ptsArgs, handle, features)
Expand Down Expand Up @@ -484,6 +497,18 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
}
}

// Check VolumeBinding
if predicate.volumeBindingEnable {
_, status := volumeBindingFilter.PreFilter(context.TODO(), state, task.Pod)
if !status.IsSuccess() {
return fmt.Errorf("plugin %s pre-predicates failed %s", volumeBindingFilter.Name(), status.Message())
}
status = volumeBindingFilter.Filter(context.TODO(), state, task.Pod, nodeInfo)
if !status.IsSuccess() {
return fmt.Errorf("plugin %s predicates failed %s", volumeBindingFilter.Name(), status.Message())
}
}

// Check PodTopologySpread
if predicate.podTopologySpreadEnable {
status := podTopologySpreadFilter.Filter(context.TODO(), state, task.Pod, nodeInfo)
Expand Down
4 changes: 4 additions & 0 deletions pkg/scheduler/util/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,3 +432,7 @@ func (fvb *FakeVolumeBinder) RevertVolumes(task *api.TaskInfo, podVolumes *volum
fvb.volumeBinder.RevertAssumedPodVolumes(podVolumes)
}
}

func (fvb *FakeVolumeBinder) SchedulerVolumeBinder() volumescheduling.SchedulerVolumeBinder {
return fvb.volumeBinder
}

0 comments on commit 35305bb

Please sign in to comment.