Skip to content

Commit

Permalink
Addressed review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
manirajv06 committed Sep 5, 2024
1 parent 02c28ed commit f996239
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 286 deletions.
5 changes: 5 additions & 0 deletions pkg/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,9 @@ import "errors"
var (
// InvalidQueueName returned when queue name is invalid
InvalidQueueName = errors.New("invalid queue name, max 64 characters consisting of alphanumeric characters and '-', '_', '#', '@', '/', ':' allowed")

PreemptionPreconditionsFailed = errors.New("PreemptionPreconditionsFailed")
PreemptionDoesNotGuarantee = errors.New("PreemptionQueueGuaranteesCheckFailed")
PreemptionShortfall = errors.New("PreemptionHelpedButShortfallOfResources")
PreemptionDoesNotHelp = errors.New("PreemptionDoesNotHelp")
)
36 changes: 0 additions & 36 deletions pkg/scheduler/objects/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,39 +573,3 @@ func (a *Allocation) setUserQuotaCheckPassed() {
a.askEvents.SendRequestFitsInUserQuota(a.allocationKey, a.applicationID, a.allocatedResource)
}
}

func (a *Allocation) setPreemptionPreConditionsCheckPassed() {
a.Lock()
defer a.Unlock()
if a.preemptionPreConditionsCheckFailed {
a.preemptionPreConditionsCheckFailed = false
a.askEvents.SendPreemptionPreConditionsCheckPassed(a.allocationKey, a.applicationID, a.allocatedResource)
}
}

func (a *Allocation) setPreemptionPreConditionsCheckFailed() {
a.Lock()
defer a.Unlock()
if !a.preemptionPreConditionsCheckFailed {
a.preemptionPreConditionsCheckFailed = true
a.askEvents.SendPreemptionPreConditionsCheckFailed(a.allocationKey, a.applicationID, a.allocatedResource)
}
}

func (a *Allocation) setPreemptionQueueGuaranteesCheckPassed() {
a.Lock()
defer a.Unlock()
if a.preemptionQueueGuaranteesCheckFailed {
a.preemptionQueueGuaranteesCheckFailed = false
a.askEvents.SendPreemptionQueueGuaranteesCheckPassed(a.allocationKey, a.applicationID, a.allocatedResource)
}
}

func (a *Allocation) setPreemptionQueueGuaranteesCheckFailed() {
a.Lock()
defer a.Unlock()
if !a.preemptionQueueGuaranteesCheckFailed {
a.preemptionQueueGuaranteesCheckFailed = true
a.askEvents.SendPreemptionQueueGuaranteesCheckFailed(a.allocationKey, a.applicationID, a.allocatedResource)
}
}
15 changes: 5 additions & 10 deletions pkg/scheduler/objects/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,8 @@ const (
Soft string = "Soft"
Hard string = "Hard"

NotEnoughUserQuota = "Not enough user quota"
NotEnoughQueueQuota = "Not enough queue quota"
PreemptionDoesNotHelp = "Preemption does not help"
NotEnoughUserQuota = "Not enough user quota"
NotEnoughQueueQuota = "Not enough queue quota"
)

type PlaceholderData struct {
Expand Down Expand Up @@ -1057,7 +1056,7 @@ func (sa *Application) tryAllocate(headRoom *resources.Resource, allowPreemption
// preemption occurred, and possibly reservation
return result
}
request.LogAllocationFailure(PreemptionDoesNotHelp, true)
request.LogAllocationFailure(common.PreemptionDoesNotHelp.Error(), true)
}
}
request.LogAllocationFailure(NotEnoughQueueQuota, true) // error message MUST be constant!
Expand Down Expand Up @@ -1124,7 +1123,7 @@ func (sa *Application) tryAllocate(headRoom *resources.Resource, allowPreemption
return result
}
}
request.LogAllocationFailure(PreemptionDoesNotHelp, true)
request.LogAllocationFailure(common.PreemptionDoesNotHelp.Error(), true)

Check warning on line 1126 in pkg/scheduler/objects/application.go

View check run for this annotation

Codecov / codecov/patch

pkg/scheduler/objects/application.go#L1126

