Skip to content

Commit

Permalink
backfill add score process
Browse files Browse the repository at this point in the history
Signed-off-by: lowang-bh <[email protected]>
  • Loading branch information
lowang-bh committed Jul 14, 2023
1 parent 5302995 commit 3d35d8a
Showing 1 changed file with 41 additions and 33 deletions.
74 changes: 41 additions & 33 deletions pkg/scheduler/actions/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,21 @@ func (backfill *Action) Execute(ssn *framework.Session) {
klog.V(5).Infof("Enter Backfill ...")
defer klog.V(5).Infof("Leaving Backfill ...")

predicatFunc := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) {
var statusSets util.StatusSets
statusSets, err := ssn.PredicateFn(task, node)
if err != nil {
return nil, err
}

// predicateHelper.PredicateNodes will print the log if predicate failed
if statusSets.ContainsUnschedulable() || statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() {
err := fmt.Errorf("predicates status is not success in backfill") // should not include variables in api node errors
return nil, err
}
return nil, nil
}

// TODO (k82cn): When backfill, it's also need to balance between Queues.
for _, job := range ssn.Jobs {
if job.IsPending() {
Expand All @@ -55,6 +70,8 @@ func (backfill *Action) Execute(ssn *framework.Session) {
continue
}

ph := util.NewPredicateHelper()

for _, task := range job.TaskStatusIndex[api.Pending] {
if task.InitResreq.IsEmpty() {
allocated := false
Expand All @@ -69,42 +86,33 @@ func (backfill *Action) Execute(ssn *framework.Session) {
break
}

// As task did not request resources, so it only need to meet predicates.
// TODO (k82cn): need to prioritize nodes to avoid pod hole.
for _, node := range ssn.Nodes {
// TODO (k82cn): predicates did not consider pod number for now, there'll
// be ping-pong case here.
// Only nodes whose status is success after predicate filtering can be scheduled.
var statusSets util.StatusSets
statusSets, err := ssn.PredicateFn(task, node)
if err != nil {
klog.V(3).Infof("predicates failed in backfill for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, node.Name, err)
fe.SetNodeError(node.Name, err)
continue
}

if statusSets.ContainsUnschedulable() || statusSets.ContainsUnschedulableAndUnresolvable() ||
statusSets.ContainsErrorSkipOrWait() {
err := fmt.Errorf("predicates failed in backfill for task <%s/%s> on node <%s>, status is not success",
task.Namespace, task.Name, node.Name)
klog.V(3).Infof("%v", err)
fe.SetNodeError(node.Name, err)
continue
}
predicateNodes, fitErrors := ph.PredicateNodes(task, ssn.NodeList, predicatFunc, true)
if len(predicateNodes) == 0 {
klog.V(3).Infof("Predicates failed for task <%s/%s>", task.Namespace, task.Name)
job.NodesFitErrors[task.UID] = fitErrors
break
}

klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name)
if err := ssn.Allocate(task, node); err != nil {
klog.Errorf("Failed to bind Task %v on %v in Session %v", task.UID, node.Name, ssn.UID)
fe.SetNodeError(node.Name, err)
continue
node := predicateNodes[0]
if len(predicateNodes) > 1 {
nodeScores := util.PrioritizeNodes(task, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)
node = ssn.BestNodeFn(task, nodeScores)
if node == nil {
node = util.SelectBestNode(nodeScores)
}

metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time))
metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
allocated = true
break
}
// As task did not request resources, so it only need to meet predicates.
// Done (k82cn): need to prioritize nodes to avoid pod hole.
klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name)
if err := ssn.Allocate(task, node); err != nil {
klog.Errorf("Failed to bind Task %v on %v in Session %v", task.UID, node.Name, ssn.UID)
fe.SetNodeError(node.Name, err)
continue
}

metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time))
metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
allocated = true

if !allocated {
job.NodesFitErrors[task.UID] = fe
Expand Down

0 comments on commit 3d35d8a

Please sign in to comment.