Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backfill add score process #2955

Merged
merged 1 commit into from
Oct 20, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 39 additions & 29 deletions pkg/scheduler/actions/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,21 @@ func (backfill *Action) Execute(ssn *framework.Session) {
klog.V(5).Infof("Enter Backfill ...")
defer klog.V(5).Infof("Leaving Backfill ...")

predicatFunc := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) {
var statusSets util.StatusSets
statusSets, err := ssn.PredicateFn(task, node)
if err != nil {
return nil, err
}

// predicateHelper.PredicateNodes will print the log if predicate failed, so don't print log anymore here
if statusSets.ContainsUnschedulable() || statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() {
err := fmt.Errorf(statusSets.Message()) // should not include variables in api node errors
return nil, err
}
return nil, nil
}

// TODO (k82cn): When backfill, it's also need to balance between Queues.
for _, job := range ssn.Jobs {
if job.IsPending() {
Expand All @@ -55,53 +70,48 @@ func (backfill *Action) Execute(ssn *framework.Session) {
continue
}

ph := util.NewPredicateHelper()

for _, task := range job.TaskStatusIndex[api.Pending] {
if task.InitResreq.IsEmpty() {
allocated := false
fe := api.NewFitErrors()

if err := ssn.PrePredicateFn(task); err != nil {
klog.V(3).Infof("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err)
klog.V(3).Infof("PrePredicate for task %s/%s failed in backfill for: %v", task.Namespace, task.Name, err)
for _, ni := range ssn.Nodes {
fe.SetNodeError(ni.Name, err)
}
job.NodesFitErrors[task.UID] = fe
break
}

// As task did not request resources, so it only need to meet predicates.
// TODO (k82cn): need to prioritize nodes to avoid pod hole.
for _, node := range ssn.Nodes {
// TODO (k82cn): predicates did not consider pod number for now, there'll
// be ping-pong case here.
// Only nodes whose status is success after predicate filtering can be scheduled.
var statusSets util.StatusSets
statusSets, err := ssn.PredicateFn(task, node)
if err != nil {
fe.SetNodeError(node.Name, err)
continue
}

if statusSets.ContainsUnschedulable() || statusSets.ContainsUnschedulableAndUnresolvable() ||
statusSets.ContainsErrorSkipOrWait() {
err := fmt.Errorf("%s", statusSets.Message())
fe.SetNodeError(node.Name, err)
continue
}
predicateNodes, fitErrors := ph.PredicateNodes(task, ssn.NodeList, predicatFunc, true)
if len(predicateNodes) == 0 {
job.NodesFitErrors[task.UID] = fitErrors
break
}

klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name)
if err := ssn.Allocate(task, node); err != nil {
klog.Errorf("Failed to bind Task %v on %v in Session %v", task.UID, node.Name, ssn.UID)
fe.SetNodeError(node.Name, err)
continue
node := predicateNodes[0]
if len(predicateNodes) > 1 {
nodeScores := util.PrioritizeNodes(task, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)
node = ssn.BestNodeFn(task, nodeScores)
if node == nil {
node = util.SelectBestNode(nodeScores)
}
}

metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time))
metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
allocated = true
break
klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name)
if err := ssn.Allocate(task, node); err != nil {
klog.Errorf("Failed to bind Task %v on %v in Session %v", task.UID, node.Name, ssn.UID)
fe.SetNodeError(node.Name, err)
continue
}

metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time))
metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
allocated = true

if !allocated {
job.NodesFitErrors[task.UID] = fe
}
Expand Down
Loading