Skip to content

Commit

Permalink
Merge pull request volcano-sh#3471 from archlitchi/master
Browse files Browse the repository at this point in the history
Fix device share plugins npe & update devicescore
  • Loading branch information
volcano-sh-bot authored May 16, 2024
2 parents 016d215 + 0237537 commit 63bf271
Show file tree
Hide file tree
Showing 10 changed files with 280 additions and 46 deletions.
2 changes: 2 additions & 0 deletions docs/user-guide/how_to_use_gpu_number.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

Refer to [Install Guide](../../installer/README.md) to install volcano.

> **Note** The Volcano VGPU feature has been transferred to the HAMi project; click [here](https://github.com/Project-HAMi/volcano-vgpu-device-plugin) to access it.

After installation, update the scheduler configuration:

```shell script
Expand Down
2 changes: 2 additions & 0 deletions docs/user-guide/how_to_use_gpu_sharing.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

Refer to [Install Guide](../../installer/README.md) to install volcano.

> **Note** The Volcano VGPU feature has been transferred to the HAMi project; click [here](https://github.com/Project-HAMi/volcano-vgpu-device-plugin) to access it.

After installation, update the scheduler configuration:

```shell script
Expand Down
6 changes: 5 additions & 1 deletion pkg/scheduler/api/devices/nvidia/gpushare/device_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (gs *GPUDevices) Release(kubeClient kubernetes.Interface, pod *v1.Pod) erro
return nil
}

