Skip to content

Commit

Permalink
[YUNIKORN-1995] Preemption CheckPreemptionQueueGuarantee need to comp…
Browse files Browse the repository at this point in the history
…are aggregated Victims with ask resource
  • Loading branch information
zhuqi-lucas committed Sep 20, 2023
1 parent 9ad817f commit 72be246
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 2 deletions.
10 changes: 9 additions & 1 deletion pkg/scheduler/objects/preemption.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,21 @@ func (p *Preemptor) checkPreemptionQueueGuarantees() bool {
return false
}

// If current queue after adding the ask is already at or above guaranteed, we can't preempt
currentQueue.AddAllocation(p.ask.GetAllocatedResource())
if currentQueue.IsAtOrAboveGuaranteedResource() {
return false
}

aggregatedVictims := resources.NewResource()
// remove each allocation in turn, validating that at some point we free enough resources to allow this ask to fit
for _, snapshot := range queues {
for _, alloc := range snapshot.PotentialVictims {
snapshot.RemoveAllocation(alloc.GetAllocatedResource())
if currentQueue.IsWithinGuaranteedResource() {
if snapshot.IsAtOrAboveGuaranteedResource() {
aggregatedVictims.AddTo(alloc.allocatedResource)
}
if aggregatedVictims.FitInMaxUndef(p.ask.GetAllocatedResource()) {
return true
}
}
Expand Down
41 changes: 40 additions & 1 deletion pkg/scheduler/objects/preemption_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func TestCheckPreemptionQueueGuarantees(t *testing.T) {
app2 := newApplication(appID2, "default", "root.parent.child2")
app2.SetQueue(childQ2)
childQ2.applications[appID2] = app2
ask3 := newAllocationAsk("alloc3", appID2, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
ask3 := newAllocationAsk("alloc3", appID2, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 4}))
assert.NilError(t, app2.AddAllocationAsk(ask3))
childQ2.incPendingResource(ask3.GetAllocatedResource())
headRoom := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
Expand All @@ -128,6 +128,45 @@ func TestCheckPreemptionQueueGuarantees(t *testing.T) {
assert.Assert(t, !preemptor.checkPreemptionQueueGuarantees(), "queue guarantees did not fail")
}

func TestCheckPreemptionQueueGuaranteesWithAggregatedVictims(t *testing.T) {
node := newNode("node1", map[string]resources.Quantity{"first": 20})
iterator := getNodeIteratorFn(node)
rootQ, err := createRootQueue(map[string]string{"first": "20"})
assert.NilError(t, err)
parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true, map[string]string{"first": "20"}, map[string]string{"first": "10"})
assert.NilError(t, err)
childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false, map[string]string{"first": "10"}, map[string]string{"first": "2"})
assert.NilError(t, err)
childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false, map[string]string{"first": "10"}, map[string]string{"first": "8"})
assert.NilError(t, err)
app1 := newApplication(appID1, "default", "root.parent.child1")
app1.SetQueue(childQ1)
childQ1.applications[appID1] = app1
ask1 := newAllocationAsk("alloc1", appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 3}))
assert.NilError(t, app1.AddAllocationAsk(ask1))
ask2 := newAllocationAsk("alloc2", appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 3}))
assert.NilError(t, app1.AddAllocationAsk(ask2))
app1.AddAllocation(NewAllocation("alloc1", "node1", ask1))
app1.AddAllocation(NewAllocation("alloc2", "node1", ask2))
assert.NilError(t, childQ1.IncAllocatedResource(ask1.GetAllocatedResource(), false))
assert.NilError(t, childQ1.IncAllocatedResource(ask2.GetAllocatedResource(), false))
app2 := newApplication(appID2, "default", "root.parent.child2")
app2.SetQueue(childQ2)
childQ2.applications[appID2] = app2
ask3 := newAllocationAsk("alloc3", appID2, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 4}))
assert.NilError(t, app2.AddAllocationAsk(ask3))
childQ2.incPendingResource(ask3.GetAllocatedResource())
headRoom := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
preemptor := NewPreemptor(app2, headRoom, 30*time.Second, ask3, iterator(), false)

// The aggregated resource is smaller than the ask resource, so it will fail
assert.Assert(t, !preemptor.checkPreemptionQueueGuarantees(), "queue guarantees did not fail")

// The aggregated resource is larger than the ask resource, so it will succeed
ask3.allocatedResource = resources.NewResourceFromMap(map[string]resources.Quantity{"first": 2})
assert.Assert(t, preemptor.checkPreemptionQueueGuarantees(), "queue guarantees fail")
}

func TestCheckPreemptionQueueGuaranteesWithNoGuaranteedResources(t *testing.T) {
var tests = []struct {
testName string
Expand Down

0 comments on commit 72be246

Please sign in to comment.