Skip to content

Commit

Permalink
Merge pull request volcano-sh#3335 from Lily922/preempt
Browse files Browse the repository at this point in the history
Support preempting BestEffort pods when the pods number of nodes reaches the upper limit
  • Loading branch information
volcano-sh-bot authored Mar 11, 2024
2 parents 0a26eb8 + 59b54cc commit 0559427
Show file tree
Hide file tree
Showing 10 changed files with 436 additions and 84 deletions.
158 changes: 112 additions & 46 deletions pkg/scheduler/actions/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,62 @@ func (backfill *Action) Execute(ssn *framework.Session) {
}

// TODO (k82cn): When backfill, it's also need to balance between Queues.
pendingTasks := backfill.pickUpPendingTasks(ssn)
for _, task := range pendingTasks {
job := ssn.Jobs[task.Job]
ph := util.NewPredicateHelper()
allocated := false
fe := api.NewFitErrors()

if err := ssn.PrePredicateFn(task); err != nil {
klog.V(3).Infof("PrePredicate for task %s/%s failed in backfill for: %v", task.Namespace, task.Name, err)
for _, ni := range ssn.Nodes {
fe.SetNodeError(ni.Name, err)
}
job.NodesFitErrors[task.UID] = fe
break
}

predicateNodes, fitErrors := ph.PredicateNodes(task, ssn.NodeList, predicatFunc, true)
if len(predicateNodes) == 0 {
job.NodesFitErrors[task.UID] = fitErrors
break
}

node := predicateNodes[0]
if len(predicateNodes) > 1 {
nodeScores := util.PrioritizeNodes(task, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)
node = ssn.BestNodeFn(task, nodeScores)
if node == nil {
node = util.SelectBestNode(nodeScores)
}
}

klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name)
if err := ssn.Allocate(task, node); err != nil {
klog.Errorf("Failed to bind Task %v on %v in Session %v", task.UID, node.Name, ssn.UID)
fe.SetNodeError(node.Name, err)
continue
}

metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time))
metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
allocated = true

if !allocated {
job.NodesFitErrors[task.UID] = fe
}
// TODO (k82cn): backfill for other case.
}
}

func (backfill *Action) UnInitialize() {}

func (backfill *Action) pickUpPendingTasks(ssn *framework.Session) []*api.TaskInfo {
queues := util.NewPriorityQueue(ssn.QueueOrderFn)
jobs := map[api.QueueID]*util.PriorityQueue{}
tasks := map[api.JobID]*util.PriorityQueue{}
var pendingTasks []*api.TaskInfo
for _, job := range ssn.Jobs {
if job.IsPending() {
continue
Expand All @@ -70,55 +126,65 @@ func (backfill *Action) Execute(ssn *framework.Session) {
continue
}

ph := util.NewPredicateHelper()
queue, found := ssn.Queues[job.Queue]
if !found {
continue
}

for _, task := range job.TaskStatusIndex[api.Pending] {
if task.InitResreq.IsEmpty() {
allocated := false
fe := api.NewFitErrors()

if err := ssn.PrePredicateFn(task); err != nil {
klog.V(3).Infof("PrePredicate for task %s/%s failed in backfill for: %v", task.Namespace, task.Name, err)
for _, ni := range ssn.Nodes {
fe.SetNodeError(ni.Name, err)
}
job.NodesFitErrors[task.UID] = fe
break
}

predicateNodes, fitErrors := ph.PredicateNodes(task, ssn.NodeList, predicatFunc, true)
if len(predicateNodes) == 0 {
job.NodesFitErrors[task.UID] = fitErrors
break
}

node := predicateNodes[0]
if len(predicateNodes) > 1 {
nodeScores := util.PrioritizeNodes(task, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)
node = ssn.BestNodeFn(task, nodeScores)
if node == nil {
node = util.SelectBestNode(nodeScores)
}
}

klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name)
if err := ssn.Allocate(task, node); err != nil {
klog.Errorf("Failed to bind Task %v on %v in Session %v", task.UID, node.Name, ssn.UID)
fe.SetNodeError(node.Name, err)
continue
}

metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time))
metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
allocated = true

if !allocated {
job.NodesFitErrors[task.UID] = fe
}
if !task.BestEffort {
continue
}
if _, existed := tasks[job.UID]; !existed {
tasks[job.UID] = util.NewPriorityQueue(ssn.TaskOrderFn)
}
tasks[job.UID].Push(task)
}

for _, task := range job.TaskStatusIndex[api.Pipelined] {
if !task.BestEffort {
continue
}

stmt := framework.NewStatement(ssn)
err := stmt.UnPipeline(task)
if err != nil {
klog.Errorf("Failed to unpipeline task: %s", err.Error())
continue
}
if _, existed := tasks[job.UID]; !existed {
tasks[job.UID] = util.NewPriorityQueue(ssn.TaskOrderFn)
}
tasks[job.UID].Push(task)
}

if _, existed := tasks[job.UID]; !existed {
continue
}

if _, existed := jobs[queue.UID]; !existed {
queues.Push(queue)
jobs[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
}
jobs[job.Queue].Push(job)
}

for !queues.Empty() {
queue, ok := queues.Pop().(*api.QueueInfo)
if !ok {
klog.V(3).Infof("QueueInfo transition failed, ignore it.")
continue
}
for !jobs[queue.UID].Empty() {
job, ok := jobs[queue.UID].Pop().(*api.JobInfo)
if !ok {
klog.Errorf("JobInfo transition failed, ignore it.")
continue
}
for !tasks[job.UID].Empty() {
pendingTasks = append(pendingTasks, tasks[job.UID].Pop().(*api.TaskInfo))
}
// TODO (k82cn): backfill for other case.
}
}
return pendingTasks
}

