diff --git a/pkg/scheduler/actions/backfill/backfill.go b/pkg/scheduler/actions/backfill/backfill.go index e137aa2340..e3411e313a 100644 --- a/pkg/scheduler/actions/backfill/backfill.go +++ b/pkg/scheduler/actions/backfill/backfill.go @@ -44,6 +44,21 @@ func (backfill *Action) Execute(ssn *framework.Session) { klog.V(5).Infof("Enter Backfill ...") defer klog.V(5).Infof("Leaving Backfill ...") + predicatFunc := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) { + var statusSets util.StatusSets + statusSets, err := ssn.PredicateFn(task, node) + if err != nil { + return nil, err + } + + // predicateHelper.PredicateNodes will print the log if predicate failed, so don't print log anymore here + if statusSets.ContainsUnschedulable() || statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() { + err := fmt.Errorf(statusSets.Message()) // should not include variables in api node errors + return nil, err + } + return nil, nil + } + // TODO (k82cn): When backfill, it's also need to balance between Queues. for _, job := range ssn.Jobs { if job.IsPending() { @@ -55,13 +70,15 @@ func (backfill *Action) Execute(ssn *framework.Session) { continue } + ph := util.NewPredicateHelper() + for _, task := range job.TaskStatusIndex[api.Pending] { if task.InitResreq.IsEmpty() { allocated := false fe := api.NewFitErrors() if err := ssn.PrePredicateFn(task); err != nil { - klog.V(3).Infof("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err) + klog.V(3).Infof("PrePredicate for task %s/%s failed in backfill for: %v", task.Namespace, task.Name, err) for _, ni := range ssn.Nodes { fe.SetNodeError(ni.Name, err) } @@ -69,39 +86,32 @@ func (backfill *Action) Execute(ssn *framework.Session) { break } - // As task did not request resources, so it only need to meet predicates. - // TODO (k82cn): need to prioritize nodes to avoid pod hole. - for _, node := range ssn.Nodes { - // TODO (k82cn): predicates did not consider pod number for now, there'll - // be ping-pong case here. - // Only nodes whose status is success after predicate filtering can be scheduled. - var statusSets util.StatusSets - statusSets, err := ssn.PredicateFn(task, node) - if err != nil { - fe.SetNodeError(node.Name, err) - continue - } - - if statusSets.ContainsUnschedulable() || statusSets.ContainsUnschedulableAndUnresolvable() || - statusSets.ContainsErrorSkipOrWait() { - err := fmt.Errorf("%s", statusSets.Message()) - fe.SetNodeError(node.Name, err) - continue - } + predicateNodes, fitErrors := ph.PredicateNodes(task, ssn.NodeList, predicatFunc, true) + if len(predicateNodes) == 0 { + job.NodesFitErrors[task.UID] = fitErrors + break + } - klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name) - if err := ssn.Allocate(task, node); err != nil { - klog.Errorf("Failed to bind Task %v on %v in Session %v", task.UID, node.Name, ssn.UID) - fe.SetNodeError(node.Name, err) - continue + node := predicateNodes[0] + if len(predicateNodes) > 1 { + nodeScores := util.PrioritizeNodes(task, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn) + node = ssn.BestNodeFn(task, nodeScores) + if node == nil { + node = util.SelectBestNode(nodeScores) } + } - metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time)) - metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now()) - allocated = true - break + klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name) + if err := ssn.Allocate(task, node); err != nil { + klog.Errorf("Failed to bind Task %v on %v in Session %v", task.UID, node.Name, ssn.UID) + fe.SetNodeError(node.Name, err) + continue } + metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time)) + metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now()) + allocated = true + if !allocated { job.NodesFitErrors[task.UID] = fe }