Skip to content

Commit

Permalink
overused support check request resource
Browse files Browse the repository at this point in the history
1. if overusedCheckGangEnable in allocate, overused check weather job's request will exceed capability or deserved according to it is preemptable or not;
2. if overusedCheckGangEnable in reclaim, Preemptive check weather job's request will exceed deserved, if true, job in that queue can not reclaim

Signed-off-by: lowang-bh <[email protected]>
  • Loading branch information
lowang-bh committed Nov 9, 2024
1 parent 8b2f918 commit f272ca2
Show file tree
Hide file tree
Showing 10 changed files with 234 additions and 30 deletions.
28 changes: 24 additions & 4 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ type Action struct {
session *framework.Session
// configured flag for error cache
enablePredicateErrorCache bool
enableGangCheckOverused bool // indicates whether consider job's min request(gang-scheduling) when check overused
}

func New() *Action {
return &Action{
enablePredicateErrorCache: true, // default to enable it
enablePredicateErrorCache: true, // default to enable it
enableGangCheckOverused: false, // default to disable it
}
}

Expand All @@ -50,6 +52,7 @@ func (alloc *Action) Initialize() {}
func (alloc *Action) parseArguments(ssn *framework.Session) {
arguments := framework.GetArgOfActionFromConf(ssn.Configurations, alloc.Name())
arguments.GetBool(&alloc.enablePredicateErrorCache, conf.EnablePredicateErrCacheKey)
arguments.GetBool(&alloc.enableGangCheckOverused, conf.EnableGangCheckOverusedKey)
}

