diff --git a/docs/design/device-sharing.md b/docs/design/device-sharing.md
index e2e5828b88..6d5bb9de45 100644
--- a/docs/design/device-sharing.md
+++ b/docs/design/device-sharing.md
@@ -26,7 +26,26 @@ type Devices interface {
 	//HasDeviceRequest checks if the 'pod' request this device
 	HasDeviceRequest(pod *v1.Pod) bool
 	//FiltreNode checks if the 'pod' fit in current node
-	FilterNode(pod *v1.Pod) (bool, error)
+	// The first return value represents the filtering result, and the value range is "0, 1, 2, 3"
+	// 0: Success
+	// Success means that plugin ran correctly and found pod schedulable.
+
+	// 1: Error
+	// Error is used for internal plugin errors, unexpected input, etc.
+
+	// 2: Unschedulable
+	// Unschedulable is used when a plugin finds a pod unschedulable. The scheduler might attempt to
+	// preempt other pods to get this pod scheduled. Use UnschedulableAndUnresolvable to make the
+	// scheduler skip preemption.
+	// The accompanying status message should explain why the pod is unschedulable.
+
+	// 3: UnschedulableAndUnresolvable
+	// UnschedulableAndUnresolvable is used when a plugin finds a pod unschedulable and
+	// preemption would not change anything. Plugins should return Unschedulable if it is possible
+	// that the pod can get scheduled with preemption.
+	// The accompanying status message should explain why the pod is unschedulable.
+	FilterNode(pod *v1.Pod) (int, string, error)
+
 	//Allocate action in predicate
 	Allocate(kubeClient kubernetes.Interface, pod *v1.Pod) error
 	//Release action in predicate
diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go
index 02fd566426..1ad5a470a3 100644
--- a/pkg/scheduler/actions/allocate/allocate.go
+++ b/pkg/scheduler/actions/allocate/allocate.go
@@ -17,6 +17,7 @@
 package allocate
 
 import (
+	"fmt"
 	"time"
 
 	"k8s.io/klog/v2"
@@ -96,13 +97,24 @@ func (alloc *Action) Execute(ssn *framework.Session) {
 	pendingTasks := map[api.JobID]*util.PriorityQueue{}
 
 	allNodes := ssn.NodeList
-	predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) error {
+	predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) {
 		// Check for Resource Predicate
 		if ok, reason := task.InitResreq.LessEqualWithReason(node.FutureIdle(), api.Zero); !ok {
-			return api.NewFitError(task, node, reason)
+			return nil, api.NewFitError(task, node, reason)
+		}
+		var statusSets util.StatusSets
+		statusSets, err := ssn.PredicateFn(task, node)
+		if err != nil {
+			return nil, fmt.Errorf("predicates failed in allocate for task <%s/%s> on node <%s>: %v",
+				task.Namespace, task.Name, node.Name, err)
 		}
-		return ssn.PredicateFn(task, node)
+		if statusSets.ContainsUnschedulable() || statusSets.ContainsUnschedulableAndUnresolvable() ||
+			statusSets.ContainsErrorSkipOrWait() {
+			return nil, fmt.Errorf("predicates failed in allocate for task <%s/%s> on node <%s>, status is not success",
+				task.Namespace, task.Name, node.Name)
+		}
+		return nil, nil
 	}
 
 	// To pick <namespace, queue, job> tuple for job, we choose to pick namespace firstly.
@@ -217,7 +229,7 @@ func (alloc *Action) Execute(ssn *framework.Session) {
 				metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
 			}
 		} else {
-			klog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s> with limited resources",
+			klog.V(3).Infof("Predicates failed in allocate for task <%s/%s> on node <%s> with limited resources",
				task.Namespace, task.Name, node.Name)
 
 			// Allocate releasing resource to the task if any.
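With the signature change above, allocate becomes the strictest consumer of predicate results: any status code other than Success disqualifies a node, while preempt and reclaim (later in this patch) tolerate plain Unschedulable. A minimal, self-contained sketch of that difference, using the StatusSets helpers this patch adds in pkg/scheduler/util/predicate_helper.go:

```go
package main

import (
	"fmt"

	"volcano.sh/volcano/pkg/scheduler/api"
	"volcano.sh/volcano/pkg/scheduler/util"
)

func main() {
	// Two plugins answered: one Success, one Unschedulable with a reason.
	statuses := util.StatusSets{
		{Code: api.Success},
		{Code: api.Unschedulable, Reason: "node over the CPU usage threshold"},
	}

	// allocate and backfill are strict: any non-success code rejects the node.
	strictOK := !(statuses.ContainsUnschedulable() ||
		statuses.ContainsUnschedulableAndUnresolvable() ||
		statuses.ContainsErrorSkipOrWait())

	// preempt and reclaim tolerate plain Unschedulable, because evicting
	// victims may make the pod fit afterwards.
	tolerantOK := !(statuses.ContainsUnschedulableAndUnresolvable() ||
		statuses.ContainsErrorSkipOrWait())

	fmt.Println(strictOK, tolerantOK) // false true
}
```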
diff --git a/pkg/scheduler/actions/backfill/backfill.go b/pkg/scheduler/actions/backfill/backfill.go
index 18437136ba..6f6847778b 100644
--- a/pkg/scheduler/actions/backfill/backfill.go
+++ b/pkg/scheduler/actions/backfill/backfill.go
@@ -17,6 +17,7 @@ limitations under the License.
 package backfill
 
 import (
+	"fmt"
 	"time"
 
 	"k8s.io/klog/v2"
@@ -24,6 +25,7 @@ import (
 	"volcano.sh/volcano/pkg/scheduler/api"
 	"volcano.sh/volcano/pkg/scheduler/framework"
 	"volcano.sh/volcano/pkg/scheduler/metrics"
+	"volcano.sh/volcano/pkg/scheduler/util"
 )
 
 type Action struct{}
@@ -72,13 +74,25 @@ func (backfill *Action) Execute(ssn *framework.Session) {
 				for _, node := range ssn.Nodes {
 					// TODO (k82cn): predicates did not consider pod number for now, there'll
 					// be ping-pong case here.
-					if err := ssn.PredicateFn(task, node); err != nil {
-						klog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s>: %v",
+					// Only nodes whose status is success after predicate filtering can be scheduled.
+					var statusSets util.StatusSets
+					statusSets, err := ssn.PredicateFn(task, node)
+					if err != nil {
+						klog.V(3).Infof("predicates failed in backfill for task <%s/%s> on node <%s>: %v",
 							task.Namespace, task.Name, node.Name, err)
 						fe.SetNodeError(node.Name, err)
 						continue
 					}
 
+					if statusSets.ContainsUnschedulable() || statusSets.ContainsUnschedulableAndUnresolvable() ||
+						statusSets.ContainsErrorSkipOrWait() {
+						err := fmt.Errorf("predicates failed in backfill for task <%s/%s> on node <%s>, status is not success",
+							task.Namespace, task.Name, node.Name)
+						klog.V(3).Infof("%v", err)
+						fe.SetNodeError(node.Name, err)
+						continue
+					}
+
 					klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name)
 					if err := ssn.Allocate(task, node); err != nil {
 						klog.Errorf("Failed to bind Task %v on %v in Session %v", task.UID, node.Name, ssn.UID)
diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go
index fd250e1e34..f3be6aa64b 100644
--- a/pkg/scheduler/actions/preempt/preempt.go
+++ b/pkg/scheduler/actions/preempt/preempt.go
@@ -203,13 +203,28 @@ func preempt(
 	predicateHelper util.PredicateHelper,
 ) (bool, error) {
 	assigned := false
 	allNodes := ssn.NodeList
-
 	if err := ssn.PrePredicateFn(preemptor); err != nil {
 		return false, fmt.Errorf("PrePredicate for task %s/%s failed for: %v", preemptor.Namespace, preemptor.Name, err)
 	}
-	predicateNodes, _ := predicateHelper.PredicateNodes(preemptor, allNodes, ssn.PredicateFn, true)
+
+	predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) {
+		// Allow scheduling to nodes in Success or Unschedulable state after predicate filtering.
+		var statusSets util.StatusSets
+		statusSets, err := ssn.PredicateFn(task, node)
+		if err != nil {
+			return nil, fmt.Errorf("preempt predicates failed for task <%s/%s> on node <%s>: %v",
+				task.Namespace, task.Name, node.Name, err)
+		}
+
+		if statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() {
+			return nil, fmt.Errorf("predicates failed in preempt for task <%s/%s> on node <%s>, status is not success or unschedulable",
+				task.Namespace, task.Name, node.Name)
+		}
+		return nil, nil
+	}
+
+	predicateNodes, _ := predicateHelper.PredicateNodes(preemptor, allNodes, predicateFn, true)
 
 	nodeScores := util.PrioritizeNodes(preemptor, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)
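A predicate can now say "no" through two channels: a non-nil error (a hard failure, recorded via fe.SetNodeError in backfill) or a nil error with non-success statuses (a normal miss whose handling is action-specific). The helper below is illustrative only — nodeDecision is not part of the patch — and condenses how backfill and preempt diverge on the same answer:

```go
package main

import (
	"volcano.sh/volcano/pkg/scheduler/api"
	"volcano.sh/volcano/pkg/scheduler/framework"
	"volcano.sh/volcano/pkg/scheduler/util"
)

// nodeDecision condenses how backfill and preempt treat the same
// (statuses, error) pair returned by ssn.PredicateFn.
func nodeDecision(ssn *framework.Session, task *api.TaskInfo, node *api.NodeInfo) (backfillOK, preemptOK bool) {
	statusSets, err := ssn.PredicateFn(task, node)
	if err != nil {
		return false, false // hard failure: every action drops the node
	}
	s := util.StatusSets(statusSets)
	hardMiss := s.ContainsUnschedulableAndUnresolvable() || s.ContainsErrorSkipOrWait()
	softMiss := s.ContainsUnschedulable()
	return !hardMiss && !softMiss, !hardMiss
}

func main() {}
```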
diff --git a/pkg/scheduler/actions/reclaim/reclaim.go b/pkg/scheduler/actions/reclaim/reclaim.go
index 8bb96740d1..8bced3d9ec 100644
--- a/pkg/scheduler/actions/reclaim/reclaim.go
+++ b/pkg/scheduler/actions/reclaim/reclaim.go
@@ -123,11 +123,20 @@ func (ra *Action) Execute(ssn *framework.Session) {
 			assigned := false
 			for _, n := range ssn.Nodes {
-				// If predicates failed, next node.
-				if err := ssn.PredicateFn(task, n); err != nil {
+				var statusSets util.StatusSets
+				statusSets, err := ssn.PredicateFn(task, n)
+				if err != nil {
+					klog.V(3).Infof("reclaim predicates failed for task <%s/%s> on node <%s>: %v",
+						task.Namespace, task.Name, n.Name, err)
 					continue
 				}
 
+				// Allow scheduling to nodes in Success or Unschedulable state after predicate filtering.
+				if statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() {
+					klog.V(3).Infof("predicates failed in reclaim for task <%s/%s> on node <%s>, status is not success or unschedulable.",
+						task.Namespace, task.Name, n.Name)
+					continue
+				}
+
 				klog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.",
 					task.Namespace, task.Name, n.Name)
diff --git a/pkg/scheduler/api/devices/nvidia/gpushare/device_info.go b/pkg/scheduler/api/devices/nvidia/gpushare/device_info.go
index 167139f0b3..3604698364 100644
--- a/pkg/scheduler/api/devices/nvidia/gpushare/device_info.go
+++ b/pkg/scheduler/api/devices/nvidia/gpushare/device_info.go
@@ -18,6 +18,7 @@ package gpushare
 
 import (
 	"context"
+	"fmt"
 
 	"github.com/pkg/errors"
 	v1 "k8s.io/api/core/v1"
@@ -26,6 +27,7 @@ import (
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/klog/v2"
 
+	"volcano.sh/volcano/pkg/scheduler/api/devices"
 	"volcano.sh/volcano/pkg/scheduler/plugins/util/nodelock"
 )
 
@@ -146,24 +148,24 @@ func (gs *GPUDevices) Release(kubeClient kubernetes.Interface, pod *v1.Pod) error {
 	return nil
 }
 
-func (gs *GPUDevices) FilterNode(pod *v1.Pod) (bool, error) {
+func (gs *GPUDevices) FilterNode(pod *v1.Pod) (int, string, error) {
 	klog.V(4).Infoln("DeviceSharing:Into FitInPod", pod.Name)
 	if GpuSharingEnable {
 		fit, err := checkNodeGPUSharingPredicate(pod, gs)
-		if err != nil {
+		if err != nil || !fit {
 			klog.Errorln("deviceSharing err=", err.Error())
-			return fit, err
+			return devices.Unschedulable, fmt.Sprintf("GpuShare %s", err.Error()), err
 		}
 	}
 	if GpuNumberEnable {
 		fit, err := checkNodeGPUNumberPredicate(pod, gs)
-		if err != nil {
+		if err != nil || !fit {
 			klog.Errorln("deviceSharing err=", err.Error())
-			return fit, err
+			return devices.Unschedulable, fmt.Sprintf("GpuNumber %s", err.Error()), err
 		}
 	}
 	klog.V(4).Infoln("DeviceSharing:FitInPod successed")
-	return true, nil
+	return devices.Success, "", nil
 }
 
 func (gs *GPUDevices) GetStatus() string {
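Under the new Devices contract a pool reports a status code and a reason instead of a bare bool. A hypothetical device pool — ToyDevices and its capacity check are illustrative, not part of the patch — might implement it like this:

```go
package toydevice

import (
	v1 "k8s.io/api/core/v1"

	"volcano.sh/volcano/pkg/scheduler/api/devices"
)

// ToyDevices is a stand-in device pool used only for illustration.
type ToyDevices struct{ free int }

func (td *ToyDevices) hasCapacityFor(pod *v1.Pod) bool { return td.free > 0 }

// FilterNode under the new contract: a devices.* code plus a human-readable
// reason replace the old bare bool.
func (td *ToyDevices) FilterNode(pod *v1.Pod) (int, string, error) {
	if !td.hasCapacityFor(pod) {
		// Plain Unschedulable (not ...AndUnresolvable): evicting a pod that
		// holds toy devices could free capacity, so preemption makes sense.
		return devices.Unschedulable, "toydevice: no free devices", nil
	}
	return devices.Success, "", nil
}
```

Note the difference from gpushare above, which returns the error alongside the code: a non-nil error makes predicates.go bail out immediately, so even preempt skips the node.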
diff --git a/pkg/scheduler/api/devices/nvidia/vgpu/device_info.go b/pkg/scheduler/api/devices/nvidia/vgpu/device_info.go
index e956e1dafa..ced16ee487 100644
--- a/pkg/scheduler/api/devices/nvidia/vgpu/device_info.go
+++ b/pkg/scheduler/api/devices/nvidia/vgpu/device_info.go
@@ -17,6 +17,7 @@ limitations under the License.
 package vgpu
 
 import (
+	"fmt"
 	"strconv"
 	"strings"
 	"time"
@@ -26,6 +27,7 @@ import (
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/klog/v2"
 
+	"volcano.sh/volcano/pkg/scheduler/api/devices"
 	"volcano.sh/volcano/pkg/scheduler/plugins/util/nodelock"
 )
 
@@ -177,17 +179,17 @@ func (gs *GPUDevices) Release(kubeClient kubernetes.Interface, pod *v1.Pod) error {
 	return nil
 }
 
-func (gs *GPUDevices) FilterNode(pod *v1.Pod) (bool, error) {
+func (gs *GPUDevices) FilterNode(pod *v1.Pod) (int, string, error) {
 	klog.V(3).Infoln("4pdvgpuDeviceSharing:Into FitInPod", pod.Name)
 	if VGPUEnable {
 		fit, _, err := checkNodeGPUSharingPredicate(pod, gs, true)
-		if err != nil {
+		if err != nil || !fit {
 			klog.Errorln("deviceSharing err=", err.Error())
-			return fit, err
+			return devices.Unschedulable, fmt.Sprintf("4pdvgpuDeviceSharing %s", err.Error()), err
 		}
 	}
 	klog.V(3).Infoln("4pdvgpu DeviceSharing:FitInPod successed")
-	return true, nil
+	return devices.Success, "", nil
 }
 
 func (gs *GPUDevices) GetStatus() string {
diff --git a/pkg/scheduler/api/devices/util.go b/pkg/scheduler/api/devices/util.go
new file mode 100644
index 0000000000..783d7bb202
--- /dev/null
+++ b/pkg/scheduler/api/devices/util.go
@@ -0,0 +1,38 @@
+/*
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package devices
+
+// These are predefined codes used in a Status.
+const (
+	// Success means that plugin ran correctly and found pod schedulable.
+	// NOTE: A nil status is also considered as "Success".
+	Success int = iota
+	// Error is used for internal plugin errors, unexpected input, etc.
+	Error
+	// Unschedulable is used when a plugin finds a pod unschedulable. The scheduler might attempt to
+	// preempt other pods to get this pod scheduled. Use UnschedulableAndUnresolvable to make the
+	// scheduler skip preemption.
+	// The accompanying status message should explain why the pod is unschedulable.
+	Unschedulable
+	// UnschedulableAndUnresolvable is used when a plugin finds a pod unschedulable and
+	// preemption would not change anything. Plugins should return Unschedulable if it is possible
+	// that the pod can get scheduled with preemption.
+	// The accompanying status message should explain why the pod is unschedulable.
+	UnschedulableAndUnresolvable
+	// Wait is used when a Permit plugin finds a pod scheduling should wait.
+	Wait
+	// Skip is used when a Bind plugin chooses to skip binding.
+	Skip
+)
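The devices package re-declares the code list instead of importing pkg/scheduler/api, presumably because api already imports the device sub-packages and a reverse import would create a cycle. Since predicates.go copies the int from FilterNode straight into an api.Status, the two iota blocks must stay aligned; a cheap runtime guard (hypothetical — the patch itself just relies on the definitions matching) could live in any third package:

```go
package main

import (
	"volcano.sh/volcano/pkg/scheduler/api"
	"volcano.sh/volcano/pkg/scheduler/api/devices"
)

// init panics at startup if the two constant blocks ever drift apart.
func init() {
	aligned := devices.Success == api.Success &&
		devices.Error == api.Error &&
		devices.Unschedulable == api.Unschedulable &&
		devices.UnschedulableAndUnresolvable == api.UnschedulableAndUnresolvable &&
		devices.Wait == api.Wait &&
		devices.Skip == api.Skip
	if !aligned {
		panic("devices status codes drifted from api status codes")
	}
}

func main() {}
```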
diff --git a/pkg/scheduler/api/shared_device_pool.go b/pkg/scheduler/api/shared_device_pool.go
index 9156abc3a7..865dba0dcd 100644
--- a/pkg/scheduler/api/shared_device_pool.go
+++ b/pkg/scheduler/api/shared_device_pool.go
@@ -39,7 +39,25 @@ type Devices interface {
 	//HasDeviceRequest checks if the 'pod' request this device
 	HasDeviceRequest(pod *v1.Pod) bool
 	//FiltreNode checks if the 'pod' fit in current node
-	FilterNode(pod *v1.Pod) (bool, error)
+	// The first return value represents the filtering result, and the value range is "0, 1, 2, 3"
+	// 0: Success
+	// Success means that plugin ran correctly and found pod schedulable.
+
+	// 1: Error
+	// Error is used for internal plugin errors, unexpected input, etc.
+
+	// 2: Unschedulable
+	// Unschedulable is used when a plugin finds a pod unschedulable. The scheduler might attempt to
+	// preempt other pods to get this pod scheduled. Use UnschedulableAndUnresolvable to make the
+	// scheduler skip preemption.
+	// The accompanying status message should explain why the pod is unschedulable.
+
+	// 3: UnschedulableAndUnresolvable
+	// UnschedulableAndUnresolvable is used when a plugin finds a pod unschedulable and
+	// preemption would not change anything. Plugins should return Unschedulable if it is possible
+	// that the pod can get scheduled with preemption.
+	// The accompanying status message should explain why the pod is unschedulable.
+	FilterNode(pod *v1.Pod) (int, string, error)
 	//Allocate action in predicate
 	Allocate(kubeClient kubernetes.Interface, pod *v1.Pod) error
 	//Release action in predicate
diff --git a/pkg/scheduler/api/types.go b/pkg/scheduler/api/types.go
index 699cf8a918..09c5799d00 100644
--- a/pkg/scheduler/api/types.go
+++ b/pkg/scheduler/api/types.go
@@ -124,6 +124,35 @@ type ValidateResult struct {
 	Message string
 }
 
+// These are predefined codes used in a Status.
+const (
+	// Success means that plugin ran correctly and found pod schedulable.
+	// NOTE: A nil status is also considered as "Success".
+	Success int = iota
+	// Error is used for internal plugin errors, unexpected input, etc.
+	Error
+	// Unschedulable is used when a plugin finds a pod unschedulable. The scheduler might attempt to
+	// preempt other pods to get this pod scheduled. Use UnschedulableAndUnresolvable to make the
+	// scheduler skip preemption.
+	// The accompanying status message should explain why the pod is unschedulable.
+	Unschedulable
+	// UnschedulableAndUnresolvable is used when a plugin finds a pod unschedulable and
+	// preemption would not change anything. Plugins should return Unschedulable if it is possible
+	// that the pod can get scheduled with preemption.
+	// The accompanying status message should explain why the pod is unschedulable.
+	UnschedulableAndUnresolvable
+	// Wait is used when a Permit plugin finds a pod scheduling should wait.
+	Wait
+	// Skip is used when a Bind plugin chooses to skip binding.
+	Skip
+)
+
+// Status carries the outcome of a predicate run: a code plus a human-readable reason.
+type Status struct {
+	Code   int
+	Reason string
+}
+
 // ValidateExFn is the func declaration used to validate the result.
 type ValidateExFn func(interface{}) *ValidateResult
@@ -134,7 +162,7 @@ type VoteFn func(interface{}) int
 type JobEnqueuedFn func(interface{})
 
 // PredicateFn is the func declaration used to predicate node for task.
-type PredicateFn func(*TaskInfo, *NodeInfo) error
+type PredicateFn func(*TaskInfo, *NodeInfo) ([]*Status, error)
 
 // PrePredicateFn is the func declaration used to pre-predicate node for task.
 type PrePredicateFn func(*TaskInfo) error
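With api.Status and the new PredicateFn shape in place, a plugin reports a soft miss as data rather than as an error. A minimal sketch of a plugin-side predicate — the plugin name and the maxTaints rule are illustrative only, but the AddPredicateFn signature is the one this patch introduces:

```go
package toyplugin

import (
	"fmt"

	"volcano.sh/volcano/pkg/scheduler/api"
	"volcano.sh/volcano/pkg/scheduler/framework"
)

type toyPlugin struct{}

// OnSessionOpen registers a predicate with the new api.PredicateFn shape.
func (tp *toyPlugin) OnSessionOpen(ssn *framework.Session) {
	const maxTaints = 8
	ssn.AddPredicateFn("toyplugin", func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) {
		if len(node.Node.Spec.Taints) > maxTaints {
			// A soft miss: report a status, not an error.
			return []*api.Status{{
				Code:   api.Unschedulable,
				Reason: fmt.Sprintf("node %s carries more than %d taints", node.Name, maxTaints),
			}}, nil
		}
		return []*api.Status{{Code: api.Success}}, nil
	})
}
```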
diff --git a/pkg/scheduler/framework/session_plugins.go b/pkg/scheduler/framework/session_plugins.go
index 76a3a41c63..b8bcff7a50 100644
--- a/pkg/scheduler/framework/session_plugins.go
+++ b/pkg/scheduler/framework/session_plugins.go
@@ -601,7 +601,8 @@ func (ssn *Session) TaskOrderFn(l, r interface{}) bool {
 }
 
 // PredicateFn invoke predicate function of the plugins
-func (ssn *Session) PredicateFn(task *api.TaskInfo, node *api.NodeInfo) error {
+func (ssn *Session) PredicateFn(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) {
+	predicateStatus := make([]*api.Status, 0)
 	for _, tier := range ssn.Tiers {
 		for _, plugin := range tier.Plugins {
 			if !isEnabled(plugin.EnabledPredicate) {
@@ -611,13 +612,14 @@ func (ssn *Session) PredicateFn(task *api.TaskInfo, node *api.NodeInfo) error {
 			if !found {
 				continue
 			}
-			err := pfn(task, node)
+			status, err := pfn(task, node)
+			predicateStatus = append(predicateStatus, status...)
 			if err != nil {
-				return err
+				return predicateStatus, err
 			}
 		}
 	}
-	return nil
+	return predicateStatus, nil
 }
 
 // PrePredicateFn invoke predicate function of the plugins
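The session-level PredicateFn accumulates statuses from every enabled plugin across all tiers and stops only at the first hard error. One consequence worth noting: a plugin may return a nil status slice, and the aggregation handles that for free — a small self-contained demonstration:

```go
package main

import (
	"fmt"

	"volcano.sh/volcano/pkg/scheduler/api"
)

func main() {
	// A plugin may legally return (nil, nil) — the ignorable-extender path
	// later in this patch does exactly that. Appending a nil slice is a
	// no-op, so the session never special-cases it, and the actions read
	// the absence of statuses as success.
	accumulated := make([]*api.Status, 0)
	var fromPlugin []*api.Status // nil: the plugin had nothing to report
	accumulated = append(accumulated, fromPlugin...)
	fmt.Println(len(accumulated)) // 0
}
```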
diff --git a/pkg/scheduler/framework/util.go b/pkg/scheduler/framework/util.go
index 0528c9dba5..e43ef65e34 100644
--- a/pkg/scheduler/framework/util.go
+++ b/pkg/scheduler/framework/util.go
@@ -17,12 +17,14 @@ limitations under the License.
 package framework
 
 import (
+	"fmt"
+
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/klog/v2"
-	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/framework"
+	k8sframework "k8s.io/kubernetes/pkg/scheduler/framework"
 
 	"volcano.sh/volcano/pkg/scheduler/api"
 )
@@ -223,10 +225,10 @@ func (pal *PodAffinityLister) FilteredList(podFilter PodFilter, selector labels.Selector) ([]*v1.Pod, error) {
 }
 
 // GenerateNodeMapAndSlice returns the nodeMap and nodeSlice generated from ssn
-func GenerateNodeMapAndSlice(nodes map[string]*api.NodeInfo) map[string]*schedulernodeinfo.NodeInfo {
-	nodeMap := make(map[string]*schedulernodeinfo.NodeInfo)
+func GenerateNodeMapAndSlice(nodes map[string]*api.NodeInfo) map[string]*k8sframework.NodeInfo {
+	nodeMap := make(map[string]*k8sframework.NodeInfo)
 	for _, node := range nodes {
-		nodeInfo := schedulernodeinfo.NewNodeInfo(node.Pods()...)
+		nodeInfo := k8sframework.NewNodeInfo(node.Pods()...)
 		nodeInfo.SetNode(node.Node)
 		nodeMap[node.Name] = nodeInfo
 		// add imagestate into nodeinfo
@@ -263,3 +265,25 @@ func (nl *NodeLister) List() ([]*v1.Node, error) {
 	}
 	return nodes, nil
 }
+
+// ConvertPredicateStatus returns a volcano api.Status converted from a k8sframework status
+func ConvertPredicateStatus(status *k8sframework.Status) (*api.Status, error) {
+	internalStatus := &api.Status{}
+	if status.Code() == k8sframework.Success {
+		internalStatus.Code = api.Success
+		return internalStatus, nil
+	} else if status.Code() == k8sframework.Unschedulable {
+		internalStatus.Code = api.Unschedulable
+		internalStatus.Reason = status.Message()
+		return internalStatus, nil
+	} else if status.Code() == k8sframework.UnschedulableAndUnresolvable {
+		internalStatus.Code = api.UnschedulableAndUnresolvable
+		internalStatus.Reason = status.Message()
+		return internalStatus, nil
+	} else {
+		internalStatus.Code = api.Error
+		internalStatus.Reason = status.Message()
+		return internalStatus, fmt.Errorf("convert predicate status error, k8s status code is %d, reason is %s",
+			status.Code(), status.Message())
+	}
+}
diff --git a/pkg/scheduler/plugins/extender/argument.go b/pkg/scheduler/plugins/extender/argument.go
index 123cd18920..cdd130322e 100644
--- a/pkg/scheduler/plugins/extender/argument.go
+++ b/pkg/scheduler/plugins/extender/argument.go
@@ -22,7 +22,7 @@ type PredicateRequest struct {
 }
 
 type PredicateResponse struct {
-	ErrorMessage string `json:"errorMessage"`
+	Status []*api.Status `json:"status"`
 }
 
 type PrioritizeRequest struct {
diff --git a/pkg/scheduler/plugins/extender/extender.go b/pkg/scheduler/plugins/extender/extender.go
index 43a88139d3..fea755f71c 100644
--- a/pkg/scheduler/plugins/extender/extender.go
+++ b/pkg/scheduler/plugins/extender/extender.go
@@ -161,22 +161,20 @@ func (ep *extenderPlugin) OnSessionOpen(ssn *framework.Session) {
 	}
 
 	if ep.config.predicateVerb != "" {
-		ssn.AddPredicateFn(ep.Name(), func(task *api.TaskInfo, node *api.NodeInfo) error {
+		ssn.AddPredicateFn(ep.Name(), func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) {
 			resp := &PredicateResponse{}
 			err := ep.send(ep.config.predicateVerb, &PredicateRequest{Task: task, Node: node}, resp)
 			if err != nil {
 				klog.Warningf("Predicate failed with error %v", err)
 
 				if ep.config.ignorable {
-					return nil
+					return nil, nil
 				}
-				return err
+				return nil, err
 			}
 
-			if resp.ErrorMessage == "" {
-				return nil
-			}
-			return errors.New(resp.ErrorMessage)
+			predicateStatus := resp.Status
+			return predicateStatus, nil
 		})
 	}
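The extender change is a wire-format break: the old errorMessage field is gone, and external extenders now answer with a list of statuses that volcano forwards verbatim to the actions. A sketch of what a reply looks like under the new schema — the payload content is invented, but the key names follow the struct definitions above (api.Status has no json tags, so its fields serialize as "Code" and "Reason"):

```go
package main

import (
	"encoding/json"
	"fmt"

	"volcano.sh/volcano/pkg/scheduler/plugins/extender"
)

func main() {
	// Code 2 is api.Unschedulable; the envelope key "status" comes from the
	// tag on PredicateResponse.Status in argument.go.
	raw := []byte(`{"status":[{"Code":2,"Reason":"external quota exhausted"}]}`)
	resp := &extender.PredicateResponse{}
	if err := json.Unmarshal(raw, resp); err != nil {
		panic(err)
	}
	fmt.Println(resp.Status[0].Code, resp.Status[0].Reason) // 2 external quota exhausted
}
```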
diff --git a/pkg/scheduler/plugins/numaaware/numaaware.go b/pkg/scheduler/plugins/numaaware/numaaware.go
index 2a67a67d13..d428c26b63 100644
--- a/pkg/scheduler/plugins/numaaware/numaaware.go
+++ b/pkg/scheduler/plugins/numaaware/numaaware.go
@@ -112,14 +112,16 @@ func (pp *numaPlugin) OnSessionOpen(ssn *framework.Session) {
 		},
 	})
 
-	predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) error {
+	predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) {
+		predicateStatus := make([]*api.Status, 0)
+		numaStatus := &api.Status{}
 		if v1qos.GetPodQOS(task.Pod) != v1.PodQOSGuaranteed {
 			klog.V(3).Infof("task %s isn't Guaranteed pod", task.Name)
-			return nil
+			return predicateStatus, nil
 		}
 
 		if fit, err := filterNodeByPolicy(task, node, pp.nodeResSets); !fit {
-			return err
+			return predicateStatus, err
 		}
 
 		resNumaSets := pp.nodeResSets[node.Name].Clone()
@@ -130,7 +132,11 @@ func (pp *numaPlugin) OnSessionOpen(ssn *framework.Session) {
 			providersHints := policy.AccumulateProvidersHints(&container, node.NumaSchedulerInfo, resNumaSets, pp.hintProviders)
 			hit, admit := taskPolicy.Predicate(providersHints)
 			if !admit {
-				return fmt.Errorf("plugin %s predicates failed for task %s container %s on node %s",
+				numaStatus.Code = api.UnschedulableAndUnresolvable
+				numaStatus.Reason = fmt.Sprintf("plugin %s predicates failed for task %s container %s on node %s",
+					pp.Name(), task.Name, container.Name, node.Name)
+				predicateStatus = append(predicateStatus, numaStatus)
+				return predicateStatus, fmt.Errorf("plugin %s predicates failed for task %s container %s on node %s",
 					pp.Name(), task.Name, container.Name, node.Name)
 			}
 
@@ -154,7 +160,9 @@ func (pp *numaPlugin) OnSessionOpen(ssn *framework.Session) {
 		klog.V(4).Infof(" task %s's on node<%s> resAssignMap: %v",
 			task.Name, node.Name, pp.assignRes[task.UID][node.Name])
 
-		return nil
+		numaStatus.Code = api.Success
+		predicateStatus = append(predicateStatus, numaStatus)
+		return predicateStatus, nil
 	}
 
 	ssn.AddPredicateFn(pp.Name(), predicateFn)
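numaaware marks a failed NUMA admission as UnschedulableAndUnresolvable, presumably because victim selection does not model the release of NUMA cells, so preemption is not expected to help. The hypothetical helper below — not part of the patch — spells out the rule of thumb this implies for plugin authors:

```go
package main

import "volcano.sh/volcano/pkg/scheduler/api"

// codeForMiss expresses the convention the patch appears to follow when a
// predicate rejects a node.
func codeForMiss(evictionCouldHelp bool) int {
	if evictionCouldHelp {
		return api.Unschedulable // preempt/reclaim may still use the node
	}
	return api.UnschedulableAndUnresolvable // every action skips the node
}

func main() { _ = codeForMiss(true) }
```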
predicates failed %s", nodeaffinity.Name, status.Message()) } } // PodToleratesNodeTaints: TaintToleration if predicate.taintTolerationEnable { status := tolerationFilter.Filter(context.TODO(), state, task.Pod, nodeInfo) - if !status.IsSuccess() { - return false, fmt.Errorf("plugin %s predicates failed %s", tainttoleration.Name, status.Message()) + tolerationStatus, err := framework.ConvertPredicateStatus(status) + predicateStatus = append(predicateStatus, tolerationStatus) + if err != nil { + return predicateStatus, false, fmt.Errorf("plugin %s predicates failed %s", tainttoleration.Name, status.Message()) } } - return true, nil + return predicateStatus, true, nil } // Check PredicateWithCache var err error var fit bool + predicateCacheStatus := make([]*api.Status, 0) if predicate.cacheEnable { fit, err = pCache.PredicateWithCache(node.Name, task.Pod) if err != nil { - fit, err = predicateByStablefilter(task.Pod, nodeInfo) + predicateCacheStatus, fit, err = predicateByStablefilter(task.Pod, nodeInfo) pCache.UpdateCache(node.Name, task.Pod, fit) } else { if !fit { @@ -445,58 +459,74 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { } } } else { - fit, err = predicateByStablefilter(task.Pod, nodeInfo) + predicateCacheStatus, fit, err = predicateByStablefilter(task.Pod, nodeInfo) } + predicateStatus = append(predicateStatus, predicateCacheStatus...) if !fit { - return err + return predicateStatus, err } // Check NodePort if predicate.nodePortEnable { status := nodePortFilter.Filter(context.TODO(), state, nil, nodeInfo) - if !status.IsSuccess() { - return fmt.Errorf("plugin %s predicates failed %s", nodeports.Name, status.Message()) + nodePortStatus, err := framework.ConvertPredicateStatus(status) + predicateStatus = append(predicateStatus, nodePortStatus) + if err != nil { + return predicateStatus, fmt.Errorf("plugin %s predicates failed %s", nodeports.Name, status.Message()) } } // Check PodAffinity if predicate.podAffinityEnable { status := podAffinityFilter.Filter(context.TODO(), state, task.Pod, nodeInfo) - if !status.IsSuccess() { - return fmt.Errorf("plugin %s predicates failed %s", interpodaffinity.Name, status.Message()) + podAffinityStatus, err := framework.ConvertPredicateStatus(status) + predicateStatus = append(predicateStatus, podAffinityStatus) + if err != nil { + return predicateStatus, fmt.Errorf("plugin %s predicates failed %s", interpodaffinity.Name, status.Message()) } } // Check NodeVolumeLimits if predicate.nodeVolumeLimitsEnable { status := nodeVolumeLimitsCSIFilter.Filter(context.TODO(), state, task.Pod, nodeInfo) - if !status.IsSuccess() { - return fmt.Errorf("plugin %s predicates failed %s", nodeVolumeLimitsCSIFilter.Name(), status.Message()) + nodeVolumeStatus, err := framework.ConvertPredicateStatus(status) + predicateStatus = append(predicateStatus, nodeVolumeStatus) + if err != nil { + return predicateStatus, fmt.Errorf("plugin %s predicates failed %s", nodeVolumeLimitsCSIFilter.Name(), status.Message()) } } // Check VolumeZone if predicate.volumeZoneEnable { status := volumeZoneFilter.Filter(context.TODO(), state, task.Pod, nodeInfo) - if !status.IsSuccess() { - return fmt.Errorf("plugin %s predicates failed %s", volumeZoneFilter.Name(), status.Message()) + volumeZoneStatus, err := framework.ConvertPredicateStatus(status) + predicateStatus = append(predicateStatus, volumeZoneStatus) + if err != nil { + return predicateStatus, fmt.Errorf("plugin %s predicates failed %s", volumeZoneFilter.Name(), status.Message()) } } // Check 
diff --git a/pkg/scheduler/plugins/predicates/proportional.go b/pkg/scheduler/plugins/predicates/proportional.go
index e3ee83ca62..bd83d0f54f 100644
--- a/pkg/scheduler/plugins/predicates/proportional.go
+++ b/pkg/scheduler/plugins/predicates/proportional.go
@@ -25,10 +25,12 @@ import (
 )
 
 // checkNodeResourceIsProportional checks if a gpu:cpu:memory is Proportional
-func checkNodeResourceIsProportional(task *api.TaskInfo, node *api.NodeInfo, proportional map[v1.ResourceName]baseResource) (bool, error) {
+func checkNodeResourceIsProportional(task *api.TaskInfo, node *api.NodeInfo, proportional map[v1.ResourceName]baseResource) (*api.Status, error) {
+	status := &api.Status{}
 	for resourceName := range proportional {
 		if value, found := task.Resreq.ScalarResources[resourceName]; found && value > 0 {
-			return true, nil
+			status.Code = api.Success
+			return status, nil
 		}
 	}
 	for resourceName, resourceRate := range proportional {
@@ -38,9 +40,11 @@ func checkNodeResourceIsProportional(task *api.TaskInfo, node *api.NodeInfo, proportional map[v1.ResourceName]baseResource) (bool, error) {
 			r := node.Idle.Clone()
 			r = r.Sub(task.Resreq)
 			if r.MilliCPU < cpuReserved || r.Memory < memoryReserved {
-				return false, fmt.Errorf("proportional of resource %s check failed", resourceName)
+				status.Code = api.Unschedulable
+				status.Reason = fmt.Sprintf("proportional of resource %s check failed", resourceName)
+				return status, fmt.Errorf("proportional of resource %s check failed", resourceName)
 			}
 		}
 	}
-	return true, nil
+	return status, nil
 }
diff --git a/pkg/scheduler/plugins/predicates/proportional_test.go b/pkg/scheduler/plugins/predicates/proportional_test.go
index 08b14c3f81..a893b2ac78 100644
--- a/pkg/scheduler/plugins/predicates/proportional_test.go
+++ b/pkg/scheduler/plugins/predicates/proportional_test.go
@@ -45,7 +45,7 @@ func Test_checkNodeResourceIsProportional(t *testing.T) {
 	tests := []struct {
 		name    string
 		args    args
-		want    bool
+		want    int
 		wantErr bool
 	}{
 		{
@@ -55,7 +55,7 @@ func Test_checkNodeResourceIsProportional(t *testing.T) {
 				node:         n1,
 				proportional: proportional,
 			},
-			true,
+			api.Success,
 			false,
 		},
 		{
@@ -65,7 +65,7 @@ func Test_checkNodeResourceIsProportional(t *testing.T) {
 				node:         n1,
 				proportional: proportional,
 			},
-			false,
+			api.Unschedulable,
 			true,
 		},
 		{
@@ -75,7 +75,7 @@ func Test_checkNodeResourceIsProportional(t *testing.T) {
 				node:         n1,
 				proportional: proportional,
 			},
-			true,
+			api.Success,
 			false,
 		},
 		{
@@ -85,7 +85,7 @@ func Test_checkNodeResourceIsProportional(t *testing.T) {
 				node:         n2,
 				proportional: proportional,
 			},
-			true,
+			api.Success,
 			false,
 		},
 	}
@@ -96,7 +96,7 @@ func Test_checkNodeResourceIsProportional(t *testing.T) {
 				t.Errorf("checkNodeResourceIsProportional() error = %v, wantErr %v", err, tt.wantErr)
 				return
 			}
-			if got != tt.want {
+			if got.Code != tt.want {
 				t.Errorf("checkNodeResourceIsProportional() got = %v, want %v", got, tt.want)
 			}
 		})
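One subtlety that proportional.go relies on: Success is the first constant in the iota block, so a freshly allocated api.Status already means "schedulable", which is why the fall-through path can return the untouched status. A two-line demonstration:

```go
package main

import (
	"fmt"

	"volcano.sh/volcano/pkg/scheduler/api"
)

func main() {
	// The zero value of api.Status.Code is api.Success (iota starts at 0),
	// so &api.Status{} needs no explicit assignment on the happy path.
	st := &api.Status{}
	fmt.Println(st.Code == api.Success) // true
}
```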
diff --git a/pkg/scheduler/plugins/tdm/tdm.go b/pkg/scheduler/plugins/tdm/tdm.go
index 2bdd139ece..f28aac4d18 100644
--- a/pkg/scheduler/plugins/tdm/tdm.go
+++ b/pkg/scheduler/plugins/tdm/tdm.go
@@ -143,24 +143,31 @@ func (tp *tdmPlugin) OnSessionOpen(ssn *framework.Session) {
 	}()
 
 	// tdm plugin just handle revocable node
-	predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) error {
+	predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) {
+		predicateStatus := make([]*api.Status, 0)
+		tdmStatus := &api.Status{}
 		if node.RevocableZone == "" {
-			return nil
+			return predicateStatus, nil
 		}
 
 		if err := tp.availableRevocableZone(node.RevocableZone); err != nil {
-			return fmt.Errorf("plugin %s predicates %w", tp.Name(), err)
+			tdmStatus.Code = api.UnschedulableAndUnresolvable
+			tdmStatus.Reason = fmt.Sprintf("plugin %s predicates %v", tp.Name(), err)
+			predicateStatus = append(predicateStatus, tdmStatus)
+			return predicateStatus, fmt.Errorf("plugin %s predicates %v", tp.Name(), err)
 		}
 
 		klog.V(4).Infof("TDM node %v revocable zone %v:%v is active", node.Name, node.RevocableZone, tp.revocableZone[node.RevocableZone])
 
 		if len(task.RevocableZone) == 0 {
 			msg := fmt.Sprintf("task %s/%s is not allow to dispatch to revocable node %s", task.Namespace, task.Name, node.Name)
-			return fmt.Errorf("plugin %s predicates %s", tp.Name(), msg)
+			return predicateStatus, fmt.Errorf("plugin %s predicates %s", tp.Name(), msg)
 		}
 
+		tdmStatus.Code = api.Success
+		predicateStatus = append(predicateStatus, tdmStatus)
 		klog.V(4).Infof("TDM filter for Task %s/%s on node %s pass.", task.Namespace, task.Name, node.Name)
-		return nil
+		return predicateStatus, nil
 	}
 
 	// tdm plugin just handle revocable node
diff --git a/pkg/scheduler/plugins/tdm/tdm_test.go b/pkg/scheduler/plugins/tdm/tdm_test.go
index 88c2248681..c2ec63f1d7 100644
--- a/pkg/scheduler/plugins/tdm/tdm_test.go
+++ b/pkg/scheduler/plugins/tdm/tdm_test.go
@@ -288,7 +288,18 @@ func Test_TDM(t *testing.T) {
 			predicatedNode := make([]*api.NodeInfo, 0)
 			for _, node := range ssn.Nodes {
-				if err := ssn.PredicateFn(task, node); err != nil {
+				predicateStatus, err := ssn.PredicateFn(task, node)
+				if err != nil {
+					continue
+				}
+				predicateIsSuccess := true
+				for _, status := range predicateStatus {
+					if status != nil && status.Code != api.Success {
+						predicateIsSuccess = false
+						break
+					}
+				}
+				if !predicateIsSuccess {
 					continue
 				}
 				predicatedNode = append(predicatedNode, node)
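The hand-rolled success loop in tdm_test.go could also lean on the StatusSets helpers this patch introduces — assuming the test package is willing to import pkg/scheduler/util, which is a judgment call rather than something the patch does:

```go
package main

import (
	"volcano.sh/volcano/pkg/scheduler/api"
	"volcano.sh/volcano/pkg/scheduler/util"
)

// allSuccess mirrors the manual loop in tdm_test.go using the new helpers.
func allSuccess(statuses []*api.Status) bool {
	s := util.StatusSets(statuses)
	return !s.ContainsUnschedulable() &&
		!s.ContainsUnschedulableAndUnresolvable() &&
		!s.ContainsErrorSkipOrWait()
}

func main() { _ = allSuccess(nil) }
```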
diff --git a/pkg/scheduler/plugins/usage/usage.go b/pkg/scheduler/plugins/usage/usage.go
index 690111f42a..d11388d722 100644
--- a/pkg/scheduler/plugins/usage/usage.go
+++ b/pkg/scheduler/plugins/usage/usage.go
@@ -124,12 +124,17 @@ func (up *usagePlugin) OnSessionOpen(ssn *framework.Session) {
 		klog.V(4).Infof("Threshold arguments :%v", argsValue)
 	}
 
-	predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) error {
+	predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) {
+		predicateStatus := make([]*api.Status, 0)
+		usageStatus := &api.Status{}
 		for period, value := range up.threshold.cpuUsageAvg {
 			klog.V(4).Infof("predicateFn cpuUsageAvg:%v", up.threshold.cpuUsageAvg)
 			if node.ResourceUsage.CPUUsageAvg[period] > value {
 				msg := fmt.Sprintf("Node %s cpu usage %f exceeds the threshold %f", node.Name, node.ResourceUsage.CPUUsageAvg[period], value)
-				return fmt.Errorf("plugin %s predicates failed %s", up.Name(), msg)
+				usageStatus.Code = api.Unschedulable
+				usageStatus.Reason = msg
+				predicateStatus = append(predicateStatus, usageStatus)
+				return predicateStatus, fmt.Errorf("plugin %s predicates failed %s", up.Name(), msg)
 			}
 		}
 
@@ -137,11 +142,17 @@ func (up *usagePlugin) OnSessionOpen(ssn *framework.Session) {
 			klog.V(4).Infof("predicateFn memUsageAvg:%v", up.threshold.memUsageAvg)
 			if node.ResourceUsage.MEMUsageAvg[period] > value {
 				msg := fmt.Sprintf("Node %s mem usage %f exceeds the threshold %f", node.Name, node.ResourceUsage.MEMUsageAvg[period], value)
-				return fmt.Errorf("plugin %s memory usage predicates failed %s", up.Name(), msg)
+				usageStatus.Code = api.Unschedulable
+				usageStatus.Reason = msg
+				predicateStatus = append(predicateStatus, usageStatus)
+				return predicateStatus, fmt.Errorf("plugin %s memory usage predicates failed %s", up.Name(), msg)
 			}
 		}
+
+		usageStatus.Code = api.Success
+		predicateStatus = append(predicateStatus, usageStatus)
 		klog.V(4).Infof("Usage plugin filter for task %s/%s on node %s pass.", task.Namespace, task.Name, node.Name)
-		return nil
+		return predicateStatus, nil
 	}
 
 	nodeOrderFn := func(task *api.TaskInfo, node *api.NodeInfo) (float64, error) {
diff --git a/pkg/scheduler/util/predicate_helper.go b/pkg/scheduler/util/predicate_helper.go
index 114851b41d..ef7983f452 100644
--- a/pkg/scheduler/util/predicate_helper.go
+++ b/pkg/scheduler/util/predicate_helper.go
@@ -70,7 +70,7 @@ func (ph *predicateHelper) PredicateNodes(task *api.TaskInfo, nodes []*api.NodeInfo, fn api.PredicateFn, enableErrorCache bool) ([]*api.NodeInfo, *api.FitErrors) {
 		}
 
 		// TODO (k82cn): Enable eCache for performance improvement.
-		if err := fn(task, node); err != nil {
+		if _, err := fn(task, node); err != nil {
 			klog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s>: %v",
 				task.Namespace, task.Name, node.Name, err)
 			errorLock.Lock()
@@ -107,3 +107,45 @@ func taskGroupID(task *api.TaskInfo) string {
 
 func NewPredicateHelper() PredicateHelper {
 	return &predicateHelper{taskPredicateErrorCache: map[string]map[string]error{}}
 }
+
+// StatusSets is the set of statuses collected from the predicate plugins of a session.
+type StatusSets []*api.Status
+
+// ContainsUnschedulable reports whether the set holds an Unschedulable status.
+func (s StatusSets) ContainsUnschedulable() bool {
+	for _, status := range s {
+		if status == nil {
+			continue
+		}
+		if status.Code == api.Unschedulable {
+			return true
+		}
+	}
+	return false
+}
+
+// ContainsUnschedulableAndUnresolvable reports whether the set holds an UnschedulableAndUnresolvable status.
+func (s StatusSets) ContainsUnschedulableAndUnresolvable() bool {
+	for _, status := range s {
+		if status == nil {
+			continue
+		}
+		if status.Code == api.UnschedulableAndUnresolvable {
+			return true
+		}
+	}
+	return false
+}
+
+// ContainsErrorSkipOrWait reports whether the set holds an Error, Skip or Wait status.
+func (s StatusSets) ContainsErrorSkipOrWait() bool {
+	for _, status := range s {
+		if status == nil {
+			continue
+		}
+		if status.Code == api.Error || status.Code == api.Skip || status.Code == api.Wait {
+			return true
+		}
+	}
+	return false
+}
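All three StatusSets helpers skip nil entries, which matters because plugins are allowed to return nil statuses. A sketch of a regression test pinning that behavior down — illustrative, not part of the patch:

```go
package util_test

import (
	"testing"

	"volcano.sh/volcano/pkg/scheduler/api"
	"volcano.sh/volcano/pkg/scheduler/util"
)

// TestStatusSetsSkipNil checks that nil entries are ignored and that a Wait
// status is classified under ContainsErrorSkipOrWait.
func TestStatusSetsSkipNil(t *testing.T) {
	s := util.StatusSets{nil, {Code: api.Wait}}
	if !s.ContainsErrorSkipOrWait() {
		t.Fatal("Wait status was not detected")
	}
	if s.ContainsUnschedulable() {
		t.Fatal("unexpected Unschedulable report")
	}
}
```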