Skip to content

Commit

Permalink
remove node out of sync state
Browse files Browse the repository at this point in the history
Signed-off-by: Xuzheng Chang <[email protected]>
  • Loading branch information
Monokaix committed Jul 27, 2023
1 parent 03b5da4 commit b354161
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 48 deletions.
4 changes: 2 additions & 2 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ func (alloc *Action) Execute(ssn *framework.Session) {
allNodes := ssn.NodeList
predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) {
// Check for Resource Predicate
if ok, reason := task.InitResreq.LessEqualWithReason(node.FutureIdle(), api.Zero); !ok {
return nil, api.NewFitError(task, node, reason)
if ok, resources := task.InitResreq.LessEqualWithResources(node.FutureIdle(), api.Zero); !ok {
return nil, api.NewFitError(task, node, api.WrapInsufficientResourceReason(resources))
}
var statusSets util.StatusSets
statusSets, err := ssn.PredicateFn(task, node)
Expand Down
34 changes: 13 additions & 21 deletions pkg/scheduler/api/node_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,12 +291,8 @@ func (ni *NodeInfo) setNodeState(node *v1.Node) {
}

// set NodeState according to resources
if !ni.Used.LessEqual(ni.Allocatable, Zero) {
ni.State = NodeState{
Phase: NotReady,
Reason: "OutOfSync",
}
return
if ok, resources := ni.Used.LessEqualWithResources(ni.Allocatable, Zero); !ok {
klog.ErrorS(nil, "Node out of sync", "name", ni.Name, "resources", resources)
}

// If node not ready, e.g. power off
Expand Down Expand Up @@ -372,30 +368,30 @@ func (ni *NodeInfo) setNode(node *v1.Node) {
for _, ti := range ni.Tasks {
switch ti.Status {
case Releasing:
ni.Idle.sub(ti.Resreq) // sub without assertion
ni.allocateIdleResource(ti)
ni.Releasing.Add(ti.Resreq)
ni.Used.Add(ti.Resreq)
ni.addResource(ti.Pod)
case Pipelined:
ni.Pipelined.Add(ti.Resreq)
default:
ni.Idle.sub(ti.Resreq) // sub without assertion
ni.allocateIdleResource(ti)
ni.Used.Add(ti.Resreq)
ni.addResource(ti.Pod)
}
}
}

func (ni *NodeInfo) allocateIdleResource(ti *TaskInfo) error {
if ti.Resreq.LessEqual(ni.Idle, Zero) {
func (ni *NodeInfo) allocateIdleResource(ti *TaskInfo) {
ok, resources := ti.Resreq.LessEqualWithResources(ni.Idle, Zero)
if ok {
ni.Idle.Sub(ti.Resreq)
return nil
return
}

return &AllocateFailError{Reason: fmt.Sprintf(
"cannot allocate resource, <%s> idle: %s <%s/%s> req: %s",
ni.Name, ni.Idle.String(), ti.Namespace, ti.Name, ti.Resreq.String(),
)}
ni.Idle.sub(ti.Resreq)
klog.ErrorS(nil, "Allocate resource exception, set idle resource to negative",
"nodeName", ni.Name, "task", klog.KObj(ti.Pod), "resources", resources, "idle", ni.Idle.String(), "req", ti.Resreq.String())
}

// AddTask is used to add a task in nodeInfo object
Expand All @@ -420,18 +416,14 @@ func (ni *NodeInfo) AddTask(task *TaskInfo) error {
if ni.Node != nil {
switch ti.Status {
case Releasing:
if err := ni.allocateIdleResource(ti); err != nil {
return err
}
ni.allocateIdleResource(ti)
ni.Releasing.Add(ti.Resreq)
ni.Used.Add(ti.Resreq)
ni.addResource(ti.Pod)
case Pipelined:
ni.Pipelined.Add(ti.Resreq)
default:
if err := ni.allocateIdleResource(ti); err != nil {
return err
}
ni.allocateIdleResource(ti)
ni.Used.Add(ti.Resreq)
ni.addResource(ti.Pod)
}
Expand Down
14 changes: 8 additions & 6 deletions pkg/scheduler/api/node_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,29 +76,31 @@ func TestNodeInfo_AddPod(t *testing.T) {
},
},
{
name: "add 1 unknown pod",
name: "add 1 unknown pod and pod memory req > idle",
node: case02Node,
pods: []*v1.Pod{case02Pod1},
expected: &NodeInfo{
Name: "n2",
Node: case02Node,
Idle: buildResource("2000m", "1G"),
Used: EmptyResource(),
Idle: buildResource("1000m", "-1G"),
Used: buildResource("1000m", "2G"),
Releasing: EmptyResource(),
Pipelined: EmptyResource(),
OversubscriptionResource: EmptyResource(),
Allocatable: buildResource("2000m", "1G"),
Capacity: buildResource("2000m", "1G"),
ResourceUsage: &NodeUsage{},
State: NodeState{Phase: Ready},
Tasks: map[TaskID]*TaskInfo{},
Tasks: map[TaskID]*TaskInfo{
"c1/p1": NewTaskInfo(case02Pod1),
},
Others: map[string]interface{}{
GPUSharingDevice: gpushare.NewGPUDevices("n2", case01Node),
vgpu.DeviceName: vgpu.NewGPUDevices("n2", case01Node),
},
ImageStates: make(map[string]*k8sframework.ImageStateSummary),
},
expectedFailure: true,
expectedFailure: false,
},
}

Expand All @@ -117,7 +119,7 @@ func TestNodeInfo_AddPod(t *testing.T) {
}

if !nodeInfoEqual(ni, test.expected) {
t.Errorf("node info %d: \n expected %v, \n got %v \n",
t.Errorf("node info %d: \n %v, \n %v \n",
i, test.expected, ni)
}
}
Expand Down
18 changes: 11 additions & 7 deletions pkg/scheduler/api/resource_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,11 +405,12 @@ func (r *Resource) LessEqual(rr *Resource, defaultValue DimensionDefaultValue) b
return true
}

// LessEqualWithReason returns true, "" only on condition that all dimensions of resources in r are less than or equal with that of rr,
// Otherwise returns false and err string ,which show which resource is insufficient.
// LessEqualWithResources returns true, []string{} only on condition that all dimensions of resources in r are less than or equal with that of rr,
// Otherwise returns false and err string ,which show what resources are insufficient.
// @param defaultValue "default value for resource dimension not defined in ScalarResources. Its value can only be one of 'Zero' and 'Infinity'"
// this function is the same as LessEqual , and it will be merged to LessEqual in the future
func (r *Resource) LessEqualWithReason(rr *Resource, defaultValue DimensionDefaultValue) (bool, string) {
func (r *Resource) LessEqualWithResources(rr *Resource, defaultValue DimensionDefaultValue) (bool, []string) {
resources := []string{}
lessEqualFunc := func(l, r, diff float64) bool {
if l < r || math.Abs(l-r) < diff {
return true
Expand All @@ -418,10 +419,10 @@ func (r *Resource) LessEqualWithReason(rr *Resource, defaultValue DimensionDefau
}

if !lessEqualFunc(r.MilliCPU, rr.MilliCPU, minResource) {
return false, "Insufficient cpu"
resources = append(resources, "cpu")
}
if !lessEqualFunc(r.Memory, rr.Memory, minResource) {
return false, "Insufficient memory"
resources = append(resources, "memory")
}

for resourceName, leftValue := range r.ScalarResources {
Expand All @@ -431,10 +432,13 @@ func (r *Resource) LessEqualWithReason(rr *Resource, defaultValue DimensionDefau
}

if !lessEqualFunc(leftValue, rightValue, minResource) {
return false, "Insufficient " + string(resourceName)
resources = append(resources, string(resourceName))
}
}
return true, ""
if len(resources) > 0 {
return false, resources
}
return true, resources
}

// LessPartly returns true if there exists any dimension whose resource amount in r is less than that in rr.
Expand Down
12 changes: 6 additions & 6 deletions pkg/scheduler/api/resource_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1279,7 +1279,7 @@ func TestResource_LessEqualResource(t *testing.T) {
ScalarResources: map[v1.ResourceName]float64{"scalar.test/scalar1": 1000, "hugepages-test": 2000},
},
resource2: &Resource{},
expected: "Insufficient cpu",
expected: "cpu",
},
{
resource1: &Resource{
Expand Down Expand Up @@ -1331,7 +1331,7 @@ func TestResource_LessEqualResource(t *testing.T) {
Memory: 8000,
ScalarResources: map[v1.ResourceName]float64{"scalar.test/scalar1": 4000, "hugepages-test": 5000},
},
expected: "Insufficient scalar.test/scalar1",
expected: "scalar.test/scalar1",
},
{
resource1: &Resource{
Expand All @@ -1344,7 +1344,7 @@ func TestResource_LessEqualResource(t *testing.T) {
Memory: 8000,
ScalarResources: map[v1.ResourceName]float64{"scalar.test/scalar1": 4000, "hugepages-test": 5000},
},
expected: "Insufficient cpu",
expected: "cpu",
},
}

Expand Down Expand Up @@ -1374,18 +1374,18 @@ func TestResource_LessEqualResource(t *testing.T) {
ScalarResources: map[v1.ResourceName]float64{"scalar.test/scalar1": 1000, "hugepages-test": 2000},
},
resource2: &Resource{},
expected: "Insufficient cpu",
expected: "cpu",
},
}

for _, test := range testsForDefaultZero {
_, reason := test.resource1.LessEqualWithReason(test.resource2, Zero)
_, reason := test.resource1.LessEqualWithResources(test.resource2, Zero)
if !reflect.DeepEqual(test.expected, reason) {
t.Errorf("expected: %#v, got: %#v", test.expected, reason)
}
}
for caseID, test := range testsForDefaultInfinity {
_, reason := test.resource1.LessEqualWithReason(test.resource2, Infinity)
_, reason := test.resource1.LessEqualWithResources(test.resource2, Infinity)
if !reflect.DeepEqual(test.expected, reason) {
t.Errorf("caseID %d expected: %#v, got: %#v", caseID, test.expected, reason)
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/scheduler/api/unschedule_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,11 @@ func NewFitError(task *TaskInfo, node *NodeInfo, message ...string) *FitError {
func (f *FitError) Error() string {
return fmt.Sprintf("task %s/%s on node %s fit failed: %s", f.taskNamespace, f.taskName, f.NodeName, strings.Join(f.Reasons, ", "))
}

// WrapInsufficientResourceReason wrap insufficient resource reason.
func WrapInsufficientResourceReason(resources []string) string {
if len(resources) == 0{
return ""
}
return "Insufficient " + resources[0]
}
6 changes: 0 additions & 6 deletions pkg/scheduler/cache/event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,6 @@ func (sc *SchedulerCache) addTask(pi *schedulingapi.TaskInfo) error {
node := sc.Nodes[pi.NodeName]
if !isTerminated(pi.Status) {
if err := node.AddTask(pi); err != nil {
if _, outOfSync := err.(*schedulingapi.AllocateFailError); outOfSync {
node.State = schedulingapi.NodeState{
Phase: schedulingapi.NotReady,
Reason: "OutOfSync",
}
}
return err
}
} else {
Expand Down

0 comments on commit b354161

Please sign in to comment.