func (backfill *Action) UnInitialize() {}
162 changes: 162 additions & 0 deletions pkg/scheduler/actions/backfill/backfill_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package backfill

import (
"testing"

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
schedulingapi "k8s.io/api/scheduling/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"

"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/cache"
"volcano.sh/volcano/pkg/scheduler/conf"
"volcano.sh/volcano/pkg/scheduler/framework"
"volcano.sh/volcano/pkg/scheduler/plugins/drf"
"volcano.sh/volcano/pkg/scheduler/plugins/priority"
"volcano.sh/volcano/pkg/scheduler/util"
)

func TestPickUpPendingTasks(t *testing.T) {
framework.RegisterPluginBuilder("priority", priority.New)
framework.RegisterPluginBuilder("drf", drf.New)
trueValue := true
tilers := []conf.Tier{
{
Plugins: []conf.PluginOption{
{
Name: "priority",
EnabledPreemptable: &trueValue,
EnabledTaskOrder: &trueValue,
EnabledJobOrder: &trueValue,
},
{
Name: "drf",
EnabledQueueOrder: &trueValue,
},
},
},
}

priority4, priority3, priority2, priority1 := int32(4), int32(3), int32(2), int32(1)

testCases := []struct {
name string
pipelinedPods []*v1.Pod
pendingPods []*v1.Pod
queues []*schedulingv1beta1.Queue
podGroups []*schedulingv1beta1.PodGroup
PriorityClasses map[string]*schedulingapi.PriorityClass
expectedResult []string
}{
{
name: "test",
pendingPods: []*v1.Pod{
util.BuildPodWithPriority("default", "pg1-besteffort-task-1", "", v1.PodPending, nil, "pg1", make(map[string]string), make(map[string]string), &priority1),
util.BuildPodWithPriority("default", "pg1-unbesteffort-task-1", "", v1.PodPending, v1.ResourceList{"cpu": resource.MustParse("500m")}, "pg1", make(map[string]string), make(map[string]string), &priority1),
util.BuildPodWithPriority("default", "pg1-besteffort-task-3", "", v1.PodPending, nil, "pg1", make(map[string]string), make(map[string]string), &priority3),
util.BuildPodWithPriority("default", "pg1-unbesteffort-task-3", "", v1.PodPending, v1.ResourceList{"cpu": resource.MustParse("500m")}, "pg1", make(map[string]string), make(map[string]string), &priority3),

util.BuildPodWithPriority("default", "pg2-besteffort-task-1", "", v1.PodPending, nil, "pg2", make(map[string]string), make(map[string]string), &priority1),
util.BuildPodWithPriority("default", "pg2-unbesteffort-task-1", "", v1.PodPending, v1.ResourceList{"cpu": resource.MustParse("500m")}, "pg2", make(map[string]string), make(map[string]string), &priority1),
util.BuildPodWithPriority("default", "pg2-besteffort-task-3", "", v1.PodPending, nil, "pg2", make(map[string]string), make(map[string]string), &priority3),
util.BuildPodWithPriority("default", "pg2-unbesteffort-task-3", "", v1.PodPending, v1.ResourceList{"cpu": resource.MustParse("500m")}, "pg2", make(map[string]string), make(map[string]string), &priority3),
},
pipelinedPods: []*v1.Pod{
util.BuildPodWithPriority("default", "pg1-besteffort-task-2", "", v1.PodPending, nil, "pg1", make(map[string]string), make(map[string]string), &priority2),
util.BuildPodWithPriority("default", "pg1-unbesteffort-task-2", "", v1.PodPending, v1.ResourceList{"cpu": resource.MustParse("500m")}, "pg1", make(map[string]string), make(map[string]string), &priority2),
util.BuildPodWithPriority("default", "pg1-besteffort-task-4", "", v1.PodPending, nil, "pg1", make(map[string]string), make(map[string]string), &priority4),
util.BuildPodWithPriority("default", "pg1-unbesteffort-task-4", "", v1.PodPending, v1.ResourceList{"cpu": resource.MustParse("500m")}, "pg1", make(map[string]string), make(map[string]string), &priority4),

util.BuildPodWithPriority("default", "pg2-besteffort-task-2", "", v1.PodPending, nil, "pg2", make(map[string]string), make(map[string]string), &priority2),
util.BuildPodWithPriority("default", "pg2-unbesteffort-task-2", "", v1.PodPending, v1.ResourceList{"cpu": resource.MustParse("500m")}, "pg2", make(map[string]string), make(map[string]string), &priority2),
util.BuildPodWithPriority("default", "pg2-besteffort-task-4", "", v1.PodPending, nil, "pg2", make(map[string]string), make(map[string]string), &priority4),
util.BuildPodWithPriority("default", "pg2-unbesteffort-task-4", "", v1.PodPending, v1.ResourceList{"cpu": resource.MustParse("500m")}, "pg2", make(map[string]string), make(map[string]string), &priority4),
},
queues: []*schedulingv1beta1.Queue{
util.BuildQueue("q1", 1, nil),
},
podGroups: []*schedulingv1beta1.PodGroup{
util.BuildPodGroupWithPrio("pg1", "default", "q1", 1, map[string]int32{"": 3}, schedulingv1beta1.PodGroupInqueue, "job-priority-1"),
util.BuildPodGroupWithPrio("pg2", "default", "q1", 1, map[string]int32{"": 3}, schedulingv1beta1.PodGroupInqueue, "job-priority-2"),
},
PriorityClasses: map[string]*schedulingapi.PriorityClass{
"job-priority-1": {
ObjectMeta: metav1.ObjectMeta{
Name: "job-priority-1",
},
Value: 1,
},
"job-priority-2": {
ObjectMeta: metav1.ObjectMeta{
Name: "job-priority-2",
},
Value: 2,
},
},

expectedResult: []string{
"pg2-besteffort-task-4",
"pg2-besteffort-task-3",
"pg2-besteffort-task-2",
"pg2-besteffort-task-1",
"pg1-besteffort-task-4",
"pg1-besteffort-task-3",
"pg1-besteffort-task-2",
"pg1-besteffort-task-1",
},
},
}

for _, tc := range testCases {
schedulerCache := &cache.SchedulerCache{
Nodes: make(map[string]*api.NodeInfo),
Jobs: make(map[api.JobID]*api.JobInfo),
Queues: make(map[api.QueueID]*api.QueueInfo),
Binder: nil,
StatusUpdater: &util.FakeStatusUpdater{},
VolumeBinder: &util.FakeVolumeBinder{},
Recorder: record.NewFakeRecorder(100),
PriorityClasses: tc.PriorityClasses,
}

for _, q := range tc.queues {
schedulerCache.AddQueueV1beta1(q)
}

for _, ss := range tc.podGroups {
schedulerCache.AddPodGroupV1beta1(ss)
}

for _, pod := range tc.pendingPods {
schedulerCache.AddPod(pod)
}

for _, pod := range tc.pipelinedPods {
schedulerCache.AddPod(pod)
}

ssn := framework.OpenSession(schedulerCache, tilers, []conf.Configuration{})
for _, pod := range tc.pipelinedPods {
jobID := api.NewTaskInfo(pod).Job
stmt := framework.NewStatement(ssn)
task, found := ssn.Jobs[jobID].Tasks[api.PodKey(pod)]
if found {
stmt.Pipeline(task, "node1")
}
}

tasks := New().pickUpPendingTasks(ssn)
var actualResult []string
for _, task := range tasks {
actualResult = append(actualResult, task.Name)
}

if !assert.Equal(t, tc.expectedResult, actualResult) {
t.Errorf("unexpected test; name: %s, expected result: %v, actual result: %v", tc.name, tc.expectedResult, actualResult)
}
}
}
8 changes: 4 additions & 4 deletions pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ func (pmpt *Action) Execute(ssn *framework.Session) {
if !api.PreemptableStatus(task.Status) {
return false
}
// Ignore task with empty resource request.
if task.Resreq.IsEmpty() {
// BestEffort pod is not supported to preempt unBestEffort pod.
if preemptor.BestEffort && !task.BestEffort {
return false
}
if !task.Preemptable {
Expand Down Expand Up @@ -172,8 +172,8 @@ func (pmpt *Action) Execute(ssn *framework.Session) {
if !api.PreemptableStatus(task.Status) {
return false
}
// Ignore task with empty resource request.
if task.Resreq.IsEmpty() {
// BestEffort pod is not supported to preempt unBestEffort pod.
if preemptor.BestEffort && !task.BestEffort {
return false
}
// Preempt tasks within job.
Expand Down
Loading

0 comments on commit 0559427

Please sign in to comment.