diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go index 1111515d64a..8199fe7185a 100644 --- a/pkg/scheduler/actions/allocate/allocate.go +++ b/pkg/scheduler/actions/allocate/allocate.go @@ -103,16 +103,20 @@ func (alloc *Action) Execute(ssn *framework.Session) { return nil, api.NewFitError(task, node, reason) } - predicateStatus, err := ssn.PredicateFn(task, node) + status, err := ssn.PredicateFn(task, node) if err != nil { - return predicateStatus, fmt.Errorf("allocate predicates failed for task <%s/%s> on node <%s>: %v", + return nil, fmt.Errorf("allocate predicates failed for task <%s/%s> on node <%s>: %v", task.Namespace, task.Name, node.Name, err) } + + preStatus := util.NewPredicateStatus(status) // Only nodes whose status is success after predicate filtering can be scheduled. - admitStatus := map[int]struct{}{ - api.Success: {}, + if preStatus.IsContainsUnschedulable() || preStatus.IsContainsUnschedulableAndUnresolvable() || + preStatus.IsContainsErrorSkipOrWait() { + return nil, fmt.Errorf("allocate predicates failed for task <%s/%s> on node <%s>, status is not success", + task.Namespace, task.Name, node.Name) } - return predicateStatus, util.CheckPredicateStatus(predicateStatus, admitStatus) + return nil, nil } // To pick tuple for job, we choose to pick namespace firstly. diff --git a/pkg/scheduler/actions/backfill/backfill.go b/pkg/scheduler/actions/backfill/backfill.go index 468e011cde8..119d4637787 100644 --- a/pkg/scheduler/actions/backfill/backfill.go +++ b/pkg/scheduler/actions/backfill/backfill.go @@ -74,20 +74,18 @@ func (backfill *Action) Execute(ssn *framework.Session) { // TODO (k82cn): predicates did not consider pod number for now, there'll // be ping-pong case here. // Only nodes whose status is success after predicate filtering can be scheduled. - admitStatus := map[int]struct{}{ - api.Success: {}, - } - predicateStatus, err := ssn.PredicateFn(task, node) + status, err := ssn.PredicateFn(task, node) if err != nil { klog.V(3).Infof("backfill predicates failed for task <%s/%s> on node <%s>: %v", task.Namespace, task.Name, node.Name, err) fe.SetNodeError(node.Name, err) continue } - err = util.CheckPredicateStatus(predicateStatus, admitStatus) - if err != nil { - klog.V(3).Infof("backfill predicates failed for task <%s/%s> on node <%s>: %v", - task.Namespace, task.Name, node.Name, err) + preStatus := util.NewPredicateStatus(status) + if preStatus.IsContainsUnschedulable() || preStatus.IsContainsUnschedulableAndUnresolvable() || + preStatus.IsContainsErrorSkipOrWait() { + klog.V(3).Infof("backfill predicates failed for task <%s/%s> on node <%s>, status is not success.", + task.Namespace, task.Name, node.Name) fe.SetNodeError(node.Name, err) continue } diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go index e2a662f8527..50e285ac269 100644 --- a/pkg/scheduler/actions/preempt/preempt.go +++ b/pkg/scheduler/actions/preempt/preempt.go @@ -210,16 +210,17 @@ func preempt( predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) { // Allows scheduling to nodes that are in Success or Unschedulable state after filtering by predicate. - admitStatus := map[int]struct{}{ - api.Success: {}, - api.Unschedulable: {}, - } - predicateStatus, err := ssn.PredicateFn(task, node) + status, err := ssn.PredicateFn(task, node) if err != nil { - return predicateStatus, fmt.Errorf("preempt predicates failed for task <%s/%s> on node <%s>: %v", + return nil, fmt.Errorf("preempt predicates failed for task <%s/%s> on node <%s>: %v", task.Namespace, task.Name, node.Name, err) } - return predicateStatus, util.CheckPredicateStatus(predicateStatus, admitStatus) + preStatus := util.NewPredicateStatus(status) + if preStatus.IsContainsUnschedulableAndUnresolvable() || preStatus.IsContainsErrorSkipOrWait() { + return nil, fmt.Errorf("preempt predicates failed for task <%s/%s> on node <%s>, status is not success or unschedulable", + task.Namespace, task.Name, node.Name) + } + return nil, nil } predicateNodes, _ := predicateHelper.PredicateNodes(preemptor, allNodes, predicateFn, true) diff --git a/pkg/scheduler/actions/reclaim/reclaim.go b/pkg/scheduler/actions/reclaim/reclaim.go index d8b17a06df9..137be88a004 100644 --- a/pkg/scheduler/actions/reclaim/reclaim.go +++ b/pkg/scheduler/actions/reclaim/reclaim.go @@ -123,24 +123,19 @@ func (ra *Action) Execute(ssn *framework.Session) { assigned := false for _, n := range ssn.Nodes { - // Allows scheduling to nodes that are in Success or Unschedulable state after filtering by predicate. - admitStatus := map[int]struct{}{ - api.Success: {}, - api.Unschedulable: {}, - } - predicateStatus, err := ssn.PredicateFn(task, n) + status, err := ssn.PredicateFn(task, n) if err != nil { klog.V(3).Infof("reclaim predicates failed for task <%s/%s> on node <%s>: %v", task.Namespace, task.Name, n.Name, err) continue } - // If predicates failed, next node. - if err := util.CheckPredicateStatus(predicateStatus, admitStatus); err != nil { - klog.V(3).Infof("reclaim predicates failed for task <%s/%s> on node <%s>: %v", - task.Namespace, task.Name, n.Name, err) + preStatus := util.NewPredicateStatus(status) + // Allows scheduling to nodes that are in Success or Unschedulable state after filtering by predicate. + if preStatus.IsContainsUnschedulableAndUnresolvable() || preStatus.IsContainsErrorSkipOrWait() { + klog.V(3).Infof("reclaim predicates failed for task <%s/%s> on node <%s>, status is not success or unschedulable.", + task.Namespace, task.Name, n.Name) continue } - klog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.", task.Namespace, task.Name, n.Name) diff --git a/pkg/scheduler/util/predicate_helper.go b/pkg/scheduler/util/predicate_helper.go index dfc3de39aac..983b45e206a 100644 --- a/pkg/scheduler/util/predicate_helper.go +++ b/pkg/scheduler/util/predicate_helper.go @@ -100,23 +100,63 @@ func (ph *predicateHelper) PredicateNodes(task *api.TaskInfo, nodes []*api.NodeI return predicateNodes, fe } -func CheckPredicateStatus(predicateStatus []*api.Status, admitStatus map[int]struct{}) error { - for _, status := range predicateStatus { +func taskGroupID(task *api.TaskInfo) string { + return fmt.Sprintf("%s/%s", task.Job, task.GetTaskSpecKey()) +} + +func NewPredicateHelper() PredicateHelper { + return &predicateHelper{taskPredicateErrorCache: map[string]map[string]error{}} +} + +type PredicateStatus interface { + IsContainsUnschedulable() bool + IsContainsUnschedulableAndUnresolvable() bool + IsContainsErrorSkipOrWait() bool +} + +type statusInfo struct { + status []*api.Status +} + +func (s *statusInfo) IsContainsUnschedulable() bool { + for _, status := range s.status { if status == nil { continue } - if _, ok := admitStatus[status.Code]; !ok { - return fmt.Errorf("Predicates status (code: %d) does not meet the expectation (admit status: %v), message: %s", - status.Code, admitStatus, status.Reason) + if status.Code == api.Unschedulable { + return true } } - return nil + return false } -func taskGroupID(task *api.TaskInfo) string { - return fmt.Sprintf("%s/%s", task.Job, task.GetTaskSpecKey()) +func (s *statusInfo) IsContainsUnschedulableAndUnresolvable() bool { + for _, status := range s.status { + if status == nil { + continue + } + if status.Code == api.UnschedulableAndUnresolvable { + return true + } + } + return false } -func NewPredicateHelper() PredicateHelper { - return &predicateHelper{taskPredicateErrorCache: map[string]map[string]error{}} +func (s *statusInfo) IsContainsErrorSkipOrWait() bool { + for _, status := range s.status { + if status == nil { + continue + } + if status.Code == api.Error || status.Code == api.Skip || status.Code == api.Wait { + return true + } + } + return false +} + +func NewPredicateStatus(preStatus []*api.Status) PredicateStatus { + predicateStatus := &statusInfo{ + status: preStatus, + } + return predicateStatus }