func (gs *GPUDevices) FilterNode(pod *v1.Pod) (int, string, error) {
func (gs *GPUDevices) FilterNode(pod *v1.Pod, schedulePolicy string) (int, string, error) {
klog.V(4).Infoln("DeviceSharing:Into FitInPod", pod.Name)
if GpuSharingEnable {
fit, err := checkNodeGPUSharingPredicate(pod, gs)
Expand All @@ -172,6 +172,10 @@ func (gs *GPUDevices) GetStatus() string {
return ""
}

// ScoreNode rates this node for the given pod under the given scheduling
// policy. The gpushare device layer does not implement policy-based scoring,
// so it always returns a neutral score of 0 regardless of pod or policy.
func (gs *GPUDevices) ScoreNode(pod *v1.Pod, schedulePolicy string) float64 {
	return 0
}

func (gs *GPUDevices) Allocate(kubeClient kubernetes.Interface, pod *v1.Pod) error {
klog.V(4).Infoln("DeviceSharing:Into AllocateToPod", pod.Name)
if getGPUMemoryOfPod(pod) > 0 {
Expand Down
30 changes: 21 additions & 9 deletions pkg/scheduler/api/devices/nvidia/vgpu/device_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ type GPUDevice struct {
type GPUDevices struct {
Name string

// We cache score in filter step according to schedulePolicy, to avoid recalculating in score
Score float64

Device map[int]*GPUDevice
}

Expand Down Expand Up @@ -90,7 +93,7 @@ func NewGPUDevices(name string, node *v1.Node) *GPUDevices {
return nil
}
for _, val := range nodedevices.Device {
klog.V(3).Infoln("name=", nodedevices.Name, "val=", *val)
klog.V(4).Infoln("name=", nodedevices.Name, "val=", *val)
}

// We have to handshake here in order to avoid time-inconsistency between scheduler and nodes
Expand All @@ -100,7 +103,7 @@ func NewGPUDevices(name string, node *v1.Node) *GPUDevices {
klog.Infof("node %v device %s leave", node.Name, handshake)

tmppat := make(map[string]string)
tmppat[handshake] = "Deleted_" + time.Now().Format("2006.01.02 15:04:05")
tmppat[VolcanoVGPUHandshake] = "Deleted_" + time.Now().Format("2006.01.02 15:04:05")
patchNodeAnnotations(node, tmppat)
return nil
}
Expand All @@ -114,6 +117,14 @@ func NewGPUDevices(name string, node *v1.Node) *GPUDevices {
return nodedevices
}

// ScoreNode rates this node for the given pod. It returns the score that was
// computed and cached in gs.Score during the filter step (FilterNode), so the
// per-device accounting does not have to be recalculated here; the
// schedulePolicy argument is unused because the policy was already applied
// when the cached score was produced.
func (gs *GPUDevices) ScoreNode(pod *v1.Pod, schedulePolicy string) float64 {
	/* TODO: we need a base score to be compatible with preemption: a node
	that can place the task without evicting anything should score higher
	than one that needs an eviction. */

	// Use the score cached in the filter step in order to avoid recalculating.
	return gs.Score
}

// GetIgnoredDevices returns the vGPU resource names (memory, memory
// percentage, cores) that should be ignored by generic resource accounting,
// since they are handled by this device-sharing implementation instead.
func (gs *GPUDevices) GetIgnoredDevices() []string {
	return []string{VolcanoVGPUMemory, VolcanoVGPUMemoryPercentage, VolcanoVGPUCores}
}
Expand Down Expand Up @@ -179,23 +190,24 @@ func (gs *GPUDevices) Release(kubeClient kubernetes.Interface, pod *v1.Pod) erro
return nil
}

func (gs *GPUDevices) FilterNode(pod *v1.Pod) (int, string, error) {
// FilterNode checks whether the pod's vGPU request can be satisfied by this
// node. When VGPUEnable is set, it runs the sharing predicate and, on
// success, caches the node's score (computed according to schedulePolicy) in
// gs.Score so that ScoreNode can return it without recomputing. It returns
// devices.Unschedulable with a reason string when the pod does not fit, and
// devices.Success otherwise.
func (gs *GPUDevices) FilterNode(pod *v1.Pod, schedulePolicy string) (int, string, error) {
	if VGPUEnable {
		klog.V(4).Infoln("hami-vgpu DeviceSharing starts filtering pods", pod.Name)
		fit, _, score, err := checkNodeGPUSharingPredicateAndScore(pod, gs, true, schedulePolicy)
		if err != nil || !fit {
			// NOTE(review): err.Error() is called unconditionally here; if
			// the predicate can ever return fit=false with a nil error this
			// would panic — confirm against checkNodeGPUSharingPredicateAndScore.
			klog.Errorln("deviceSharing err=", err.Error())
			return devices.Unschedulable, fmt.Sprintf("hami-vgpuDeviceSharing %s", err.Error()), err
		}
		gs.Score = score
		klog.V(4).Infoln("hami-vgpu DeviceSharing successfully filters pods")
	}
	return devices.Success, "", nil
}

func (gs *GPUDevices) Allocate(kubeClient kubernetes.Interface, pod *v1.Pod) error {
if VGPUEnable {
klog.V(3).Infoln("VGPU DeviceSharing:Into AllocateToPod", pod.Name)
fit, device, err := checkNodeGPUSharingPredicate(pod, gs, false)
klog.V(4).Infoln("hami-vgpu DeviceSharing:Into AllocateToPod", pod.Name)
fit, device, _, err := checkNodeGPUSharingPredicateAndScore(pod, gs, false, "")
if err != nil || !fit {
klog.Errorln("DeviceSharing err=", err.Error())
return err
Expand Down
10 changes: 6 additions & 4 deletions pkg/scheduler/api/devices/nvidia/vgpu/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,12 @@ var (

// GetStatus publishes per-device vGPU usage gauges (shared number, shared
// memory, memory limit, shared cores) keyed by device UUID, and always
// returns the empty string.
func (gs *GPUDevices) GetStatus() string {
	for _, val := range gs.Device {
		// Guard against nil map entries to avoid a nil-pointer dereference;
		// the device map may contain nil values.
		if val != nil {
			VGPUDevicesSharedNumber.WithLabelValues(val.UUID).Set(float64(val.UsedNum))
			VGPUDevicesSharedMemory.WithLabelValues(val.UUID).Set(float64(val.UsedMem))
			VGPUDevicesMemoryLimit.WithLabelValues(val.UUID).Set(float64(val.Memory))
			VGPUDevicesSharedCores.WithLabelValues(val.UUID).Set(float64(val.UsedCore))
		}
	}
	return ""
}
12 changes: 11 additions & 1 deletion pkg/scheduler/api/devices/nvidia/vgpu/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,17 @@ const (
UnhealthyGPUIDs = "volcano.sh/gpu-unhealthy-ids"

// DeviceName used to indicate this device
DeviceName = "vgpu4pd"
DeviceName = "hamivgpu"

// binpack means the lower device memory remained after this allocation, the better
binpackPolicy = "binpack"
// spread means better put this task into an idle GPU card than a shared GPU card
spreadPolicy = "spread"
// 101 means we don't assign a default memory-percentage value

DefaultMemPercentage = 101
binpackMultiplier = 100
spreadMultiplier = 100
)

type ContainerDeviceRequest struct {
Expand Down
59 changes: 37 additions & 22 deletions pkg/scheduler/api/devices/nvidia/vgpu/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func init() {
var err error
kubeClient, err = NewClient()
if err != nil {
klog.Errorf("init kubeclient in 4pdvgpu failed: %s", err.Error())
klog.Errorf("init kubeclient in hamivgpu failed: %s", err.Error())
} else {
klog.V(3).Infoln("init kubeclient success")
}
Expand Down Expand Up @@ -96,6 +96,7 @@ func decodeNodeDevices(name string, str string) *GPUDevices {
retval := &GPUDevices{
Name: name,
Device: make(map[int]*GPUDevice),
Score: float64(0),
}
for index, val := range tmp {
if strings.Contains(val, ",") {
Expand Down Expand Up @@ -307,33 +308,37 @@ func getGPUDeviceSnapShot(snap *GPUDevices) *GPUDevices {
ret := GPUDevices{
Name: snap.Name,
Device: make(map[int]*GPUDevice),
Score: float64(0),
}
for index, val := range snap.Device {
ret.Device[index] = &GPUDevice{
ID: val.ID,
UUID: val.UUID,
PodMap: val.PodMap,
Memory: val.Memory,
Number: val.Number,
Type: val.Type,
Health: val.Health,
UsedNum: val.UsedNum,
UsedMem: val.UsedMem,
UsedCore: val.UsedCore,
if val != nil {
ret.Device[index] = &GPUDevice{
ID: val.ID,
UUID: val.UUID,
PodMap: val.PodMap,
Memory: val.Memory,
Number: val.Number,
Type: val.Type,
Health: val.Health,
UsedNum: val.UsedNum,
UsedMem: val.UsedMem,
UsedCore: val.UsedCore,
}
}
}
return &ret
}

// checkNodeGPUSharingPredicate checks if a pod with gpu requirement can be scheduled on a node.
func checkNodeGPUSharingPredicate(pod *v1.Pod, gssnap *GPUDevices, replicate bool) (bool, []ContainerDevices, error) {
func checkNodeGPUSharingPredicateAndScore(pod *v1.Pod, gssnap *GPUDevices, replicate bool, schedulePolicy string) (bool, []ContainerDevices, float64, error) {
// no gpu sharing request
score := float64(0)
if !checkVGPUResourcesInPod(pod) {
return true, []ContainerDevices{}, nil
return true, []ContainerDevices{}, 0, nil
}
ctrReq := resourcereqs(pod)
if len(ctrReq) == 0 {
return true, []ContainerDevices{}, nil
return true, []ContainerDevices{}, 0, nil
}
var gs *GPUDevices
if replicate {
Expand All @@ -345,13 +350,13 @@ func checkNodeGPUSharingPredicate(pod *v1.Pod, gssnap *GPUDevices, replicate boo
for _, val := range ctrReq {
devs := []ContainerDevice{}
if int(val.Nums) > len(gs.Device) {
return false, []ContainerDevices{}, fmt.Errorf("no enough gpu cards on node %s", gs.Name)
return false, []ContainerDevices{}, 0, fmt.Errorf("no enough gpu cards on node %s", gs.Name)
}
klog.V(3).Infoln("Allocating device for container request", val)
klog.V(3).InfoS("Allocating device for container", "request", val)

for i := len(gs.Device) - 1; i >= 0; i-- {
klog.V(3).Info("Scoring pod ", val.Memreq, ":", val.MemPercentagereq, ":", val.Coresreq, ":", val.Nums, "i", i, "device:", gs.Device[i].ID)
klog.V(3).Infoln("gs", i, "=", gs.Device[i].Memory, gs.Device[i].UsedMem, gs.Device[i].UsedNum)
klog.V(3).InfoS("Scoring pod request", "memReq", val.Memreq, "memPercentageReq", val.MemPercentagereq, "coresReq", val.Coresreq, "Nums", val.Nums, "Index", i, "ID", gs.Device[i].ID)
klog.V(3).InfoS("Current Device", "Index", i, "TotalMemory", gs.Device[i].Memory, "UsedMemory", gs.Device[i].UsedMem, "UsedCores", gs.Device[i].UsedNum)
if gs.Device[i].Number <= uint(gs.Device[i].UsedNum) {
continue
}
Expand Down Expand Up @@ -379,7 +384,7 @@ func checkNodeGPUSharingPredicate(pod *v1.Pod, gssnap *GPUDevices, replicate boo
//total += gs.Devices[i].Count
//free += node.Devices[i].Count - node.Devices[i].Used
if val.Nums > 0 {
klog.V(3).Infoln("device", gs.Device[i].ID, "fitted")
klog.V(3).InfoS("device fitted", "ID", gs.Device[i].ID)
val.Nums--
gs.Device[i].UsedNum++
gs.Device[i].UsedMem += uint(val.Memreq)
Expand All @@ -390,17 +395,27 @@ func checkNodeGPUSharingPredicate(pod *v1.Pod, gssnap *GPUDevices, replicate boo
Usedmem: val.Memreq,
Usedcores: val.Coresreq,
})
switch schedulePolicy {
case binpackPolicy:
score += binpackMultiplier * (float64(gs.Device[i].UsedMem) / float64(gs.Device[i].Memory))
case spreadPolicy:
if gs.Device[i].UsedNum == 1 {
score += spreadMultiplier
}
default:
score = float64(0)
}
}
if val.Nums == 0 {
break
}
}
if val.Nums > 0 {
return false, []ContainerDevices{}, fmt.Errorf("not enough gpu fitted on this node")
return false, []ContainerDevices{}, 0, fmt.Errorf("not enough gpu fitted on this node")
}
ctrdevs = append(ctrdevs, devs)
}
return true, ctrdevs, nil
return true, ctrdevs, score, nil
}

func patchPodAnnotations(pod *v1.Pod, annotations map[string]string) error {
Expand Down
6 changes: 5 additions & 1 deletion pkg/scheduler/api/shared_device_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ type Devices interface {
// preemption would not change anything. Plugins should return Unschedulable if it is possible
// that the pod can get scheduled with preemption.
// The accompanying status message should explain why the pod is unschedulable.
FilterNode(pod *v1.Pod) (int, string, error)
FilterNode(pod *v1.Pod, policy string) (int, string, error)
// ScoreNode will be invoked when using devicescore plugin, devices api can use it to implement multiple
// scheduling policies.
ScoreNode(pod *v1.Pod, policy string) float64

// Allocate action in predicate
Allocate(kubeClient kubernetes.Interface, pod *v1.Pod) error
// Release action in predicate
Expand Down
Loading

0 comments on commit 63bf271

Please sign in to comment.