func (alloc *Action) Execute(ssn *framework.Session) {
Expand Down Expand Up @@ -132,9 +135,11 @@ func (alloc *Action) allocateResources(queues *util.PriorityQueue, jobsMap map[a

queue := queues.Pop().(*api.QueueInfo)

if ssn.Overused(queue) {
klog.V(3).Infof("Queue <%s> is overused, ignore it.", queue.Name)
continue
if !alloc.enableGangCheckOverused {
if ssn.Overused(queue, nil) {
klog.V(3).Infof("Queue <%s> is overused, ignore it.", queue.Name)
continue
}
}

klog.V(3).Infof("Try to allocate resource to Jobs in Queue <%s>", queue.Name)
Expand All @@ -146,6 +151,21 @@ func (alloc *Action) allocateResources(queues *util.PriorityQueue, jobsMap map[a
}

job := jobs.Pop().(*api.JobInfo)

if len(job.TaskStatusIndex[api.Pending]) == 0 {
klog.V(4).Infof("Job <%v/%v> has no pending tasks, skip it.", job.Namespace, job.Name)
queues.Push(queue)
continue
}
// check if job's queue will overused when allocated resource for it
if alloc.enableGangCheckOverused {
if ssn.Overused(queue, job) {
klog.V(3).Infof("Queue <%s> will be overused if resource is allocated to job <%s/%s>, ignore it.", queue.Name, job.Namespace, job.Name)
queues.Push(queue) // should consider other jobs in this queue, so put it back
continue
}
}

if _, found = pendingTasks[job.UID]; !found {
tasks := util.NewPriorityQueue(ssn.TaskOrderFn)
for _, task := range job.TaskStatusIndex[api.Pending] {
Expand Down
48 changes: 36 additions & 12 deletions pkg/scheduler/actions/reclaim/reclaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,19 @@ import (
"k8s.io/klog/v2"

"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/conf"
"volcano.sh/volcano/pkg/scheduler/framework"
"volcano.sh/volcano/pkg/scheduler/util"
)

type Action struct{}
type Action struct {
enableGangCheckOverused bool
}

func New() *Action {
return &Action{}
return &Action{
enableGangCheckOverused: false,
}
}

func (ra *Action) Name() string {
Expand All @@ -37,10 +42,17 @@ func (ra *Action) Name() string {

func (ra *Action) Initialize() {}

func (ra *Action) parseArguments(ssn *framework.Session) {
arguments := framework.GetArgOfActionFromConf(ssn.Configurations, ra.Name())
arguments.GetBool(&ra.enableGangCheckOverused, conf.EnableGangCheckOverusedKey)
}

func (ra *Action) Execute(ssn *framework.Session) {
klog.V(5).Infof("Enter Reclaim ...")
defer klog.V(5).Infof("Leaving Reclaim ...")

ra.parseArguments(ssn)

queues := util.NewPriorityQueue(ssn.QueueOrderFn)
queueMap := map[api.QueueID]*api.QueueInfo{}

Expand Down Expand Up @@ -94,9 +106,11 @@ func (ra *Action) Execute(ssn *framework.Session) {
var task *api.TaskInfo

queue := queues.Pop().(*api.QueueInfo)
if ssn.Overused(queue) {
klog.V(3).Infof("Queue <%s> is overused, ignore it.", queue.Name)
continue
if !ra.enableGangCheckOverused {
if ssn.Overused(queue, nil) {
klog.V(3).Infof("Queue <%s> is overused, ignore it.", queue.Name)
continue
}
}

// Found "high" priority job
Expand All @@ -123,14 +137,24 @@ func (ra *Action) Execute(ssn *framework.Session) {
continue
}

if !ssn.Allocatable(queue, task) {
klog.V(3).Infof("Queue <%s> is overused when considering task <%s>, ignore it.", queue.Name, task.Name)
continue
}
if ra.enableGangCheckOverused {
if !ssn.Preemptive(queue, job) {
klog.V(3).Infof("Queue <%s> can not reclaim by preempt others when considering job <%s> , ignore it.", queue.Name, job.Name)
queues.Push(queue) // need considering the next job in queue
continue
}
} else {
if !ssn.Allocatable(queue, task) {
klog.V(3).Infof("Queue <%s> is overused when considering task <%s>, ignore it.", queue.Name, task.Name)
queues.Push(queue)
continue
}

if !ssn.Preemptive(queue, task) {
klog.V(3).Infof("Queue <%s> can not reclaim by preempt others when considering task <%s> , ignore it.", queue.Name, task.Name)
continue
if !ssn.Preemptive(queue, task) {
klog.V(3).Infof("Queue <%s> can not reclaim by preempt others when considering task <%s> , ignore it.", queue.Name, task.Name)
queues.Push(queue)
continue
}
}

if err := ssn.PrePredicateFn(task); err != nil {
Expand Down
104 changes: 104 additions & 0 deletions pkg/scheduler/actions/reclaim/reclaim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/conf"
"volcano.sh/volcano/pkg/scheduler/framework"
"volcano.sh/volcano/pkg/scheduler/plugins/capacity"
"volcano.sh/volcano/pkg/scheduler/plugins/conformance"
"volcano.sh/volcano/pkg/scheduler/plugins/gang"
"volcano.sh/volcano/pkg/scheduler/plugins/priority"
Expand Down Expand Up @@ -198,3 +199,106 @@ func TestReclaim(t *testing.T) {
})
}
}

func TestEnableGangReclaim(t *testing.T) {
req1 := api.BuildResourceList("1", "1G")
req2 := api.BuildResourceList("2", "2G")
min := api.BuildResourceList("2", "2G")
mid := api.BuildResourceList("3", "3G")
max := api.BuildResourceList("4", "4G") // 2*req2
common := uthelper.TestCommonStruct{
Plugins: map[string]framework.PluginBuilder{
conformance.PluginName: conformance.New,
gang.PluginName: gang.New,
capacity.PluginName: capacity.New,
},
PodGroups: []*schedulingv1beta1.PodGroup{
util.BuildPodGroupWithMinResources("pg0", "c1", "q1", 0, nil, nil, schedulingv1beta1.PodGroupRunning),
util.BuildPodGroupWithMinResources("pg1", "c1", "q2", 1, nil, req1, schedulingv1beta1.PodGroupRunning),
util.BuildPodGroupWithMinResources("pg2", "c1", "q2", 2, nil, max, schedulingv1beta1.PodGroupInqueue),
},
Pods: []*v1.Pod{
util.BuildPod("c1", "preemptee1", "n1", v1.PodRunning, req2, "pg0", map[string]string{schedulingv1beta1.PodPreemptable: "true"}, make(map[string]string)),
util.BuildPod("c1", "preemptee2", "n1", v1.PodRunning, req1, "pg1", map[string]string{schedulingv1beta1.PodPreemptable: "true"}, make(map[string]string)),
util.BuildPod("c1", "preemptor1", "", v1.PodPending, req2, "pg2", nil, nil),
util.BuildPod("c1", "preemptor2", "", v1.PodPending, req2, "pg2", nil, nil),
},
Nodes: []*v1.Node{
util.BuildNode("n1", api.BuildResourceList("4", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), make(map[string]string)),
},
Queues: []*schedulingv1beta1.Queue{
util.BuildQueueWithResourcesQuantity("q1", req1, min),
util.BuildQueueWithResourcesQuantity("q2", mid, max),
},
}
tests := []struct {
enableGang bool
uthelper.TestCommonStruct
}{
{
enableGang: false,
TestCommonStruct: uthelper.TestCommonStruct{
Name: "when enableGangCheckOverused=false, can reclaim one pod but can not meet gang",
Plugins: common.Plugins,
PodGroups: common.PodGroups,
Pods: common.Pods,
Nodes: common.Nodes,
Queues: common.Queues,
ExpectEvictNum: 1,
ExpectEvicted: []string{"c1/preemptee1"},
},
},
{
enableGang: true,
TestCommonStruct: uthelper.TestCommonStruct{
Name: "when enableGangCheckOverused=true, can not reclaim",
Plugins: common.Plugins,
PodGroups: common.PodGroups,
Pods: common.Pods,
Nodes: common.Nodes,
Queues: common.Queues,
ExpectEvictNum: 0,
ExpectEvicted: []string{},
},
},
}

reclaim := New()
trueValue := true
tiers := []conf.Tier{
{
Plugins: []conf.PluginOption{
{
Name: conformance.PluginName,
EnabledReclaimable: &trueValue,
},
{
Name: gang.PluginName,
EnabledReclaimable: &trueValue,
},
{
Name: capacity.PluginName,
EnabledReclaimable: &trueValue,
EnabledOverused: &trueValue,
EnabledAllocatable: &trueValue,
EnablePreemptive: &trueValue,
},
{
Name: priority.PluginName,
EnabledJobOrder: &trueValue,
EnabledTaskOrder: &trueValue,
},
},
},
}
for i, test := range tests {
t.Run(test.Name, func(t *testing.T) {
test.RegisterSession(tiers, []conf.Configuration{{Name: reclaim.Name(), Arguments: map[string]interface{}{conf.EnableGangCheckOverusedKey: test.enableGang}}})
defer test.Close()
test.Run([]framework.Action{reclaim})
if err := test.CheckAll(i); err != nil {
t.Fatal(err)
}
})
}
}
2 changes: 2 additions & 0 deletions pkg/scheduler/conf/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@ package conf
const (
// EnablePredicateErrCacheKey is the key whether predicate error cache is enabled
EnablePredicateErrCacheKey = "predicateErrorCacheEnable"
// EnableGangCheckOverusedKey is the key whether check the job's min request when check queue overused
EnableGangCheckOverusedKey = "overusedCheckGangEnable"
)
4 changes: 2 additions & 2 deletions pkg/scheduler/framework/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ type Session struct {
nodeReduceFns map[string]api.NodeReduceFn
preemptableFns map[string]api.EvictableFn
reclaimableFns map[string]api.EvictableFn
overusedFns map[string]api.ValidateFn
overusedFns map[string]api.ValidateWithCandidateFn
// preemptiveFns means whether current queue can reclaim from other queue,
// while reclaimableFns means whether current queue's resources can be reclaimed.
preemptiveFns map[string]api.ValidateWithCandidateFn
Expand Down Expand Up @@ -143,7 +143,7 @@ func openSession(cache cache.Cache) *Session {
nodeReduceFns: map[string]api.NodeReduceFn{},
preemptableFns: map[string]api.EvictableFn{},
reclaimableFns: map[string]api.EvictableFn{},
overusedFns: map[string]api.ValidateFn{},
overusedFns: map[string]api.ValidateWithCandidateFn{},
preemptiveFns: map[string]api.ValidateWithCandidateFn{},
allocatableFns: map[string]api.AllocatableFn{},
jobReadyFns: map[string]api.ValidateFn{},
Expand Down
8 changes: 4 additions & 4 deletions pkg/scheduler/framework/session_plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (ssn *Session) AddNodeReduceFn(name string, pf api.NodeReduceFn) {
}

// AddOverusedFn add overused function
func (ssn *Session) AddOverusedFn(name string, fn api.ValidateFn) {
func (ssn *Session) AddOverusedFn(name string, fn api.ValidateWithCandidateFn) {
ssn.overusedFns[name] = fn
}

Expand Down Expand Up @@ -255,7 +255,7 @@ func (ssn *Session) Preemptable(preemptor *api.TaskInfo, preemptees []*api.TaskI
}

// Overused invoke overused function of the plugins
func (ssn *Session) Overused(queue *api.QueueInfo) bool {
func (ssn *Session) Overused(queue *api.QueueInfo, req interface{}) bool {
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
if !isEnabled(plugin.EnabledOverused) {
Expand All @@ -265,7 +265,7 @@ func (ssn *Session) Overused(queue *api.QueueInfo) bool {
if !found {
continue
}
if of(queue) {
if of(queue, req) {
return true
}
}
Expand All @@ -275,7 +275,7 @@ func (ssn *Session) Overused(queue *api.QueueInfo) bool {
}

// Preemptive invoke can preemptive function of the plugins
func (ssn *Session) Preemptive(queue *api.QueueInfo, candidate *api.TaskInfo) bool {
func (ssn *Session) Preemptive(queue *api.QueueInfo, candidate interface{}) bool {
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
of, found := ssn.preemptiveFns[plugin.Name]
Expand Down
53 changes: 49 additions & 4 deletions pkg/scheduler/plugins/capacity/capacity.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,19 +142,64 @@ func (cp *capacityPlugin) OnSessionOpen(ssn *framework.Session) {
return victims, util.Permit
})

ssn.AddOverusedFn(cp.Name(), func(obj interface{}, req interface{}) bool {
queue := obj.(*api.QueueInfo)
attr := cp.queueOpts[queue.UID]
var overused, preemptable bool
var allocated *api.Resource
var candidateName string

// overused = false default, this will keep original: capacity has no overused check and return false
if req != nil {
allocated = attr.allocated.Clone()
var resReq *api.Resource
if job, ok := req.(*api.JobInfo); ok {
preemptable = job.Preemptable
resReq = job.GetMinResources()
allocated.Add(resReq) // future used resource when job is allocated
candidateName = job.Name
} else if task, ok := req.(*api.TaskInfo); ok {
preemptable = task.Preemptable
resReq = task.Resreq
allocated.Add(resReq)
candidateName = task.Name
}
if preemptable { // preemptable jobs/tasks can use up to capacity of queue
overused = !allocated.LessEqualWithDimension(attr.capability, resReq)
} else {
overused = !allocated.LessEqualWithDimension(attr.deserved, resReq)
}
}
metrics.UpdateQueueOverused(attr.name, overused)
if overused {
klog.V(3).Infof("Queue <%v>: deserved <%v>, future allocated <%v>, share <%v>, capability <%v>, candidate=%s, preemptable=%v",
queue.Name, attr.deserved, allocated, attr.share, attr.capability, candidateName, preemptable)
}
return overused
})

ssn.AddPreemptiveFn(cp.Name(), func(obj interface{}, candidate interface{}) bool {
if !readyToSchedule {
klog.V(3).Infof("Capacity plugin failed to check queue's hierarchical structure!")
return false
}

queue := obj.(*api.QueueInfo)
task := candidate.(*api.TaskInfo)
attr := cp.queueOpts[queue.UID]

futureUsed := attr.allocated.Clone().Add(task.Resreq)
overused := !futureUsed.LessEqualWithDimension(attr.deserved, task.Resreq)
metrics.UpdateQueueOverused(attr.name, overused)
var overused bool // default preemptive is true
if task, ok1 := candidate.(*api.TaskInfo); ok1 {
futureUsed := attr.allocated.Clone().Add(task.Resreq)
overused = !futureUsed.LessEqualWithDimension(attr.deserved, task.Resreq)
} else if job, ok2 := candidate.(*api.JobInfo); ok2 {
request := job.GetMinResources()
futureUsed := attr.allocated.Clone().Add(request)
// no matter preemtable or not, if used will exceed deserverd, cannot reclaim
preeptive := futureUsed.LessEqualWithDimension(attr.deserved, request)
overused = !preeptive
}

//metrics.UpdateQueueOverused(attr.name, overused)
if overused {
klog.V(3).Infof("Queue <%v> can not reclaim, deserved <%v>, allocated <%v>, share <%v>",
queue.Name, attr.deserved, attr.allocated, attr.share)
Expand Down
Loading

0 comments on commit f272ca2

Please sign in to comment.