From e1647b3b5818b2e4b08c070913e1c3e84d391896 Mon Sep 17 00:00:00 2001 From: limengxuan <391013634@qq.com> Date: Wed, 15 May 2024 16:33:05 +0800 Subject: [PATCH 1/2] update Score for deviceshare plugin Signed-off-by: limengxuan <391013634@qq.com> --- docs/user-guide/how_to_use_gpu_sharing.md | 2 + .../devices/nvidia/gpushare/device_info.go | 6 +- .../api/devices/nvidia/vgpu/device_info.go | 27 ++++-- .../api/devices/nvidia/vgpu/metrics.go | 10 +- pkg/scheduler/api/devices/nvidia/vgpu/type.go | 10 ++ .../api/devices/nvidia/vgpu/utils.go | 94 +++++++++++-------- pkg/scheduler/api/shared_device_pool.go | 6 +- .../plugins/deviceshare/deviceshare.go | 64 ++++++++++++- 8 files changed, 163 insertions(+), 56 deletions(-) diff --git a/docs/user-guide/how_to_use_gpu_sharing.md b/docs/user-guide/how_to_use_gpu_sharing.md index 58d8042dc2..d2b2f32e41 100644 --- a/docs/user-guide/how_to_use_gpu_sharing.md +++ b/docs/user-guide/how_to_use_gpu_sharing.md @@ -8,6 +8,8 @@ Refer to [Install Guide](../../installer/README.md) to install volcano. +> **Note** The Volcano VGPU feature has been transferred to the HAMI project, click [here](https://github.com/Project-HAMi/volcano-vgpu-device-plugin) to access + After installed, update the scheduler configuration: ```shell script diff --git a/pkg/scheduler/api/devices/nvidia/gpushare/device_info.go b/pkg/scheduler/api/devices/nvidia/gpushare/device_info.go index 3604698364..8291c09def 100644 --- a/pkg/scheduler/api/devices/nvidia/gpushare/device_info.go +++ b/pkg/scheduler/api/devices/nvidia/gpushare/device_info.go @@ -148,7 +148,7 @@ func (gs *GPUDevices) Release(kubeClient kubernetes.Interface, pod *v1.Pod) erro return nil } -func (gs *GPUDevices) FilterNode(pod *v1.Pod) (int, string, error) { +func (gs *GPUDevices) FilterNode(pod *v1.Pod, schedulePolicy string) (int, string, error) { klog.V(4).Infoln("DeviceSharing:Into FitInPod", pod.Name) if GpuSharingEnable { fit, err := checkNodeGPUSharingPredicate(pod, gs) @@ -172,6 +172,10 @@ func (gs *GPUDevices) GetStatus() string { return "" } +func (gs *GPUDevices) ScoreNode(pod *v1.Pod, schedulePolicy string) float64 { + return 0 +} + func (gs *GPUDevices) Allocate(kubeClient kubernetes.Interface, pod *v1.Pod) error { klog.V(4).Infoln("DeviceSharing:Into AllocateToPod", pod.Name) if getGPUMemoryOfPod(pod) > 0 { diff --git a/pkg/scheduler/api/devices/nvidia/vgpu/device_info.go b/pkg/scheduler/api/devices/nvidia/vgpu/device_info.go index eb6102081c..adf3af80d9 100644 --- a/pkg/scheduler/api/devices/nvidia/vgpu/device_info.go +++ b/pkg/scheduler/api/devices/nvidia/vgpu/device_info.go @@ -58,6 +58,9 @@ type GPUDevice struct { type GPUDevices struct { Name string + // We cache score in filter step according to schedulePolicy, to avoid recalculating in score + ScoreMap map[string]float64 + Device map[int]*GPUDevice } @@ -90,7 +93,7 @@ func NewGPUDevices(name string, node *v1.Node) *GPUDevices { return nil } for _, val := range nodedevices.Device { - klog.V(3).Infoln("name=", nodedevices.Name, "val=", *val) + klog.V(4).Infoln("name=", nodedevices.Name, "val=", *val) } // We have to handshake here in order to avoid time-inconsistency between scheduler and nodes @@ -114,6 +117,15 @@ func NewGPUDevices(name string, node *v1.Node) *GPUDevices { return nodedevices } +func (gs *GPUDevices) ScoreNode(pod *v1.Pod, schedulePolicy string) float64 { + /* TODO: we need a base score to be campatable with preemption, it means a node without evicting a task has + a higher score than those needs to evict a task */ + + // Use cached stored 
in filter state in order to avoid recalculating. + klog.V(3).Infof("Scoring pod %s with to node %s with score %f", gs.Name, pod.Name, gs.ScoreMap[pod.Name]) + return gs.ScoreMap[pod.Name] +} + func (gs *GPUDevices) GetIgnoredDevices() []string { return []string{VolcanoVGPUMemory, VolcanoVGPUMemoryPercentage, VolcanoVGPUCores} } @@ -179,23 +191,24 @@ func (gs *GPUDevices) Release(kubeClient kubernetes.Interface, pod *v1.Pod) erro return nil } -func (gs *GPUDevices) FilterNode(pod *v1.Pod) (int, string, error) { +func (gs *GPUDevices) FilterNode(pod *v1.Pod, schedulePolicy string) (int, string, error) { if VGPUEnable { - klog.V(5).Infoln("4pdvgpu DeviceSharing starts filtering pods", pod.Name) - fit, _, err := checkNodeGPUSharingPredicate(pod, gs, true) + klog.V(4).Infoln("hami-vgpu DeviceSharing starts filtering pods", pod.Name) + fit, _, score, err := checkNodeGPUSharingPredicateAndScore(pod, gs, true, schedulePolicy) if err != nil || !fit { klog.Errorln("deviceSharing err=", err.Error()) return devices.Unschedulable, fmt.Sprintf("4pdvgpuDeviceSharing %s", err.Error()), err } - klog.V(5).Infoln("4pdvgpu DeviceSharing successfully filters pods") + gs.ScoreMap[pod.Name] = score + klog.V(4).Infoln("hami-vgpu DeviceSharing successfully filters pods") } return devices.Success, "", nil } func (gs *GPUDevices) Allocate(kubeClient kubernetes.Interface, pod *v1.Pod) error { if VGPUEnable { - klog.V(3).Infoln("VGPU DeviceSharing:Into AllocateToPod", pod.Name) - fit, device, err := checkNodeGPUSharingPredicate(pod, gs, false) + klog.V(4).Infoln("hami-vgpu DeviceSharing:Into AllocateToPod", pod.Name) + fit, device, _, err := checkNodeGPUSharingPredicateAndScore(pod, gs, false, "") if err != nil || !fit { klog.Errorln("DeviceSharing err=", err.Error()) return err diff --git a/pkg/scheduler/api/devices/nvidia/vgpu/metrics.go b/pkg/scheduler/api/devices/nvidia/vgpu/metrics.go index 6698171fdf..eda20a8fbc 100644 --- a/pkg/scheduler/api/devices/nvidia/vgpu/metrics.go +++ b/pkg/scheduler/api/devices/nvidia/vgpu/metrics.go @@ -69,10 +69,12 @@ var ( func (gs *GPUDevices) GetStatus() string { for _, val := range gs.Device { - VGPUDevicesSharedNumber.WithLabelValues(val.UUID).Set(float64(val.UsedNum)) - VGPUDevicesSharedMemory.WithLabelValues(val.UUID).Set(float64(val.UsedMem)) - VGPUDevicesMemoryLimit.WithLabelValues(val.UUID).Set(float64(val.Memory)) - VGPUDevicesSharedCores.WithLabelValues(val.UUID).Set(float64(val.UsedCore)) + if val != nil { + VGPUDevicesSharedNumber.WithLabelValues(val.UUID).Set(float64(val.UsedNum)) + VGPUDevicesSharedMemory.WithLabelValues(val.UUID).Set(float64(val.UsedMem)) + VGPUDevicesMemoryLimit.WithLabelValues(val.UUID).Set(float64(val.Memory)) + VGPUDevicesSharedCores.WithLabelValues(val.UUID).Set(float64(val.UsedCore)) + } } return "" } diff --git a/pkg/scheduler/api/devices/nvidia/vgpu/type.go b/pkg/scheduler/api/devices/nvidia/vgpu/type.go index 16544c5473..bdd7aec41f 100644 --- a/pkg/scheduler/api/devices/nvidia/vgpu/type.go +++ b/pkg/scheduler/api/devices/nvidia/vgpu/type.go @@ -54,6 +54,16 @@ const ( // DeviceName used to indicate this device DeviceName = "vgpu4pd" + + // binpack means the lower device memory remained after this allocation, the better + binpackPolicy = "binpack" + // spread means better put this task into an idle GPU card than a shared GPU card + spreadPolicy = "spread" + // 101 means wo don't assign defaultMemPercentage value + + DefaultMemPercentage = 101 + binpackMultiplier = 100 + spreadMultiplier = 100 ) type ContainerDeviceRequest struct { diff --git 
a/pkg/scheduler/api/devices/nvidia/vgpu/utils.go b/pkg/scheduler/api/devices/nvidia/vgpu/utils.go index 17be77deac..7da1fcd8a8 100644 --- a/pkg/scheduler/api/devices/nvidia/vgpu/utils.go +++ b/pkg/scheduler/api/devices/nvidia/vgpu/utils.go @@ -94,24 +94,27 @@ func decodeNodeDevices(name string, str string) *GPUDevices { } tmp := strings.Split(str, ":") retval := &GPUDevices{ - Name: name, - Device: make(map[int]*GPUDevice), + Name: name, + Device: make(map[int]*GPUDevice), + ScoreMap: make(map[string]float64), } for index, val := range tmp { - if strings.Contains(val, ",") { - items := strings.Split(val, ",") - count, _ := strconv.Atoi(items[1]) - devmem, _ := strconv.Atoi(items[2]) - health, _ := strconv.ParseBool(items[4]) - i := GPUDevice{ - ID: index, - UUID: items[0], - Number: uint(count), - Memory: uint(devmem), - Type: items[3], - Health: health, + if len(val) > 0 { + if strings.Contains(val, ",") { + items := strings.Split(val, ",") + count, _ := strconv.Atoi(items[1]) + devmem, _ := strconv.Atoi(items[2]) + health, _ := strconv.ParseBool(items[4]) + i := GPUDevice{ + ID: index, + UUID: items[0], + Number: uint(count), + Memory: uint(devmem), + Type: items[3], + Health: health, + } + retval.Device[index] = &i } - retval.Device[index] = &i } } return retval @@ -305,35 +308,40 @@ func checkType(annos map[string]string, d GPUDevice, n ContainerDeviceRequest) b func getGPUDeviceSnapShot(snap *GPUDevices) *GPUDevices { ret := GPUDevices{ - Name: snap.Name, - Device: make(map[int]*GPUDevice), + Name: snap.Name, + Device: make(map[int]*GPUDevice), + ScoreMap: make(map[string]float64), } for index, val := range snap.Device { - ret.Device[index] = &GPUDevice{ - ID: val.ID, - UUID: val.UUID, - PodMap: val.PodMap, - Memory: val.Memory, - Number: val.Number, - Type: val.Type, - Health: val.Health, - UsedNum: val.UsedNum, - UsedMem: val.UsedMem, - UsedCore: val.UsedCore, + if val != nil { + ret.Device[index] = &GPUDevice{ + ID: val.ID, + UUID: val.UUID, + PodMap: val.PodMap, + Memory: val.Memory, + Number: val.Number, + Type: val.Type, + Health: val.Health, + UsedNum: val.UsedNum, + UsedMem: val.UsedMem, + UsedCore: val.UsedCore, + } } } return &ret } // checkNodeGPUSharingPredicate checks if a pod with gpu requirement can be scheduled on a node. 
-func checkNodeGPUSharingPredicate(pod *v1.Pod, gssnap *GPUDevices, replicate bool) (bool, []ContainerDevices, error) { +func checkNodeGPUSharingPredicateAndScore(pod *v1.Pod, gssnap *GPUDevices, replicate bool, schedulePolicy string) (bool, []ContainerDevices, float64, error) { + // no gpu sharing request + score := float64(0) if !checkVGPUResourcesInPod(pod) { - return true, []ContainerDevices{}, nil + return true, []ContainerDevices{}, 0, nil } ctrReq := resourcereqs(pod) if len(ctrReq) == 0 { - return true, []ContainerDevices{}, nil + return true, []ContainerDevices{}, 0, nil } var gs *GPUDevices if replicate { @@ -345,13 +353,13 @@ func checkNodeGPUSharingPredicate(pod *v1.Pod, gssnap *GPUDevices, replicate boo for _, val := range ctrReq { devs := []ContainerDevice{} if int(val.Nums) > len(gs.Device) { - return false, []ContainerDevices{}, fmt.Errorf("no enough gpu cards on node %s", gs.Name) + return false, []ContainerDevices{}, 0, fmt.Errorf("no enough gpu cards on node %s", gs.Name) } - klog.V(3).Infoln("Allocating device for container request", val) + klog.V(3).InfoS("Allocating device for container", "request", val) for i := len(gs.Device) - 1; i >= 0; i-- { - klog.V(3).Info("Scoring pod ", val.Memreq, ":", val.MemPercentagereq, ":", val.Coresreq, ":", val.Nums, "i", i, "device:", gs.Device[i].ID) - klog.V(3).Infoln("gs", i, "=", gs.Device[i].Memory, gs.Device[i].UsedMem, gs.Device[i].UsedNum) + klog.V(3).InfoS("Scoring pod request", "memReq", val.Memreq, "memPercentageReq", val.MemPercentagereq, "coresReq", val.Coresreq, "Nums", val.Nums, "Index", i, "ID", gs.Device[i].ID) + klog.V(3).InfoS("Current Device", "Index", i, "TotalMemory", gs.Device[i].Memory, "UsedMemory", gs.Device[i].UsedMem, "UsedCores", gs.Device[i].UsedNum) if gs.Device[i].Number <= uint(gs.Device[i].UsedNum) { continue } @@ -379,7 +387,7 @@ func checkNodeGPUSharingPredicate(pod *v1.Pod, gssnap *GPUDevices, replicate boo //total += gs.Devices[i].Count //free += node.Devices[i].Count - node.Devices[i].Used if val.Nums > 0 { - klog.V(3).Infoln("device", gs.Device[i].ID, "fitted") + klog.V(3).InfoS("device fitted", "ID", gs.Device[i].ID) val.Nums-- gs.Device[i].UsedNum++ gs.Device[i].UsedMem += uint(val.Memreq) @@ -390,17 +398,27 @@ func checkNodeGPUSharingPredicate(pod *v1.Pod, gssnap *GPUDevices, replicate boo Usedmem: val.Memreq, Usedcores: val.Coresreq, }) + switch { + case schedulePolicy == binpackPolicy: + score += binpackMultiplier * (float64(gs.Device[i].UsedMem) / float64(gs.Device[i].Memory)) + case schedulePolicy == spreadPolicy: + if gs.Device[i].UsedNum == 1 { + score += spreadMultiplier + } + default: + score = float64(0) + } } if val.Nums == 0 { break } } if val.Nums > 0 { - return false, []ContainerDevices{}, fmt.Errorf("not enough gpu fitted on this node") + return false, []ContainerDevices{}, 0, fmt.Errorf("not enough gpu fitted on this node") } ctrdevs = append(ctrdevs, devs) } - return true, ctrdevs, nil + return true, ctrdevs, score, nil } func patchPodAnnotations(pod *v1.Pod, annotations map[string]string) error { diff --git a/pkg/scheduler/api/shared_device_pool.go b/pkg/scheduler/api/shared_device_pool.go index 6c94a97f54..c6d162ef29 100644 --- a/pkg/scheduler/api/shared_device_pool.go +++ b/pkg/scheduler/api/shared_device_pool.go @@ -59,7 +59,11 @@ type Devices interface { // preemption would not change anything. Plugins should return Unschedulable if it is possible // that the pod can get scheduled with preemption. 
// The accompanying status message should explain why the pod is unschedulable. - FilterNode(pod *v1.Pod) (int, string, error) + FilterNode(pod *v1.Pod, policy string) (int, string, error) + // ScoreNode will be invoked when using devicescore plugin, devices api can use it to implement multiple + // scheduling policies. + ScoreNode(pod *v1.Pod, policy string) float64 + // Allocate action in predicate Allocate(kubeClient kubernetes.Interface, pod *v1.Pod) error // Release action in predicate diff --git a/pkg/scheduler/plugins/deviceshare/deviceshare.go b/pkg/scheduler/plugins/deviceshare/deviceshare.go index 0b35952323..6abfbd6b18 100644 --- a/pkg/scheduler/plugins/deviceshare/deviceshare.go +++ b/pkg/scheduler/plugins/deviceshare/deviceshare.go @@ -17,10 +17,15 @@ limitations under the License. package deviceshare import ( + "context" "fmt" + "math" + "reflect" + v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" + k8sframework "k8s.io/kubernetes/pkg/scheduler/framework" "volcano.sh/volcano/pkg/scheduler/api" "volcano.sh/volcano/pkg/scheduler/api/devices" "volcano.sh/volcano/pkg/scheduler/api/devices/nvidia/gpushare" @@ -28,6 +33,11 @@ import ( "volcano.sh/volcano/pkg/scheduler/framework" ) +var ( + SchedulePolicy string = "" + scheduleWeight int = 0 +) + // PluginName indicates name of volcano scheduler plugin. const ( PluginName = "deviceshare" @@ -37,6 +47,9 @@ const ( GPUNumberPredicate = "deviceshare.GPUNumberEnable" VGPUEnable = "deviceshare.VGPUEnable" + + SchedulePolicyArgument = "deviceshare.SchedulePolicy" + ScheduleWeight = "deviceshare.ScheduleWeight" ) type deviceSharePlugin struct { @@ -60,6 +73,12 @@ func enablePredicate(args framework.Arguments) { args.GetBool(&gpushare.NodeLockEnable, NodeLockEnable) args.GetBool(&vgpu.VGPUEnable, VGPUEnable) + _, ok := args[SchedulePolicyArgument] + if ok { + SchedulePolicy = args[SchedulePolicyArgument].(string) + } + args.GetInt(&scheduleWeight, ScheduleWeight) + if gpushare.GpuSharingEnable && gpushare.GpuNumberEnable { klog.Fatal("can not define true in both gpu sharing and gpu number") } @@ -76,6 +95,18 @@ func createStatus(code int, reason string) *api.Status { return &status } +func getDeviceScore(ctx context.Context, pod *v1.Pod, node *api.NodeInfo, schedulePolicy string) (int64, *k8sframework.Status) { + s := float64(0) + for _, devices := range node.Others { + if devices.(api.Devices).HasDeviceRequest(pod) { + ns := devices.(api.Devices).ScoreNode(pod, schedulePolicy) + s += ns + } + } + klog.V(4).Infof("deviceScore for task %s/%s is: %v", pod.Namespace, pod.Name, s) + return int64(math.Floor(s + 0.5)), nil +} + func (dp *deviceSharePlugin) OnSessionOpen(ssn *framework.Session) { enablePredicate(dp.pluginArguments) // Register event handlers to update task info in PodLister & nodeMap @@ -84,12 +115,19 @@ func (dp *deviceSharePlugin) OnSessionOpen(ssn *framework.Session) { // Check PredicateWithCache for _, val := range api.RegisteredDevices { if dev, ok := node.Others[val].(api.Devices); ok { - if dev == nil { - predicateStatus = append(predicateStatus, - createStatus(devices.Unschedulable, "node not initialized with device"+val)) - return predicateStatus, fmt.Errorf("node not initialized with device %s", val) + if reflect.ValueOf(dev).IsNil() { + // TODO When a pod requests a device of the current type, but the current node does not have such a device, an error is thrown + if dev == nil || dev.HasDeviceRequest(task.Pod) { + predicateStatus = append(predicateStatus, &api.Status{ + Code: devices.Unschedulable, + Reason: "node not 
initialized with device" + val, + }) + return predicateStatus, fmt.Errorf("node not initialized with device %s", val) + } + klog.V(4).Infof("pod %s/%s did not request device %s on %s, skipping it", task.Pod.Namespace, task.Pod.Name, val, node.Name) + continue } - code, msg, err := dev.FilterNode(task.Pod) + code, msg, err := dev.FilterNode(task.Pod, SchedulePolicy) if err != nil { predicateStatus = append(predicateStatus, createStatus(code, msg)) return predicateStatus, err @@ -109,6 +147,22 @@ func (dp *deviceSharePlugin) OnSessionOpen(ssn *framework.Session) { return predicateStatus, nil }) + + ssn.AddNodeOrderFn(dp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) (float64, error) { + // DeviceScore + if len(SchedulePolicy) > 0 { + score, status := getDeviceScore(context.TODO(), task.Pod, node, SchedulePolicy) + if !status.IsSuccess() { + klog.Warningf("Node: %s, Calculate Device Score Failed because of Error: %v", node.Name, status.AsError()) + return 0, status.AsError() + } + + // TODO: we should use a seperate plugin for devices, and seperate them from predicates and nodeOrder plugin. + nodeScore := float64(score) * float64(scheduleWeight) + klog.V(5).Infof("Node: %s, task<%s/%s> Device Score weight %d, score: %f", node.Name, task.Namespace, task.Name, scheduleWeight, nodeScore) + } + return 0, nil + }) } func (dp *deviceSharePlugin) OnSessionClose(ssn *framework.Session) {} From 0237537c78ad3a9cd464751df43a066f30f16f67 Mon Sep 17 00:00:00 2001 From: yangqz Date: Thu, 16 May 2024 13:08:07 +0800 Subject: [PATCH 2/2] Signed-off-by: yangqz 1.Fix device share plugins npe 2.Fix vgpu device handshake patch error 3.update and add deviceshare ut --- docs/user-guide/how_to_use_gpu_number.md | 2 + .../api/devices/nvidia/vgpu/device_info.go | 11 +- pkg/scheduler/api/devices/nvidia/vgpu/type.go | 2 +- .../api/devices/nvidia/vgpu/utils.go | 49 ++++--- .../plugins/deviceshare/deviceshare.go | 31 ++--- .../plugins/deviceshare/deviceshare_test.go | 130 ++++++++++++++++++ 6 files changed, 176 insertions(+), 49 deletions(-) create mode 100644 pkg/scheduler/plugins/deviceshare/deviceshare_test.go diff --git a/docs/user-guide/how_to_use_gpu_number.md b/docs/user-guide/how_to_use_gpu_number.md index abe79860ba..10da190655 100644 --- a/docs/user-guide/how_to_use_gpu_number.md +++ b/docs/user-guide/how_to_use_gpu_number.md @@ -8,6 +8,8 @@ Refer to [Install Guide](../../installer/README.md) to install volcano. 
+> **Note** The Volcano VGPU feature has been transferred to the HAMI project, click [here](https://github.com/Project-HAMi/volcano-vgpu-device-plugin) to access + After installed, update the scheduler configuration: ```shell script diff --git a/pkg/scheduler/api/devices/nvidia/vgpu/device_info.go b/pkg/scheduler/api/devices/nvidia/vgpu/device_info.go index adf3af80d9..9c5c2d68ca 100644 --- a/pkg/scheduler/api/devices/nvidia/vgpu/device_info.go +++ b/pkg/scheduler/api/devices/nvidia/vgpu/device_info.go @@ -59,7 +59,7 @@ type GPUDevices struct { Name string // We cache score in filter step according to schedulePolicy, to avoid recalculating in score - ScoreMap map[string]float64 + Score float64 Device map[int]*GPUDevice } @@ -103,7 +103,7 @@ func NewGPUDevices(name string, node *v1.Node) *GPUDevices { klog.Infof("node %v device %s leave", node.Name, handshake) tmppat := make(map[string]string) - tmppat[handshake] = "Deleted_" + time.Now().Format("2006.01.02 15:04:05") + tmppat[VolcanoVGPUHandshake] = "Deleted_" + time.Now().Format("2006.01.02 15:04:05") patchNodeAnnotations(node, tmppat) return nil } @@ -122,8 +122,7 @@ func (gs *GPUDevices) ScoreNode(pod *v1.Pod, schedulePolicy string) float64 { a higher score than those needs to evict a task */ // Use cached stored in filter state in order to avoid recalculating. - klog.V(3).Infof("Scoring pod %s with to node %s with score %f", gs.Name, pod.Name, gs.ScoreMap[pod.Name]) - return gs.ScoreMap[pod.Name] + return gs.Score } func (gs *GPUDevices) GetIgnoredDevices() []string { @@ -197,9 +196,9 @@ func (gs *GPUDevices) FilterNode(pod *v1.Pod, schedulePolicy string) (int, strin fit, _, score, err := checkNodeGPUSharingPredicateAndScore(pod, gs, true, schedulePolicy) if err != nil || !fit { klog.Errorln("deviceSharing err=", err.Error()) - return devices.Unschedulable, fmt.Sprintf("4pdvgpuDeviceSharing %s", err.Error()), err + return devices.Unschedulable, fmt.Sprintf("hami-vgpuDeviceSharing %s", err.Error()), err } - gs.ScoreMap[pod.Name] = score + gs.Score = score klog.V(4).Infoln("hami-vgpu DeviceSharing successfully filters pods") } return devices.Success, "", nil diff --git a/pkg/scheduler/api/devices/nvidia/vgpu/type.go b/pkg/scheduler/api/devices/nvidia/vgpu/type.go index bdd7aec41f..020ee2196c 100644 --- a/pkg/scheduler/api/devices/nvidia/vgpu/type.go +++ b/pkg/scheduler/api/devices/nvidia/vgpu/type.go @@ -53,7 +53,7 @@ const ( UnhealthyGPUIDs = "volcano.sh/gpu-unhealthy-ids" // DeviceName used to indicate this device - DeviceName = "vgpu4pd" + DeviceName = "hamivgpu" // binpack means the lower device memory remained after this allocation, the better binpackPolicy = "binpack" diff --git a/pkg/scheduler/api/devices/nvidia/vgpu/utils.go b/pkg/scheduler/api/devices/nvidia/vgpu/utils.go index 7da1fcd8a8..89170c431c 100644 --- a/pkg/scheduler/api/devices/nvidia/vgpu/utils.go +++ b/pkg/scheduler/api/devices/nvidia/vgpu/utils.go @@ -40,7 +40,7 @@ func init() { var err error kubeClient, err = NewClient() if err != nil { - klog.Errorf("init kubeclient in 4pdvgpu failed: %s", err.Error()) + klog.Errorf("init kubeclient in hamivgpu failed: %s", err.Error()) } else { klog.V(3).Infoln("init kubeclient success") } @@ -94,27 +94,25 @@ func decodeNodeDevices(name string, str string) *GPUDevices { } tmp := strings.Split(str, ":") retval := &GPUDevices{ - Name: name, - Device: make(map[int]*GPUDevice), - ScoreMap: make(map[string]float64), + Name: name, + Device: make(map[int]*GPUDevice), + Score: float64(0), } for index, val := range tmp { - if len(val) > 
0 { - if strings.Contains(val, ",") { - items := strings.Split(val, ",") - count, _ := strconv.Atoi(items[1]) - devmem, _ := strconv.Atoi(items[2]) - health, _ := strconv.ParseBool(items[4]) - i := GPUDevice{ - ID: index, - UUID: items[0], - Number: uint(count), - Memory: uint(devmem), - Type: items[3], - Health: health, - } - retval.Device[index] = &i + if strings.Contains(val, ",") { + items := strings.Split(val, ",") + count, _ := strconv.Atoi(items[1]) + devmem, _ := strconv.Atoi(items[2]) + health, _ := strconv.ParseBool(items[4]) + i := GPUDevice{ + ID: index, + UUID: items[0], + Number: uint(count), + Memory: uint(devmem), + Type: items[3], + Health: health, } + retval.Device[index] = &i } } return retval @@ -308,9 +306,9 @@ func checkType(annos map[string]string, d GPUDevice, n ContainerDeviceRequest) b func getGPUDeviceSnapShot(snap *GPUDevices) *GPUDevices { ret := GPUDevices{ - Name: snap.Name, - Device: make(map[int]*GPUDevice), - ScoreMap: make(map[string]float64), + Name: snap.Name, + Device: make(map[int]*GPUDevice), + Score: float64(0), } for index, val := range snap.Device { if val != nil { @@ -333,7 +331,6 @@ func getGPUDeviceSnapShot(snap *GPUDevices) *GPUDevices { // checkNodeGPUSharingPredicate checks if a pod with gpu requirement can be scheduled on a node. func checkNodeGPUSharingPredicateAndScore(pod *v1.Pod, gssnap *GPUDevices, replicate bool, schedulePolicy string) (bool, []ContainerDevices, float64, error) { - // no gpu sharing request score := float64(0) if !checkVGPUResourcesInPod(pod) { @@ -398,10 +395,10 @@ func checkNodeGPUSharingPredicateAndScore(pod *v1.Pod, gssnap *GPUDevices, repli Usedmem: val.Memreq, Usedcores: val.Coresreq, }) - switch { - case schedulePolicy == binpackPolicy: + switch schedulePolicy { + case binpackPolicy: score += binpackMultiplier * (float64(gs.Device[i].UsedMem) / float64(gs.Device[i].Memory)) - case schedulePolicy == spreadPolicy: + case spreadPolicy: if gs.Device[i].UsedNum == 1 { score += spreadMultiplier } diff --git a/pkg/scheduler/plugins/deviceshare/deviceshare.go b/pkg/scheduler/plugins/deviceshare/deviceshare.go index 6abfbd6b18..62b083cba7 100644 --- a/pkg/scheduler/plugins/deviceshare/deviceshare.go +++ b/pkg/scheduler/plugins/deviceshare/deviceshare.go @@ -24,8 +24,8 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" - k8sframework "k8s.io/kubernetes/pkg/scheduler/framework" + "volcano.sh/volcano/pkg/scheduler/api" "volcano.sh/volcano/pkg/scheduler/api/devices" "volcano.sh/volcano/pkg/scheduler/api/devices/nvidia/gpushare" @@ -33,11 +33,6 @@ import ( "volcano.sh/volcano/pkg/scheduler/framework" ) -var ( - SchedulePolicy string = "" - scheduleWeight int = 0 -) - // PluginName indicates name of volcano scheduler plugin. const ( PluginName = "deviceshare" @@ -55,19 +50,24 @@ const ( type deviceSharePlugin struct { // Arguments given for the plugin pluginArguments framework.Arguments + schedulePolicy string + scheduleWeight int } // New return priority plugin func New(arguments framework.Arguments) framework.Plugin { - return &deviceSharePlugin{pluginArguments: arguments} + dsp := &deviceSharePlugin{pluginArguments: arguments, schedulePolicy: "", scheduleWeight: 0} + enablePredicate(dsp) + return dsp } func (dp *deviceSharePlugin) Name() string { return PluginName } -func enablePredicate(args framework.Arguments) { +func enablePredicate(dsp *deviceSharePlugin) { // Checks whether predicate.GPUSharingEnable is provided or not, if given, modifies the value in predicateEnable struct. 
+ args := dsp.pluginArguments args.GetBool(&gpushare.GpuSharingEnable, GPUSharingPredicate) args.GetBool(&gpushare.GpuNumberEnable, GPUNumberPredicate) args.GetBool(&gpushare.NodeLockEnable, NodeLockEnable) @@ -75,9 +75,9 @@ func enablePredicate(args framework.Arguments) { _, ok := args[SchedulePolicyArgument] if ok { - SchedulePolicy = args[SchedulePolicyArgument].(string) + dsp.schedulePolicy = args[SchedulePolicyArgument].(string) } - args.GetInt(&scheduleWeight, ScheduleWeight) + args.GetInt(&dsp.scheduleWeight, ScheduleWeight) if gpushare.GpuSharingEnable && gpushare.GpuNumberEnable { klog.Fatal("can not define true in both gpu sharing and gpu number") @@ -108,7 +108,6 @@ func getDeviceScore(ctx context.Context, pod *v1.Pod, node *api.NodeInfo, schedu } func (dp *deviceSharePlugin) OnSessionOpen(ssn *framework.Session) { - enablePredicate(dp.pluginArguments) // Register event handlers to update task info in PodLister & nodeMap ssn.AddPredicateFn(dp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) { predicateStatus := make([]*api.Status, 0) @@ -127,7 +126,7 @@ func (dp *deviceSharePlugin) OnSessionOpen(ssn *framework.Session) { klog.V(4).Infof("pod %s/%s did not request device %s on %s, skipping it", task.Pod.Namespace, task.Pod.Name, val, node.Name) continue } - code, msg, err := dev.FilterNode(task.Pod, SchedulePolicy) + code, msg, err := dev.FilterNode(task.Pod, dp.schedulePolicy) if err != nil { predicateStatus = append(predicateStatus, createStatus(code, msg)) return predicateStatus, err @@ -150,16 +149,16 @@ func (dp *deviceSharePlugin) OnSessionOpen(ssn *framework.Session) { ssn.AddNodeOrderFn(dp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) (float64, error) { // DeviceScore - if len(SchedulePolicy) > 0 { - score, status := getDeviceScore(context.TODO(), task.Pod, node, SchedulePolicy) + if len(dp.schedulePolicy) > 0 { + score, status := getDeviceScore(context.TODO(), task.Pod, node, dp.schedulePolicy) if !status.IsSuccess() { klog.Warningf("Node: %s, Calculate Device Score Failed because of Error: %v", node.Name, status.AsError()) return 0, status.AsError() } // TODO: we should use a seperate plugin for devices, and seperate them from predicates and nodeOrder plugin. - nodeScore := float64(score) * float64(scheduleWeight) - klog.V(5).Infof("Node: %s, task<%s/%s> Device Score weight %d, score: %f", node.Name, task.Namespace, task.Name, scheduleWeight, nodeScore) + nodeScore := float64(score) * float64(dp.scheduleWeight) + klog.V(5).Infof("Node: %s, task<%s/%s> Device Score weight %d, score: %f", node.Name, task.Namespace, task.Name, dp.scheduleWeight, nodeScore) } return 0, nil }) diff --git a/pkg/scheduler/plugins/deviceshare/deviceshare_test.go b/pkg/scheduler/plugins/deviceshare/deviceshare_test.go new file mode 100644 index 0000000000..e3a2de1f7b --- /dev/null +++ b/pkg/scheduler/plugins/deviceshare/deviceshare_test.go @@ -0,0 +1,130 @@ +/* +Copyright 2024 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package deviceshare + +import ( + "testing" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + + "volcano.sh/volcano/pkg/scheduler/api" + "volcano.sh/volcano/pkg/scheduler/api/devices/nvidia/vgpu" + "volcano.sh/volcano/pkg/scheduler/framework" + "volcano.sh/volcano/pkg/scheduler/util" +) + +func TestArguments(t *testing.T) { + framework.RegisterPluginBuilder(PluginName, New) + defer framework.CleanupPluginBuilders() + + arguments := framework.Arguments{ + "deviceshare.VGPUEnable": true, + "deviceshare.SchedulePolicy": "binpack", + "deviceshare.ScheduleWeight": 10, + } + + builder, ok := framework.GetPluginBuilder(PluginName) + + if !ok { + t.Fatalf("should have plugin named %s", PluginName) + } + + plugin := builder(arguments) + deviceshare, ok := plugin.(*deviceSharePlugin) + + if !ok { + t.Fatalf("plugin should be %T, but not %T", deviceshare, plugin) + } + + weight := deviceshare.scheduleWeight + + if weight != 10 { + t.Errorf("weight should be 10, but not %v", weight) + } + + if deviceshare.schedulePolicy != "binpack" { + t.Errorf("policy should be binpack, but not %s", deviceshare.schedulePolicy) + } +} + +func addResource(resourceList v1.ResourceList, name v1.ResourceName, need string) { + resourceList[name] = resource.MustParse(need) +} + +func TestVgpuScore(t *testing.T) { + gpuNode1 := vgpu.GPUDevices{ + Name: "node1", + Score: float64(0), + Device: make(map[int]*vgpu.GPUDevice), + } + gpuNode1.Device[0] = vgpu.NewGPUDevice(0, 30000) + gpuNode1.Device[0].Type = "NVIDIA" + gpuNode1.Device[0].Number = 10 + gpuNode1.Device[0].UsedNum = 1 + gpuNode1.Device[0].UsedMem = 3000 + + gpunumber := v1.ResourceName("volcano.sh/vgpu-number") + gpumemory := v1.ResourceName("volcano.sh/vgpu-memory") + + vgpu.VGPUEnable = true + + p1 := util.BuildPod("c1", "p3", "", v1.PodPending, api.BuildResourceList("2", "10Gi"), "pg1", make(map[string]string), make(map[string]string)) + addResource(p1.Spec.Containers[0].Resources.Requests, gpunumber, "1") + addResource(p1.Spec.Containers[0].Resources.Requests, gpumemory, "1000") + p1.Spec.Containers[0].Resources.Limits = make(v1.ResourceList) + addResource(p1.Spec.Containers[0].Resources.Limits, gpunumber, "1") + addResource(p1.Spec.Containers[0].Resources.Limits, gpumemory, "1000") + + canAccess, _, err := gpuNode1.FilterNode(p1, "binpack") + if err != nil || canAccess != 0 { + t.Errorf("binpack filter failed %s", err.Error()) + } + + score := gpuNode1.ScoreNode(p1, "binpack") + if score-float64(4000*100)/float64(30000) > 0.05 { + t.Errorf("score failed expected %f, get %f", float64(4000*100)/float64(30000), score) + } + + gpuNode2 := vgpu.GPUDevices{ + Name: "node2", + Score: float64(0), + Device: make(map[int]*vgpu.GPUDevice), + } + gpuNode2.Device[0] = vgpu.NewGPUDevice(0, 30000) + gpuNode2.Device[0].Type = "NVIDIA" + gpuNode2.Device[0].Number = 10 + gpuNode2.Device[0].UsedNum = 0 + gpuNode2.Device[0].UsedMem = 0 + p2 := util.BuildPod("c2", "p4", "", v1.PodPending, api.BuildResourceList("2", "10Gi"), "pg1", make(map[string]string), make(map[string]string)) + addResource(p2.Spec.Containers[0].Resources.Requests, gpunumber, "1") + addResource(p2.Spec.Containers[0].Resources.Requests, gpumemory, "1000") + p2.Spec.Containers[0].Resources.Limits = make(v1.ResourceList) + addResource(p2.Spec.Containers[0].Resources.Limits, gpunumber, "1") + addResource(p2.Spec.Containers[0].Resources.Limits, gpumemory, "1000") + + canAccess, _, err = gpuNode2.FilterNode(p2, "spread") + if err != nil || canAccess != 0 { + t.Errorf("binpack filter failed 
%s", err.Error()) + } + + score = gpuNode2.ScoreNode(p1, "spread") + if score-float64(100) > 0.05 { + t.Errorf("score failed expected %f, get %f", float64(4000*100)/float64(30000), score) + } + +}