Skip to content

Commit

Permalink
[YUNIKORN-2980] DaemonSet preemption: don't flood the logs if victim …
Browse files Browse the repository at this point in the history
…selection fails
  • Loading branch information
pbacsko committed Nov 20, 2024
1 parent 6ef347b commit eb96632
Show file tree
Hide file tree
Showing 6 changed files with 239 additions and 13 deletions.
1 change: 1 addition & 0 deletions pkg/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ const PreemptionPreconditionsFailed = "Preemption preconditions failed"
const PreemptionDoesNotGuarantee = "Preemption queue guarantees check failed"
const PreemptionShortfall = "Preemption helped but short of resources"
const PreemptionDoesNotHelp = "Preemption does not help"
const NoVictimForRequiredNode = "RequiredNode preemption failed, no victim found"
5 changes: 5 additions & 0 deletions pkg/scheduler/objects/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,11 @@ func (a *Allocation) SendPredicatesFailedEvent(predicateErrors map[string]int) {
a.askEvents.SendPredicatesFailed(a.allocationKey, a.applicationID, predicateErrors, a.GetAllocatedResource())
}

// SendRequiredNodePreemptionFailedEvent updates the event system with required node preemption failed event.
func (a *Allocation) SendRequiredNodePreemptionFailedEvent(node string) {
a.askEvents.SendRequiredNodePreemptionFailed(a.allocationKey, a.applicationID, node, a.GetAllocatedResource())
}

// GetAllocationLog returns a list of log entries corresponding to allocation preconditions not being met.
func (a *Allocation) GetAllocationLog() []*AllocationLogEntry {
a.RLock()
Expand Down
9 changes: 3 additions & 6 deletions pkg/scheduler/objects/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -1409,9 +1409,6 @@ func (sa *Application) tryPreemption(headRoom *resources.Resource, preemptionDel
}

func (sa *Application) tryRequiredNodePreemption(reserve *reservation, ask *Allocation) bool {
log.Log(log.SchedApplication).Info("Triggering preemption process for daemon set ask",
zap.String("ds allocation key", ask.GetAllocationKey()))

// try preemption and see if we can free up resource
preemptor := NewRequiredNodePreemptor(reserve.node, ask)
preemptor.filterAllocations()
Expand All @@ -1432,11 +1429,11 @@ func (sa *Application) tryRequiredNodePreemption(reserve *reservation, ask *Allo
ask.MarkTriggeredPreemption()
sa.notifyRMAllocationReleased(victims, si.TerminationType_PREEMPTED_BY_SCHEDULER,
"preempting allocations to free up resources to run daemon set ask: "+ask.GetAllocationKey())

return true
}
log.Log(log.SchedApplication).Warn("Problem in finding the victims for preempting resources to meet required ask requirements",
zap.String("ds allocation key", ask.GetAllocationKey()),
zap.String("node id", reserve.nodeID))
ask.LogAllocationFailure(common.NoVictimForRequiredNode, true)
ask.SendRequiredNodePreemptionFailedEvent(reserve.node.NodeID)
return false
}

Expand Down
175 changes: 175 additions & 0 deletions pkg/scheduler/objects/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package objects
import (
"fmt"
"math"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -2916,6 +2917,172 @@ func TestPredicateFailedEvents(t *testing.T) {
assert.Equal(t, "Unschedulable request 'alloc-0': fake predicate plugin failed (2x); ", event.Message)
}

func TestRequiredNodePreemption(t *testing.T) {
// tests successful RequiredNode (DaemonSet) preemption
app := newApplication(appID0, "default", "root.default")
var releaseEvents []*rmevent.RMReleaseAllocationEvent
app.rmEventHandler = &mockAppEventHandler{
callback: func(ev interface{}) {
if rmEvent, ok := ev.(*rmevent.RMReleaseAllocationEvent); ok {
releaseEvents = append(releaseEvents, rmEvent)
go func() {
rmEvent.Channel <- &rmevent.Result{
Succeeded: true,
}
}()
}
},
}
node := newNode(nodeID1, map[string]resources.Quantity{"first": 20})
node.nodeEvents = schedEvt.NewNodeEvents(mock.NewEventSystemDisabled())
iterator := getNodeIteratorFn(node)
getNode := func(nodeID string) *Node {
return node
}

// set queue
rootQ, err := createRootQueue(map[string]string{"first": "20"})
assert.NilError(t, err)
childQ, err := createManagedQueue(rootQ, "default", false, map[string]string{"first": "20"})
assert.NilError(t, err)
app.SetQueue(childQ)

// add an ask
mockEvents := mock.NewEventSystem()
askRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 15})
ask1 := newAllocationAsk("ask-1", "app-1", askRes)
ask1.askEvents = schedEvt.NewAskEvents(mockEvents)
err = app.AddAllocationAsk(ask1)
assert.NilError(t, err, "could not add ask-1")
preemptionAttemptsRemaining := 0

// allocate ask
headRoom := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50})
result := app.tryAllocate(headRoom, true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode)
assert.Equal(t, result.ResultType, Allocated, "could not allocate ask-1")
assert.Equal(t, result.Request.allocationKey, "ask-1", "unexpected allocation key")

