Skip to content

Commit

Permalink
The logic for determining the predicate status is optimized.
Browse files Browse the repository at this point in the history
Signed-off-by: w00568049 <[email protected]>
Signed-off-by: wangyang <[email protected]>
  • Loading branch information
wangyang0616 committed Jul 11, 2023
1 parent 611cc78 commit 740c7e9
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 41 deletions.
14 changes: 9 additions & 5 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,16 +103,20 @@ func (alloc *Action) Execute(ssn *framework.Session) {
return nil, api.NewFitError(task, node, reason)
}

predicateStatus, err := ssn.PredicateFn(task, node)
status, err := ssn.PredicateFn(task, node)
if err != nil {
return predicateStatus, fmt.Errorf("allocate predicates failed for task <%s/%s> on node <%s>: %v",
return nil, fmt.Errorf("allocate predicates failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, node.Name, err)
}

preStatus := util.NewPredicateStatus(status)
// Only nodes whose status is success after predicate filtering can be scheduled.
admitStatus := map[int]struct{}{
api.Success: {},
if preStatus.IsContainsUnschedulable() || preStatus.IsContainsUnschedulableAndUnresolvable() ||
preStatus.IsContainsErrorSkipOrWait() {
return nil, fmt.Errorf("allocate predicates failed for task <%s/%s> on node <%s>, status is not success",
task.Namespace, task.Name, node.Name)
}
return predicateStatus, util.CheckPredicateStatus(predicateStatus, admitStatus)
return nil, nil
}

// To pick <namespace, queue> tuple for job, we choose to pick namespace firstly.
Expand Down
14 changes: 6 additions & 8 deletions pkg/scheduler/actions/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,20 +74,18 @@ func (backfill *Action) Execute(ssn *framework.Session) {
// TODO (k82cn): predicates did not consider pod number for now, there'll
// be ping-pong case here.
// Only nodes whose status is success after predicate filtering can be scheduled.
admitStatus := map[int]struct{}{
api.Success: {},
}
predicateStatus, err := ssn.PredicateFn(task, node)
status, err := ssn.PredicateFn(task, node)
if err != nil {
klog.V(3).Infof("backfill predicates failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, node.Name, err)
fe.SetNodeError(node.Name, err)
continue
}
err = util.CheckPredicateStatus(predicateStatus, admitStatus)
if err != nil {
klog.V(3).Infof("backfill predicates failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, node.Name, err)
preStatus := util.NewPredicateStatus(status)
if preStatus.IsContainsUnschedulable() || preStatus.IsContainsUnschedulableAndUnresolvable() ||
preStatus.IsContainsErrorSkipOrWait() {
klog.V(3).Infof("backfill predicates failed for task <%s/%s> on node <%s>, status is not success.",
task.Namespace, task.Name, node.Name)
fe.SetNodeError(node.Name, err)
continue
}
Expand Down
15 changes: 8 additions & 7 deletions pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,16 +210,17 @@ func preempt(

predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) {
// Allows scheduling to nodes that are in Success or Unschedulable state after filtering by predicate.
admitStatus := map[int]struct{}{
api.Success: {},
api.Unschedulable: {},
}
predicateStatus, err := ssn.PredicateFn(task, node)
status, err := ssn.PredicateFn(task, node)
if err != nil {
return predicateStatus, fmt.Errorf("preempt predicates failed for task <%s/%s> on node <%s>: %v",
return nil, fmt.Errorf("preempt predicates failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, node.Name, err)
}
return predicateStatus, util.CheckPredicateStatus(predicateStatus, admitStatus)
preStatus := util.NewPredicateStatus(status)
if preStatus.IsContainsUnschedulableAndUnresolvable() || preStatus.IsContainsErrorSkipOrWait() {
return nil, fmt.Errorf("preempt predicates failed for task <%s/%s> on node <%s>, status is not success or unschedulable",
task.Namespace, task.Name, node.Name)
}
return nil, nil
}

predicateNodes, _ := predicateHelper.PredicateNodes(preemptor, allNodes, predicateFn, true)
Expand Down
17 changes: 6 additions & 11 deletions pkg/scheduler/actions/reclaim/reclaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,24 +123,19 @@ func (ra *Action) Execute(ssn *framework.Session) {

assigned := false
for _, n := range ssn.Nodes {
// Allows scheduling to nodes that are in Success or Unschedulable state after filtering by predicate.
admitStatus := map[int]struct{}{
api.Success: {},
api.Unschedulable: {},
}
predicateStatus, err := ssn.PredicateFn(task, n)
status, err := ssn.PredicateFn(task, n)
if err != nil {
klog.V(3).Infof("reclaim predicates failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, n.Name, err)
continue
}
// If predicates failed, next node.
if err := util.CheckPredicateStatus(predicateStatus, admitStatus); err != nil {
klog.V(3).Infof("reclaim predicates failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, n.Name, err)
preStatus := util.NewPredicateStatus(status)
// Allows scheduling to nodes that are in Success or Unschedulable state after filtering by predicate.
if preStatus.IsContainsUnschedulableAndUnresolvable() || preStatus.IsContainsErrorSkipOrWait() {
klog.V(3).Infof("reclaim predicates failed for task <%s/%s> on node <%s>, status is not success or unschedulable.",
task.Namespace, task.Name, n.Name)
continue
}

klog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.",
task.Namespace, task.Name, n.Name)

Expand Down
60 changes: 50 additions & 10 deletions pkg/scheduler/util/predicate_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,23 +100,63 @@ func (ph *predicateHelper) PredicateNodes(task *api.TaskInfo, nodes []*api.NodeI
return predicateNodes, fe
}

func CheckPredicateStatus(predicateStatus []*api.Status, admitStatus map[int]struct{}) error {
for _, status := range predicateStatus {
// taskGroupID builds the "<job>/<task spec key>" identifier for task by
// joining task.Job and task.GetTaskSpecKey() with a slash.
func taskGroupID(task *api.TaskInfo) string {
	jobID, specKey := task.Job, task.GetTaskSpecKey()
	return fmt.Sprintf("%s/%s", jobID, specKey)
}

// NewPredicateHelper returns a PredicateHelper backed by a predicateHelper
// whose taskPredicateErrorCache starts out as an empty, non-nil map.
func NewPredicateHelper() PredicateHelper {
	cache := make(map[string]map[string]error)
	return &predicateHelper{taskPredicateErrorCache: cache}
}

// PredicateStatus answers questions about the set of status codes produced by
// running predicates for a task on a node.
type PredicateStatus interface {
	// IsContainsErrorSkipOrWait reports whether any status carries the
	// Error, Skip or Wait code.
	IsContainsErrorSkipOrWait() bool

	// IsContainsUnschedulable reports whether any status carries the
	// Unschedulable code.
	IsContainsUnschedulable() bool

	// IsContainsUnschedulableAndUnresolvable reports whether any status
	// carries the UnschedulableAndUnresolvable code.
	IsContainsUnschedulableAndUnresolvable() bool
}

// statusInfo is the PredicateStatus implementation returned by
// NewPredicateStatus; it wraps a slice of predicate statuses.
type statusInfo struct {
// status holds the statuses to inspect; the methods on statusInfo skip nil entries.
status []*api.Status
}

func (s *statusInfo) IsContainsUnschedulable() bool {
for _, status := range s.status {
if status == nil {
continue
}
if _, ok := admitStatus[status.Code]; !ok {
return fmt.Errorf("Predicates status (code: %d) does not meet the expectation (admit status: %v), message: %s",
status.Code, admitStatus, status.Reason)
if status.Code == api.Unschedulable {
return true
}
}
return nil
return false
}

func taskGroupID(task *api.TaskInfo) string {
return fmt.Sprintf("%s/%s", task.Job, task.GetTaskSpecKey())
// IsContainsUnschedulableAndUnresolvable reports whether any non-nil status
// held by s has the api.UnschedulableAndUnresolvable code.
func (s *statusInfo) IsContainsUnschedulableAndUnresolvable() bool {
	for i := range s.status {
		// Nil entries are skipped; only a populated status can match.
		if st := s.status[i]; st != nil && st.Code == api.UnschedulableAndUnresolvable {
			return true
		}
	}
	return false
}

func NewPredicateHelper() PredicateHelper {
return &predicateHelper{taskPredicateErrorCache: map[string]map[string]error{}}
// IsContainsErrorSkipOrWait reports whether any non-nil status held by s has
// one of the api.Error, api.Skip or api.Wait codes.
func (s *statusInfo) IsContainsErrorSkipOrWait() bool {
	for _, st := range s.status {
		if st == nil {
			// Nil entries carry no code and are ignored.
			continue
		}
		switch st.Code {
		case api.Error, api.Skip, api.Wait:
			return true
		}
	}
	return false
}

// NewPredicateStatus wraps preStatus in a statusInfo and returns it as a
// PredicateStatus. The slice is stored as given, not copied.
func NewPredicateStatus(preStatus []*api.Status) PredicateStatus {
	return &statusInfo{status: preStatus}
}

0 comments on commit 740c7e9

Please sign in to comment.