From 3d35d8ab958c6cf692e2b0a6719887beca6653ec Mon Sep 17 00:00:00 2001 From: lowang-bh Date: Sat, 8 Jul 2023 10:41:27 +0800 Subject: [PATCH] backfill add score process Signed-off-by: lowang-bh --- pkg/scheduler/actions/backfill/backfill.go | 74 ++++++++++++---------- 1 file changed, 41 insertions(+), 33 deletions(-) diff --git a/pkg/scheduler/actions/backfill/backfill.go b/pkg/scheduler/actions/backfill/backfill.go index 6f6847778b9..8aa07a7419a 100644 --- a/pkg/scheduler/actions/backfill/backfill.go +++ b/pkg/scheduler/actions/backfill/backfill.go @@ -44,6 +44,21 @@ func (backfill *Action) Execute(ssn *framework.Session) { klog.V(5).Infof("Enter Backfill ...") defer klog.V(5).Infof("Leaving Backfill ...") + predicatFunc := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) { + var statusSets util.StatusSets + statusSets, err := ssn.PredicateFn(task, node) + if err != nil { + return nil, err + } + + // predicateHelper.PredicateNodes will print the log if predicate failed + if statusSets.ContainsUnschedulable() || statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() { + err := fmt.Errorf("predicates status is not success in backfill") // should not include variables in api node errors + return nil, err + } + return nil, nil + } + // TODO (k82cn): When backfill, it's also need to balance between Queues. for _, job := range ssn.Jobs { if job.IsPending() { @@ -55,6 +70,8 @@ func (backfill *Action) Execute(ssn *framework.Session) { continue } + ph := util.NewPredicateHelper() + for _, task := range job.TaskStatusIndex[api.Pending] { if task.InitResreq.IsEmpty() { allocated := false @@ -69,42 +86,33 @@ func (backfill *Action) Execute(ssn *framework.Session) { break } - // As task did not request resources, so it only need to meet predicates. - // TODO (k82cn): need to prioritize nodes to avoid pod hole. 
- for _, node := range ssn.Nodes { - // TODO (k82cn): predicates did not consider pod number for now, there'll - // be ping-pong case here. - // Only nodes whose status is success after predicate filtering can be scheduled. - var statusSets util.StatusSets - statusSets, err := ssn.PredicateFn(task, node) - if err != nil { - klog.V(3).Infof("predicates failed in backfill for task <%s/%s> on node <%s>: %v", - task.Namespace, task.Name, node.Name, err) - fe.SetNodeError(node.Name, err) - continue - } - - if statusSets.ContainsUnschedulable() || statusSets.ContainsUnschedulableAndUnresolvable() || - statusSets.ContainsErrorSkipOrWait() { - err := fmt.Errorf("predicates failed in backfill for task <%s/%s> on node <%s>, status is not success", - task.Namespace, task.Name, node.Name) - klog.V(3).Infof("%v", err) - fe.SetNodeError(node.Name, err) - continue - } + predicateNodes, fitErrors := ph.PredicateNodes(task, ssn.NodeList, predicatFunc, true) + if len(predicateNodes) == 0 { + klog.V(3).Infof("Predicates failed for task <%s/%s>", task.Namespace, task.Name) + job.NodesFitErrors[task.UID] = fitErrors + break + } - klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name) - if err := ssn.Allocate(task, node); err != nil { - klog.Errorf("Failed to bind Task %v on %v in Session %v", task.UID, node.Name, ssn.UID) - fe.SetNodeError(node.Name, err) - continue + node := predicateNodes[0] + if len(predicateNodes) > 1 { + nodeScores := util.PrioritizeNodes(task, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn) + node = ssn.BestNodeFn(task, nodeScores) + if node == nil { + node = util.SelectBestNode(nodeScores) } - - metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time)) - metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now()) - allocated = true - break } + // As task did not request resources, so it 
only needs to meet predicates. + // Done (k82cn): candidate nodes are now prioritized (scored) to avoid pod holes. + klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name) + if err := ssn.Allocate(task, node); err != nil { + klog.Errorf("Failed to bind Task %v on %v in Session %v", task.UID, node.Name, ssn.UID) + fe.SetNodeError(node.Name, err) + continue + } + + metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time)) + metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now()) + allocated = true if !allocated { job.NodesFitErrors[task.UID] = fe