Added line #L1126 was not covered by tests
}
}
}
Expand Down Expand Up @@ -1400,13 +1399,9 @@ func (sa *Application) tryPreemption(headRoom *resources.Resource, preemptionDel

// validate prerequisites for preemption of an ask and mark ask for preemption if successful
if !preemptor.CheckPreconditions() {
ask.setPreemptionPreConditionsCheckFailed()
log.Log(log.SchedApplication).Debug("Preemption Preconditions check failed",
zap.String("applicationID", sa.ApplicationID),
zap.String("ask", ask.GetAllocationKey()))
ask.LogAllocationFailure(common.PreemptionPreconditionsFailed.Error(), true)
return nil, false
}
ask.setPreemptionPreConditionsCheckPassed()

// track time spent trying preemption
tryPreemptionStart := time.Now()
Expand Down
36 changes: 0 additions & 36 deletions pkg/scheduler/objects/events/ask_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,42 +94,6 @@ func (ae *AskEvents) SendPredicatesFailed(allocKey, appID string, predicateError
ae.eventSystem.AddEvent(event)
}

func (ae *AskEvents) SendPreemptionPreConditionsCheckPassed(allocKey, appID string, allocatedResource *resources.Resource) {
if !ae.eventSystem.IsEventTrackingEnabled() {
return
}
message := fmt.Sprintf("Preemption pre conditions check passed for ask %s belongs to app %s", allocKey, appID)
event := events.CreateRequestEventRecord(allocKey, appID, message, allocatedResource)
ae.eventSystem.AddEvent(event)
}

func (ae *AskEvents) SendPreemptionPreConditionsCheckFailed(allocKey, appID string, allocatedResource *resources.Resource) {
if !ae.eventSystem.IsEventTrackingEnabled() {
return
}
message := fmt.Sprintf("Preemption pre conditions check failed for ask %s belongs to app %s", allocKey, appID)
event := events.CreateRequestEventRecord(allocKey, appID, message, allocatedResource)
ae.eventSystem.AddEvent(event)
}

func (ae *AskEvents) SendPreemptionQueueGuaranteesCheckPassed(allocKey, appID string, allocatedResource *resources.Resource) {
if !ae.eventSystem.IsEventTrackingEnabled() {
return
}
message := fmt.Sprintf("Preemption queue guarantees check passed for ask %s belongs to app %s", allocKey, appID)
event := events.CreateRequestEventRecord(allocKey, appID, message, allocatedResource)
ae.eventSystem.AddEvent(event)
}

func (ae *AskEvents) SendPreemptionQueueGuaranteesCheckFailed(allocKey, appID string, allocatedResource *resources.Resource) {
if !ae.eventSystem.IsEventTrackingEnabled() {
return
}
message := fmt.Sprintf("Preemption queue guarantees check failed for ask %s belongs to app %s", allocKey, appID)
event := events.CreateRequestEventRecord(allocKey, appID, message, allocatedResource)
ae.eventSystem.AddEvent(event)
}

func NewAskEvents(evt events.EventSystem) *AskEvents {
return newAskEventsWithRate(evt, 15*time.Second, 1)
}
Expand Down
58 changes: 0 additions & 58 deletions pkg/scheduler/objects/events/ask_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,64 +112,6 @@ func TestRequestFitsInUserQuotaEvent(t *testing.T) {
assert.Equal(t, "Request 'alloc-0' fits in the available user quota", event.Message)
}

func TestPreemptionPreConditionsCheckEvents(t *testing.T) {
eventSystem := mock.NewEventSystemDisabled()
events := NewAskEvents(eventSystem)
events.SendPreemptionPreConditionsCheckFailed(allocKey, appID, requestResource)
assert.Equal(t, 0, len(eventSystem.Events))

eventSystem = mock.NewEventSystem()
events = NewAskEvents(eventSystem)
events.SendPreemptionPreConditionsCheckFailed(allocKey, appID, requestResource)
assert.Equal(t, 1, len(eventSystem.Events))
event := eventSystem.Events[0]
assert.Equal(t, "alloc-0", event.ObjectID)
assert.Equal(t, appID, event.ReferenceID)
assert.Equal(t, si.EventRecord_REQUEST, event.Type)
assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
assert.Equal(t, "Preemption pre conditions check failed for ask alloc-0 belongs to app app-0", event.Message)

events.SendPreemptionPreConditionsCheckPassed(allocKey, appID, requestResource)
assert.Equal(t, 2, len(eventSystem.Events))
event = eventSystem.Events[1]
assert.Equal(t, "alloc-0", event.ObjectID)
assert.Equal(t, appID, event.ReferenceID)
assert.Equal(t, si.EventRecord_REQUEST, event.Type)
assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
assert.Equal(t, "Preemption pre conditions check passed for ask alloc-0 belongs to app app-0", event.Message)
}

func TestPreemptionQueueGuaranteesCheckEvents(t *testing.T) {
eventSystem := mock.NewEventSystemDisabled()
events := NewAskEvents(eventSystem)
events.SendPreemptionQueueGuaranteesCheckFailed(allocKey, appID, requestResource)
assert.Equal(t, 0, len(eventSystem.Events))

eventSystem = mock.NewEventSystem()
events = NewAskEvents(eventSystem)
events.SendPreemptionQueueGuaranteesCheckFailed(allocKey, appID, requestResource)
assert.Equal(t, 1, len(eventSystem.Events))
event := eventSystem.Events[0]
assert.Equal(t, "alloc-0", event.ObjectID)
assert.Equal(t, appID, event.ReferenceID)
assert.Equal(t, si.EventRecord_REQUEST, event.Type)
assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
assert.Equal(t, "Preemption queue guarantees check failed for ask alloc-0 belongs to app app-0", event.Message)

events.SendPreemptionQueueGuaranteesCheckPassed(allocKey, appID, requestResource)
assert.Equal(t, 2, len(eventSystem.Events))
event = eventSystem.Events[1]
assert.Equal(t, "alloc-0", event.ObjectID)
assert.Equal(t, appID, event.ReferenceID)
assert.Equal(t, si.EventRecord_REQUEST, event.Type)
assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
assert.Equal(t, "Preemption queue guarantees check passed for ask alloc-0 belongs to app app-0", event.Message)
}

func TestPredicateFailedEvents(t *testing.T) {
resource := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
eventSystem := mock.NewEventSystemDisabled()
Expand Down
32 changes: 3 additions & 29 deletions pkg/scheduler/objects/preemption.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"go.uber.org/zap"

"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/plugins"
Expand Down Expand Up @@ -177,12 +178,6 @@ func (p *Preemptor) initWorkingState() {
p.allocationsByNode = allocationsByNode
p.queueByAlloc = queueByAlloc
p.nodeAvailableMap = nodeAvailableMap
log.Log(log.SchedPreemption).Info("Preemption triggered. Working state ",
zap.String("ask", p.ask.GetAllocationKey()),
zap.String("ask queue path", p.queuePath),
zap.Int("allocationsByNode", len(p.allocationsByNode)),
zap.Int("queueByAlloc", len(p.queueByAlloc)),
zap.Int("nodeAvailableMap", len(p.nodeAvailableMap)))
}

// checkPreemptionQueueGuarantees verifies that it's possible to free enough resources to fit the given ask
Expand Down Expand Up @@ -536,11 +531,6 @@ func (p *Preemptor) tryNodes() (string, []*Allocation, bool) {
}
// identify which victims and in which order should be tried
if idx, victims := p.calculateVictimsByNode(nodeAvailable, allocations); victims != nil {
log.Log(log.SchedPreemption).Debug("Node wise Potential victims collected for preemption",
zap.String("ask", p.ask.GetAllocationKey()),
zap.String("ask queue path", p.queuePath),
zap.String("node", nodeID),
zap.Int("victims count", len(victims)))
victimsByNode[nodeID] = victims
keys := make([]string, 0)
for _, victim := range victims {
Expand Down Expand Up @@ -568,12 +558,9 @@ func (p *Preemptor) tryNodes() (string, []*Allocation, bool) {
func (p *Preemptor) TryPreemption() (*AllocationResult, bool) {
// validate that sufficient capacity can be freed
if !p.checkPreemptionQueueGuarantees() {
p.ask.setPreemptionQueueGuaranteesCheckFailed()
log.Log(log.SchedPreemption).Debug("Preemption doesn't guarantee to free up resources", zap.String("ask", p.ask.GetAllocationKey()),
zap.String("ask queue path", p.queuePath))
p.ask.LogAllocationFailure(common.PreemptionDoesNotGuarantee.Error(), true)
return nil, false
}
p.ask.setPreemptionQueueGuaranteesCheckPassed()

// ensure required data structures are populated
p.initWorkingState()
Expand All @@ -584,21 +571,13 @@ func (p *Preemptor) TryPreemption() (*AllocationResult, bool) {
// no preemption possible
return nil, false
}
log.Log(log.SchedPreemption).Debug("Node candidate selected for preemption", zap.String("ask", p.ask.GetAllocationKey()),
zap.String("ask queue path", p.queuePath),
zap.String("node", nodeID),
zap.Int("victims count", len(victims)))

// look for additional victims in case we have not yet made enough capacity in the queue
extraVictims, ok := p.calculateAdditionalVictims(victims)
if !ok {
// not enough resources were preempted
return nil, false
}
log.Log(log.SchedPreemption).Debug("Additional victims chosen for preemption", zap.String("ask", p.ask.GetAllocationKey()),
zap.String("ask queue path", p.queuePath),
zap.String("node", nodeID),
zap.Int("additional victims count", len(extraVictims)))
victims = append(victims, extraVictims...)
if len(victims) == 0 {
return nil, false
Expand Down Expand Up @@ -637,12 +616,7 @@ func (p *Preemptor) TryPreemption() (*AllocationResult, bool) {

if p.ask.GetAllocatedResource().StrictlyGreaterThanOnlyExisting(victimsTotalResource) {
// there is shortfall, so preemption doesn't help
log.Log(log.SchedPreemption).Debug("preemption has freed up some resources, but not good enough to run the ask",
zap.String("ask", p.ask.GetAllocationKey()),
zap.String("ask queue path", p.queuePath),
zap.String("node", nodeID),
zap.String("ask resources", p.ask.GetAllocatedResource().String()),
zap.String("freed resources", victimsTotalResource.String()))
p.ask.LogAllocationFailure(common.PreemptionShortfall.Error(), true)
return nil, false
}

Expand Down
Loading

0 comments on commit f996239

Please sign in to comment.