// add ask2 with required node
ask2 := newAllocationAsk("ask-2", "app-1", askRes)
ask2.askEvents = schedEvt.NewAskEvents(mockEvents)
ask2.requiredNode = nodeID1
err = app.AddAllocationAsk(ask2)
assert.NilError(t, err, "could not add ask-2")

// try to allocate ask2 with node being full - expect a reservation
result = app.tryAllocate(headRoom, true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode)
assert.Equal(t, result.ResultType, Reserved, "allocation result is not reserved")
assert.Equal(t, result.Request.allocationKey, "ask-2", "unexpected allocation key")
err = app.Reserve(node, ask2)
assert.NilError(t, err, "reservation failed")

// preemption
assert.Assert(t, app.tryReservedAllocate(headRoom, iterator) == nil, "unexpected result from reserved allocation")
assert.Assert(t, ask1.IsPreempted(), "ask1 has not been preempted")
assert.Assert(t, ask2.HasTriggeredPreemption(), "ask2 has not triggered preemption")
assert.Equal(t, 1, len(releaseEvents), "unexpected number of release events")
assert.Equal(t, 1, len(releaseEvents[0].ReleasedAllocations), "unexpected number of release allocations")
assert.Equal(t, "ask-1", releaseEvents[0].ReleasedAllocations[0].AllocationKey, "allocation key")

// 2nd attempt - no preemption this time
releaseEvents = nil
assert.Assert(t, app.tryReservedAllocate(headRoom, iterator) == nil, "unexpected result from reserved allocation")
assert.Assert(t, releaseEvents == nil, "unexpected release event")

// check for preemption related events
for _, event := range mockEvents.Events {
assert.Assert(t, !strings.Contains(strings.ToLower(event.Message), "preemption"), "received a preemption related event")
}
}

func TestRequiredNodePreemptionFailed(t *testing.T) {
// tests RequiredNode (DaemonSet) preemption where the victim pod has a high priority, hence preemption is not possible
app := newApplication(appID0, "default", "root.default")
var releaseEvents []*rmevent.RMReleaseAllocationEvent
app.rmEventHandler = &mockAppEventHandler{
callback: func(ev interface{}) {
if rmEvent, ok := ev.(*rmevent.RMReleaseAllocationEvent); ok {
releaseEvents = append(releaseEvents, rmEvent)
go func() {
rmEvent.Channel <- &rmevent.Result{
Succeeded: true,
}
}()
}
},
}
node := newNode(nodeID1, map[string]resources.Quantity{"first": 20})
node.nodeEvents = schedEvt.NewNodeEvents(mock.NewEventSystemDisabled())
iterator := getNodeIteratorFn(node)
getNode := func(nodeID string) *Node {
return node
}

// set queue
rootQ, err := createRootQueue(map[string]string{"first": "20"})
assert.NilError(t, err)
childQ, err := createManagedQueue(rootQ, "default", false, map[string]string{"first": "20"})
assert.NilError(t, err)
app.SetQueue(childQ)

// add an ask with high priority
mockEvents := mock.NewEventSystem()
askRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 15})
ask1 := newAllocationAsk("ask-1", "app-1", askRes)
ask1.askEvents = schedEvt.NewAskEvents(mockEvents)
ask1.priority = 1000
err = app.AddAllocationAsk(ask1)
assert.NilError(t, err, "could not add ask-1")
preemptionAttemptsRemaining := 0

