From f1d8522175a827836bd3f48d3416378ac8b47f90 Mon Sep 17 00:00:00 2001 From: wangyang Date: Thu, 29 Jun 2023 16:39:13 +0800 Subject: [PATCH] Abstract the definition and member method of the predicate status Signed-off-by: wangyang --- pkg/scheduler/actions/allocate/allocate.go | 14 +++--- pkg/scheduler/actions/backfill/backfill.go | 17 ++++---- pkg/scheduler/actions/preempt/preempt.go | 14 +++--- pkg/scheduler/actions/reclaim/reclaim.go | 18 +++----- pkg/scheduler/framework/util.go | 2 +- pkg/scheduler/util/predicate_helper.go | 51 +++++++++++++++++----- 6 files changed, 74 insertions(+), 42 deletions(-) diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go index 43627197673..eb0af30bec6 100644 --- a/pkg/scheduler/actions/allocate/allocate.go +++ b/pkg/scheduler/actions/allocate/allocate.go @@ -102,17 +102,19 @@ func (alloc *Action) Execute(ssn *framework.Session) { if ok, reason := task.InitResreq.LessEqualWithReason(node.FutureIdle(), api.Zero); !ok { return nil, api.NewFitError(task, node, reason) } - - predicateStatus, err := ssn.PredicateFn(task, node) + var statusSets util.StatusSets + statusSets, err := ssn.PredicateFn(task, node) if err != nil { return nil, fmt.Errorf("allocate predicates failed for task <%s/%s> on node <%s>: %v", task.Namespace, task.Name, node.Name, err) } - // Only nodes whose status is success after predicate filtering can be scheduled. - admitStatus := map[int]struct{}{ - api.Success: {}, + + if statusSets.ContainsUnschedulable() || statusSets.ContainsUnschedulableAndUnresolvable() || + statusSets.ContainsErrorSkipOrWait() { + return nil, fmt.Errorf("predicates failed in allocate for task <%s/%s> on node <%s>, status is not success", + task.Namespace, task.Name, node.Name) } - return nil, util.CheckPredicateStatus(predicateStatus, admitStatus) + return nil, nil } // To pick tuple for job, we choose to pick namespace firstly. diff --git a/pkg/scheduler/actions/backfill/backfill.go b/pkg/scheduler/actions/backfill/backfill.go index 468e011cde8..f45b6f3f66f 100644 --- a/pkg/scheduler/actions/backfill/backfill.go +++ b/pkg/scheduler/actions/backfill/backfill.go @@ -17,6 +17,7 @@ limitations under the License. package backfill import ( + "fmt" "time" "k8s.io/klog/v2" @@ -74,20 +75,20 @@ func (backfill *Action) Execute(ssn *framework.Session) { // TODO (k82cn): predicates did not consider pod number for now, there'll // be ping-pong case here. // Only nodes whose status is success after predicate filtering can be scheduled. - admitStatus := map[int]struct{}{ - api.Success: {}, - } - predicateStatus, err := ssn.PredicateFn(task, node) + var statusSets util.StatusSets + statusSets, err := ssn.PredicateFn(task, node) if err != nil { klog.V(3).Infof("backfill predicates failed for task <%s/%s> on node <%s>: %v", task.Namespace, task.Name, node.Name, err) fe.SetNodeError(node.Name, err) continue } - err = util.CheckPredicateStatus(predicateStatus, admitStatus) - if err != nil { - klog.V(3).Infof("backfill predicates failed for task <%s/%s> on node <%s>: %v", - task.Namespace, task.Name, node.Name, err) + + if statusSets.ContainsUnschedulable() || statusSets.ContainsUnschedulableAndUnresolvable() || + statusSets.ContainsErrorSkipOrWait() { + err := fmt.Errorf("predicates failed in backfill for task <%s/%s> on node <%s>, status is not success.", + task.Namespace, task.Name, node.Name) + klog.V(3).Infof("err: %v", err) fe.SetNodeError(node.Name, err) continue } diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go index 4850816f9ec..f3be6aa64b3 100644 --- a/pkg/scheduler/actions/preempt/preempt.go +++ b/pkg/scheduler/actions/preempt/preempt.go @@ -210,16 +210,18 @@ func preempt( predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) { // Allows scheduling to nodes that are in Success or Unschedulable state after filtering by predicate. - admitStatus := map[int]struct{}{ - api.Success: {}, - api.Unschedulable: {}, - } - predicateStatus, err := ssn.PredicateFn(task, node) + var statusSets util.StatusSets + statusSets, err := ssn.PredicateFn(task, node) if err != nil { return nil, fmt.Errorf("preempt predicates failed for task <%s/%s> on node <%s>: %v", task.Namespace, task.Name, node.Name, err) } - return nil, util.CheckPredicateStatus(predicateStatus, admitStatus) + + if statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() { + return nil, fmt.Errorf("predicates failed in preempt for task <%s/%s> on node <%s>, status is not success or unschedulable", + task.Namespace, task.Name, node.Name) + } + return nil, nil } predicateNodes, _ := predicateHelper.PredicateNodes(preemptor, allNodes, predicateFn, true) diff --git a/pkg/scheduler/actions/reclaim/reclaim.go b/pkg/scheduler/actions/reclaim/reclaim.go index d8b17a06df9..8bced3d9ec9 100644 --- a/pkg/scheduler/actions/reclaim/reclaim.go +++ b/pkg/scheduler/actions/reclaim/reclaim.go @@ -123,24 +123,20 @@ func (ra *Action) Execute(ssn *framework.Session) { assigned := false for _, n := range ssn.Nodes { - // Allows scheduling to nodes that are in Success or Unschedulable state after filtering by predicate. - admitStatus := map[int]struct{}{ - api.Success: {}, - api.Unschedulable: {}, - } - predicateStatus, err := ssn.PredicateFn(task, n) + var statusSets util.StatusSets + statusSets, err := ssn.PredicateFn(task, n) if err != nil { klog.V(3).Infof("reclaim predicates failed for task <%s/%s> on node <%s>: %v", task.Namespace, task.Name, n.Name, err) continue } - // If predicates failed, next node. - if err := util.CheckPredicateStatus(predicateStatus, admitStatus); err != nil { - klog.V(3).Infof("reclaim predicates failed for task <%s/%s> on node <%s>: %v", - task.Namespace, task.Name, n.Name, err) + + // Allows scheduling to nodes that are in Success or Unschedulable state after filtering by predicate. + if statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() { + klog.V(3).Infof("predicates failed in reclaim for task <%s/%s> on node <%s>, status is not success or unschedulable.", + task.Namespace, task.Name, n.Name) continue } - klog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.", task.Namespace, task.Name, n.Name) diff --git a/pkg/scheduler/framework/util.go b/pkg/scheduler/framework/util.go index 7f238fcb5f0..32f356a9657 100644 --- a/pkg/scheduler/framework/util.go +++ b/pkg/scheduler/framework/util.go @@ -266,7 +266,7 @@ func (nl *NodeLister) List() ([]*v1.Node, error) { return nodes, nil } -// The state of the k8s prefile is converted to the internal state of the volcano +// ConvertPredicateStatus return predicate status from k8sframework status func ConvertPredicateStatus(status *k8sframework.Status) (*api.Status, error) { internalStatus := &api.Status{} if status.Code() == k8sframework.Success { diff --git a/pkg/scheduler/util/predicate_helper.go b/pkg/scheduler/util/predicate_helper.go index dfc3de39aac..6895b86de47 100644 --- a/pkg/scheduler/util/predicate_helper.go +++ b/pkg/scheduler/util/predicate_helper.go @@ -100,23 +100,54 @@ func (ph *predicateHelper) PredicateNodes(task *api.TaskInfo, nodes []*api.NodeI return predicateNodes, fe } -func CheckPredicateStatus(predicateStatus []*api.Status, admitStatus map[int]struct{}) error { - for _, status := range predicateStatus { +func taskGroupID(task *api.TaskInfo) string { + return fmt.Sprintf("%s/%s", task.Job, task.GetTaskSpecKey()) +} + +func NewPredicateHelper() PredicateHelper { + return &predicateHelper{taskPredicateErrorCache: map[string]map[string]error{}} +} + +type PredicateStatus interface { + IsContainsUnschedulable() bool + IsContainsUnschedulableAndUnresolvable() bool + IsContainsErrorSkipOrWait() bool +} + +type StatusSets []*api.Status + +func (s StatusSets) ContainsUnschedulable() bool { + for _, status := range s { if status == nil { continue } - if _, ok := admitStatus[status.Code]; !ok { - return fmt.Errorf("Predicates status (code: %d) does not meet the expectation (admit status: %v), message: %s", - status.Code, admitStatus, status.Reason) + if status.Code == api.Unschedulable { + return true } } - return nil + return false } -func taskGroupID(task *api.TaskInfo) string { - return fmt.Sprintf("%s/%s", task.Job, task.GetTaskSpecKey()) +func (s StatusSets) ContainsUnschedulableAndUnresolvable() bool { + for _, status := range s { + if status == nil { + continue + } + if status.Code == api.UnschedulableAndUnresolvable { + return true + } + } + return false } -func NewPredicateHelper() PredicateHelper { - return &predicateHelper{taskPredicateErrorCache: map[string]map[string]error{}} +func (s StatusSets) ContainsErrorSkipOrWait() bool { + for _, status := range s { + if status == nil { + continue + } + if status.Code == api.Error || status.Code == api.Skip || status.Code == api.Wait { + return true + } + } + return false }