Skip to content

Commit

Permalink
Resolve the cyclic dependency problem between framework and util
Browse files Browse the repository at this point in the history
Signed-off-by: w00568049 <[email protected]>
  • Loading branch information
wangyang0616 committed Jun 27, 2023
1 parent 23d0d2a commit 94124f7
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 16 deletions.
8 changes: 7 additions & 1 deletion pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package allocate

import (
"fmt"
"time"

"k8s.io/klog/v2"
Expand Down Expand Up @@ -102,11 +103,16 @@ func (alloc *Action) Execute(ssn *framework.Session) {
return nil, api.NewFitError(task, node, reason)
}

predicateStatus, err := ssn.PredicateFn(task, node)
if err != nil {
return nil, fmt.Errorf("allocate predicates failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, node.Name, err)
}
// Only nodes whose status is success after predicate filtering can be scheduled.
admitStatus := map[int]struct{}{
api.Success: {},
}
return nil, util.PredicateForAdmitStatus(ssn, task, node, admitStatus)
return nil, util.CheckPredicateStatus(predicateStatus, admitStatus)
}

// To pick <namespace, queue> tuple for job, we choose to pick namespace firstly.
Expand Down
12 changes: 10 additions & 2 deletions pkg/scheduler/actions/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,17 @@ func (backfill *Action) Execute(ssn *framework.Session) {
admitStatus := map[int]struct{}{
api.Success: {},
}
err := util.PredicateForAdmitStatus(ssn, task, node, admitStatus)
predicateStatus, err := ssn.PredicateFn(task, node)
if err != nil {
klog.V(3).Infof("backfill %s", err.Error())
klog.V(3).Infof("backfill predicates failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, node.Name, err)
fe.SetNodeError(node.Name, err)
continue
}
err = util.CheckPredicateStatus(predicateStatus, admitStatus)
if err != nil {
klog.V(3).Infof("backfill predicates failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, node.Name, err)
fe.SetNodeError(node.Name, err)
continue
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,12 @@ func preempt(
api.Success: {},
api.Unschedulable: {},
}
return nil, util.PredicateForAdmitStatus(ssn, task, node, admitStatus)
predicateStatus, err := ssn.PredicateFn(task, node)
if err != nil {
return nil, fmt.Errorf("preempt predicates failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, node.Name, err)
}
return nil, util.CheckPredicateStatus(predicateStatus, admitStatus)
}

predicateNodes, _ := predicateHelper.PredicateNodes(preemptor, allNodes, predicateFn, true)
Expand Down
12 changes: 9 additions & 3 deletions pkg/scheduler/actions/reclaim/reclaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package reclaim

import (
"k8s.io/klog/v2"

"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/framework"
"volcano.sh/volcano/pkg/scheduler/util"
Expand Down Expand Up @@ -128,9 +127,16 @@ func (ra *Action) Execute(ssn *framework.Session) {
api.Success: {},
api.Unschedulable: {},
}
predicateStatus, err := ssn.PredicateFn(task, n)
if err != nil {
klog.V(3).Infof("reclaim predicates failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, n.Name, err)
continue
}
// If predicates failed, next node.
if err := util.PredicateForAdmitStatus(ssn, task, n, admitStatus); err != nil {
klog.V(3).Infof("reclaim %s", err.Error())
if err := util.CheckPredicateStatus(predicateStatus, admitStatus); err != nil {
klog.V(3).Infof("reclaim predicates failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, n.Name, err)
continue
}

Expand Down
12 changes: 3 additions & 9 deletions pkg/scheduler/util/predicate_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"k8s.io/klog/v2"

"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/framework"
)

type PredicateHelper interface {
Expand Down Expand Up @@ -101,19 +100,14 @@ func (ph *predicateHelper) PredicateNodes(task *api.TaskInfo, nodes []*api.NodeI
return predicateNodes, fe
}

func PredicateForAdmitStatus(ssn *framework.Session, task *api.TaskInfo, n *api.NodeInfo, admitStatus map[int]struct{}) error {
predicateStatus, err := ssn.PredicateFn(task, n)
if err != nil {
return fmt.Errorf("Predicates failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, n.Name, err)
}
func CheckPredicateStatus(predicateStatus []*api.Status, admitStatus map[int]struct{}) error {
for _, status := range predicateStatus {
if status == nil {
continue
}
if _, ok := admitStatus[status.Code]; !ok {
return fmt.Errorf("Predicates failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, n.Name, status.Reason)
return fmt.Errorf("Predicates status (code: %d) does not meet the expectation (admit status: %v), message: %s",
status.Code, admitStatus, status.Reason)
}
}
return nil
Expand Down

0 comments on commit 94124f7

Please sign in to comment.