// allocate ask
headRoom := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50})
result := app.tryAllocate(headRoom, true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode)
assert.Equal(t, result.ResultType, Allocated, "could not allocate ask-1")
assert.Equal(t, result.Request.allocationKey, "ask-1", "unexpected allocation key")

// add ask2 with required node
ask2 := newAllocationAsk("ask-2", "app-1", askRes)
ask2.askEvents = schedEvt.NewAskEvents(mockEvents)
ask2.requiredNode = nodeID1
err = app.AddAllocationAsk(ask2)
assert.NilError(t, err, "could not add ask-2")

// try to allocate ask2 with node being full - expect a reservation
result = app.tryAllocate(headRoom, true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode)
assert.Equal(t, result.ResultType, Reserved, "allocation result is not reserved")
assert.Equal(t, result.Request.allocationKey, "ask-2", "unexpected allocation key")
err = app.Reserve(node, ask2)
assert.NilError(t, err, "reservation failed")

// try preemption - should not succeed
assert.Assert(t, app.tryReservedAllocate(headRoom, iterator) == nil, "unexpected result from reserved allocation")
assert.Assert(t, !ask1.IsPreempted(), "unexpected preemption of ask1")
assert.Assert(t, !ask2.HasTriggeredPreemption(), "unexpected preemption triggered from ask2")
assert.Equal(t, 0, len(releaseEvents), "unexpected number of release events")
// check for events
noEvents := 0
var requestEvt *si.EventRecord
for _, event := range mockEvents.Events {
if event.Type == si.EventRecord_REQUEST && strings.Contains(strings.ToLower(event.Message), "preemption") {
noEvents++
requestEvt = event
}
}
assert.Equal(t, 1, noEvents, "unexpected number of REQUEST events")
assert.Equal(t, "Unschedulable request 'ask-2' with required node 'node-1', no preemption victim found", requestEvt.Message)
assert.Equal(t, 1, len(ask2.allocLog), "unexpected number of entries in the allocation log")
assert.Equal(t, int32(1), ask2.allocLog[common.NoVictimForRequiredNode].Count, "incorrect number of entry count")
assert.Equal(t, common.NoVictimForRequiredNode, ask2.allocLog[common.NoVictimForRequiredNode].Message, "unexpected log message")

// check counting & event throttling
assert.Assert(t, app.tryReservedAllocate(headRoom, iterator) == nil, "unexpected result from reserved allocation")
assert.Assert(t, app.tryReservedAllocate(headRoom, iterator) == nil, "unexpected result from reserved allocation")
assert.Assert(t, app.tryReservedAllocate(headRoom, iterator) == nil, "unexpected result from reserved allocation")
assert.Equal(t, 1, noEvents, "unexpected number of REQUEST events")
assert.Equal(t, int32(4), ask2.allocLog[common.NoVictimForRequiredNode].Count, "incorrect number of entry count")
}

type testIterator struct{}

func (testIterator) ForEachNode(fn func(*Node) bool) {
Expand Down Expand Up @@ -3035,3 +3202,11 @@ func TestGetUint64Tag(t *testing.T) {
})
}
}

type mockAppEventHandler struct {
callback func(ev interface{})
}

func (m mockAppEventHandler) HandleEvent(ev interface{}) {
m.callback(ev)
}
22 changes: 17 additions & 5 deletions pkg/scheduler/objects/events/ask_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ import (

// AskEvents Request-specific events. These events are of REQUEST type, so they are eventually sent to the respective pods in K8s.
type AskEvents struct {
eventSystem events.EventSystem
limiter *rate.Limiter
eventSystem events.EventSystem
predicateLimiter *rate.Limiter
reqNodeLimiter *rate.Limiter
}

func (ae *AskEvents) SendRequestExceedsQueueHeadroom(allocKey, appID string, headroom, allocatedResource *resources.Resource, queuePath string) {
Expand Down Expand Up @@ -73,7 +74,7 @@ func (ae *AskEvents) SendRequestFitsInUserQuota(allocKey, appID string, allocate
}

func (ae *AskEvents) SendPredicatesFailed(allocKey, appID string, predicateErrors map[string]int, allocatedResource *resources.Resource) {
if !ae.eventSystem.IsEventTrackingEnabled() || !ae.limiter.Allow() {
if !ae.eventSystem.IsEventTrackingEnabled() || !ae.predicateLimiter.Allow() {
return
}

Expand All @@ -94,13 +95,24 @@ func (ae *AskEvents) SendPredicatesFailed(allocKey, appID string, predicateError
ae.eventSystem.AddEvent(event)
}

func (ae *AskEvents) SendRequiredNodePreemptionFailed(allocKey, appID, node string, allocatedResource *resources.Resource) {
if !ae.eventSystem.IsEventTrackingEnabled() || !ae.reqNodeLimiter.Allow() {
return
}

message := fmt.Sprintf("Unschedulable request '%s' with required node '%s', no preemption victim found", allocKey, node)
event := events.CreateRequestEventRecord(allocKey, appID, message, allocatedResource)
ae.eventSystem.AddEvent(event)
}

func NewAskEvents(evt events.EventSystem) *AskEvents {
return newAskEventsWithRate(evt, 15*time.Second, 1)
}

func newAskEventsWithRate(evt events.EventSystem, interval time.Duration, burst int) *AskEvents {
return &AskEvents{
eventSystem: evt,
limiter: rate.NewLimiter(rate.Every(interval), burst),
eventSystem: evt,
predicateLimiter: rate.NewLimiter(rate.Every(interval), burst),
reqNodeLimiter: rate.NewLimiter(rate.Every(interval), burst),
}
}
40 changes: 38 additions & 2 deletions pkg/scheduler/objects/events/ask_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,48 @@ func TestPredicateFailedEvents(t *testing.T) {
assert.Equal(t, 1, len(eventSystem.Events))
event := eventSystem.Events[0]
assert.Equal(t, "Unschedulable request 'alloc-0': error#0 (2x); error#1 (123x); error#2 (44x); ", event.Message)
assert.Equal(t, si.EventRecord_REQUEST, event.Type)
assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
assert.Equal(t, "alloc-0", event.ObjectID)
assert.Equal(t, "app-0", event.ReferenceID)

eventSystem.Reset()
// wait a bit, a new event is expected
time.Sleep(100 * time.Millisecond)
events.SendPredicatesFailed("alloc-0", "app-0", errors, resource)
events.SendPredicatesFailed("alloc-1", "app-0", errors, resource)
assert.Equal(t, 1, len(eventSystem.Events))
event = eventSystem.Events[0]
assert.Equal(t, "Unschedulable request 'alloc-0': error#0 (2x); error#1 (123x); error#2 (44x); ", event.Message)
assert.Equal(t, "Unschedulable request 'alloc-1': error#0 (2x); error#1 (123x); error#2 (44x); ", event.Message)
}

func TestRequiredNodePreemptionFailedEvents(t *testing.T) {
resource := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
eventSystem := mock.NewEventSystemDisabled()
events := NewAskEvents(eventSystem)
events.SendRequiredNodePreemptionFailed("alloc-0", "app-0", nodeID1, resource)
assert.Equal(t, 0, len(eventSystem.Events))

eventSystem = mock.NewEventSystem()
events = newAskEventsWithRate(eventSystem, 50*time.Millisecond, 1)
// only the first event is expected to be emitted due to rate limiting
for i := 0; i < 200; i++ {
events.SendRequiredNodePreemptionFailed("alloc-0", "app-0", nodeID1, resource)
}
assert.Equal(t, 1, len(eventSystem.Events))
event := eventSystem.Events[0]
assert.Equal(t, "Unschedulable request 'alloc-0' with required node 'node-1', no preemption victim found", event.Message)
assert.Equal(t, si.EventRecord_REQUEST, event.Type)
assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
assert.Equal(t, "alloc-0", event.ObjectID)
assert.Equal(t, "app-0", event.ReferenceID)
protoRes := resources.NewResourceFromProto(event.Resource)
assert.DeepEqual(t, resource, protoRes)

eventSystem.Reset()
// wait a bit, a new event is expected
time.Sleep(100 * time.Millisecond)
events.SendRequiredNodePreemptionFailed("alloc-1", "app-0", nodeID1, resource)
assert.Equal(t, 1, len(eventSystem.Events))
event = eventSystem.Events[0]
assert.Equal(t, "Unschedulable request 'alloc-1' with required node 'node-1', no preemption victim found", event.Message)
}

0 comments on commit eb96632

Please sign in to comment.