[YUNIKORN-1858] Handle group limit config changes (#600)
Handle configuration changes for groups and clean up the linked usage tracking
objects when group limits are removed from the configuration.

Closes: #600

Signed-off-by: Wilfred Spiegelenburg <[email protected]>
manirajv06 authored and wilfred-s committed Aug 21, 2023
1 parent 80c3788 commit fa43406
Showing 8 changed files with 118 additions and 70 deletions.
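The change in one sentence: the group tracker now remembers which user submitted each application, so that when a group limit disappears from the configuration the manager can also clear the user side of the link. Below is a minimal, self-contained Go sketch of that idea, not part of the diff; the ugm trackers are reduced to plain maps and all names and data are illustrative only.

package main

import "fmt"

// Simplified stand-ins for the ugm trackers touched by this commit; the real
// types also carry queue trackers, limits and locking.
type groupTracker struct {
	groupName    string
	applications map[string]string // appID -> user; was map[string]bool before this change
}

type userTracker struct {
	userName         string
	appGroupTrackers map[string]*groupTracker // appID -> linked group tracker
}

func main() {
	gt := &groupTracker{groupName: "testgroup1", applications: map[string]string{}}
	ut := &userTracker{userName: "testuser1", appGroupTrackers: map[string]*groupTracker{}}

	// While usage is tracked, both sides of the link are recorded.
	gt.applications["app-1"] = ut.userName
	ut.appGroupTrackers["app-1"] = gt

	// When the group limit is removed from the configuration, the appID -> user
	// map tells the manager which user trackers need their link cleared.
	for appID, user := range gt.applications {
		fmt.Printf("unlinking %s from group %s for user %s\n", appID, gt.groupName, user)
		ut.appGroupTrackers[appID] = nil
	}
}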
1 change: 1 addition & 0 deletions pkg/scheduler/objects/application.go
@@ -681,6 +681,7 @@ func (sa *Application) AddAllocationAsk(ask *AllocationAsk) error {

log.Log(log.SchedApplication).Info("ask added successfully to application",
zap.String("appID", sa.ApplicationID),
zap.String("appID", sa.user.User),
zap.String("ask", ask.GetAllocationKey()),
zap.Bool("placeholder", ask.IsPlaceholder()),
zap.Stringer("pendingDelta", delta))
20 changes: 20 additions & 0 deletions pkg/scheduler/partition_test.go
@@ -3527,6 +3527,26 @@ func TestUserHeadroom(t *testing.T) {
if alloc != nil {
t.Fatal("allocation should not happen on other nodes as well")
}
partition.removeApplication("app-5")

app6 := newApplicationWithUser("app-6", "default", "root.parent.sub-leaf", security.UserGroup{
User: "testuser1",
Groups: []string{"testgroup1"},
})
res, err = resources.NewResourceFromConf(map[string]string{"memory": "3", "vcores": "3"})
assert.NilError(t, err, "failed to create resource")

err = partition.AddApplication(app6)
assert.NilError(t, err, "failed to add app-6 to partition")
err = app6.AddAllocationAsk(newAllocationAsk(allocID, "app-6", res))
assert.NilError(t, err, "failed to add ask alloc-1 to app-6")

// app 6 would be allocated as headroom is nil because no limits are configured for the 'testuser1' user and the 'testgroup1' group
alloc = partition.tryAllocate()
if alloc == nil {
t.Fatal("allocation did not return any allocation")
}
assert.Equal(t, alloc.GetResult(), objects.Allocated, "result is not the expected allocated")
}

func TestPlaceholderAllocationTracking(t *testing.T) {
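The new TestUserHeadroom case above leans on the convention that a nil headroom means no limit is configured, so the request always fits. A hedged, self-contained sketch of that convention (the real check lives in the user/group manager and operates on *resources.Resource, not ints):

package main

import "fmt"

// headroomAllows mirrors the convention the test relies on: a nil headroom
// means no limit is configured, so any request fits.
func headroomAllows(headroom *int, request int) bool {
	if headroom == nil {
		return true
	}
	return request <= *headroom
}

func main() {
	fmt.Println(headroomAllows(nil, 3)) // true: no limits for 'testuser1' or 'testgroup1'
	limit := 2
	fmt.Println(headroomAllows(&limit, 3)) // false: request exceeds the configured limit
}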
36 changes: 23 additions & 13 deletions pkg/scheduler/ugm/group_tracker.go
@@ -27,9 +27,9 @@ import (
)

type GroupTracker struct {
groupName string // Name of the group for which usage is being tracked upon
applications map[string]bool // Hold applications currently run by all users belong to this group
queueTracker *QueueTracker // Holds the actual resource usage of queue path where application run
groupName string // Name of the group for which usage is being tracked upon
applications map[string]string // Hold applications currently run by all users belong to this group
queueTracker *QueueTracker // Holds the actual resource usage of queue path where application run

sync.RWMutex
}
@@ -38,19 +38,19 @@ func newGroupTracker(group string) *GroupTracker {
queueTracker := newRootQueueTracker()
groupTracker := &GroupTracker{
groupName: group,
applications: make(map[string]bool),
applications: make(map[string]string),
queueTracker: queueTracker,
}
return groupTracker
}

func (gt *GroupTracker) increaseTrackedResource(queuePath, applicationID string, usage *resources.Resource) bool {
func (gt *GroupTracker) increaseTrackedResource(queuePath, applicationID string, usage *resources.Resource, user string) bool {
if gt == nil {
return true
}
gt.Lock()
defer gt.Unlock()
gt.applications[applicationID] = true
gt.applications[applicationID] = user
return gt.queueTracker.increaseTrackedResource(queuePath, applicationID, group, usage)
}

@@ -66,7 +66,7 @@ func (gt *GroupTracker) decreaseTrackedResource(queuePath, applicationID string,
return gt.queueTracker.decreaseTrackedResource(queuePath, applicationID, usage, removeApp)
}

func (gt *GroupTracker) getTrackedApplications() map[string]bool {
func (gt *GroupTracker) getTrackedApplications() map[string]string {
gt.RLock()
defer gt.RUnlock()
return gt.applications
@@ -123,15 +123,25 @@ func (gt *GroupTracker) canBeRemoved() bool {
return len(gt.queueTracker.childQueueTrackers) == 0 && len(gt.queueTracker.runningApplications) == 0
}

func (gt *GroupTracker) removeApp(applicationID string) {
gt.Lock()
defer gt.Unlock()
delete(gt.applications, applicationID)
}

func (gt *GroupTracker) getName() string {
if gt == nil {
return common.Empty
}
return gt.groupName
}

func (gt *GroupTracker) decreaseAllTrackedResourceUsage(queuePath string) map[string]string {
if gt == nil {
return nil
}
gt.Lock()
defer gt.Unlock()
applications := gt.queueTracker.decreaseTrackedResourceUsageDownwards(queuePath)
removedApplications := make(map[string]string)
for app := range applications {
if u, ok := gt.applications[app]; ok {
removedApplications[app] = u
}
}
return removedApplications
}
18 changes: 9 additions & 9 deletions pkg/scheduler/ugm/group_tracker_test.go
@@ -40,7 +40,7 @@ func TestGTIncreaseTrackedResource(t *testing.T) {
if err != nil {
t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, usage1)
}
result := groupTracker.increaseTrackedResource(queuePath1, TestApp1, usage1)
result := groupTracker.increaseTrackedResource(queuePath1, TestApp1, usage1, user.User)
if !result {
t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath1, TestApp1, usage1)
}
@@ -49,7 +49,7 @@ func TestGTIncreaseTrackedResource(t *testing.T) {
if err != nil {
t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, usage2)
}
result = groupTracker.increaseTrackedResource(queuePath2, TestApp2, usage2)
result = groupTracker.increaseTrackedResource(queuePath2, TestApp2, usage2, user.User)
if !result {
t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath2, TestApp2, usage2)
}
@@ -58,7 +58,7 @@ func TestGTIncreaseTrackedResource(t *testing.T) {
if err != nil {
t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, usage3)
}
result = groupTracker.increaseTrackedResource(queuePath3, TestApp3, usage3)
result = groupTracker.increaseTrackedResource(queuePath3, TestApp3, usage3, user.User)
if !result {
t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath3, TestApp3, usage3)
}
@@ -67,7 +67,7 @@ func TestGTIncreaseTrackedResource(t *testing.T) {
if err != nil {
t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, usage3)
}
result = groupTracker.increaseTrackedResource(queuePath4, TestApp4, usage4)
result = groupTracker.increaseTrackedResource(queuePath4, TestApp4, usage4, user.User)
if !result {
t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath4, TestApp4, usage4)
}
@@ -92,7 +92,7 @@ func TestGTDecreaseTrackedResource(t *testing.T) {
if err != nil {
t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, usage1)
}
result := groupTracker.increaseTrackedResource(queuePath1, TestApp1, usage1)
result := groupTracker.increaseTrackedResource(queuePath1, TestApp1, usage1, user.User)
if !result {
t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath1, TestApp1, usage1)
}
@@ -102,7 +102,7 @@ func TestGTDecreaseTrackedResource(t *testing.T) {
if err != nil {
t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, usage2)
}
result = groupTracker.increaseTrackedResource(queuePath2, TestApp2, usage2)
result = groupTracker.increaseTrackedResource(queuePath2, TestApp2, usage2, user.User)
if !result {
t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath2, TestApp2, usage2)
}
@@ -172,19 +172,19 @@ func TestGTSetMaxLimits(t *testing.T) {
t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, usage1)
}

result := groupTracker.increaseTrackedResource(queuePath1, TestApp1, usage1)
result := groupTracker.increaseTrackedResource(queuePath1, TestApp1, usage1, user.User)
if !result {
t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath1, TestApp1, usage1)
}

groupTracker.setLimits(queuePath1, resources.Multiply(usage1, 5), 5)
groupTracker.setLimits("root.parent", resources.Multiply(usage1, 10), 10)

result = groupTracker.increaseTrackedResource(queuePath1, TestApp2, usage1)
result = groupTracker.increaseTrackedResource(queuePath1, TestApp2, usage1, user.User)
if !result {
t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath1, TestApp2, usage1)
}
result = groupTracker.increaseTrackedResource(queuePath1, TestApp3, usage1)
result = groupTracker.increaseTrackedResource(queuePath1, TestApp3, usage1, user.User)
if !result {
t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath1, TestApp2, usage1)
}
51 changes: 14 additions & 37 deletions pkg/scheduler/ugm/manager.go
@@ -408,46 +408,20 @@ func (m *Manager) processGroupConfig(group string, limitConfig *LimitConfig, que

// clearEarlierSetLimits Clear already configured limits of users and groups for which limits have been configured before but not now
func (m *Manager) clearEarlierSetLimits(userLimits map[string]bool, groupLimits map[string]bool, queuePath string) error {
// Clear already configured limits of user for which limits have been configured before but not now
for u, ut := range m.userTrackers {
// Is this user already tracked for the queue path?
if ut.IsQueuePathTrackedCompletely(queuePath) {
// Traverse all the group trackers linked to user through different applications and remove the linkage
for appID, gt := range ut.appGroupTrackers {
if gt != nil {
g := gt.groupName
// Is there any limit config set for group in the current configuration? If not, then remove the linkage by setting it to nil
if ok := groupLimits[g]; !ok {
log.Log(log.SchedUGM).Debug("Removed the linkage between user and group through applications",
zap.String("user", u),
zap.String("group", gt.groupName),
zap.String("application id", appID),
zap.String("queue path", queuePath))
// removing the linkage only happens here by setting it to nil and deleting app id
// but group resource usage so far remains as it is because we don't have app id wise resource usage with in group as of now.
// YUNIKORN-1858 handles the group resource usage properly
// In case of only one (last) application, group tracker would be removed from the manager.
ut.setGroupForApp(appID, nil)
gt.removeApp(appID)
if len(gt.getTrackedApplications()) == 0 {
log.Log(log.SchedUGM).Debug("Is this app the only running application in group?",
zap.String("user", u),
zap.String("group", gt.groupName),
zap.Int("no. of applications", len(gt.getTrackedApplications())),
zap.String("application id", appID),
zap.String("queue path", queuePath))
delete(m.groupTrackers, g)
}
}
}
// Clear already configured limits of group for which limits have been configured before but not now
for _, gt := range m.groupTrackers {
appUsersMap := m.clearEarlierSetGroupLimits(gt, queuePath, groupLimits)
if len(appUsersMap) > 0 {
for app, user := range appUsersMap {
ut := m.userTrackers[user]
ut.setGroupForApp(app, nil)
}
}
m.clearEarlierSetUserLimits(ut, queuePath, userLimits)
}

// Clear already configured limits of group for which limits have been configured before but not now
for _, gt := range m.groupTrackers {
m.clearEarlierSetGroupLimits(gt, queuePath, groupLimits)
// Clear already configured limits of user for which limits have been configured before but not now
for _, ut := range m.userTrackers {
m.clearEarlierSetUserLimits(ut, queuePath, userLimits)
}
return nil
}
@@ -480,7 +454,8 @@ func (m *Manager) clearEarlierSetUserLimits(ut *UserTracker, queuePath string, u
}
}

func (m *Manager) clearEarlierSetGroupLimits(gt *GroupTracker, queuePath string, groupLimits map[string]bool) {
func (m *Manager) clearEarlierSetGroupLimits(gt *GroupTracker, queuePath string, groupLimits map[string]bool) map[string]string {
appUsersMap := make(map[string]string)
// Is this group already tracked for the queue path?
if gt.IsQueuePathTrackedCompletely(queuePath) {
g := gt.groupName
@@ -489,6 +464,7 @@ func (m *Manager) clearEarlierSetGroupLimits(gt *GroupTracker, queuePath string,
log.Log(log.SchedUGM).Debug("Need to clear earlier set configs for group",
zap.String("group", g),
zap.String("queue path", queuePath))
appUsersMap = gt.decreaseAllTrackedResourceUsage(queuePath)
// Is there any running applications in end queue of this queue path? If not, then remove the linkage between end queue and its immediate parent
if gt.IsUnlinkRequired(queuePath) {
gt.UnlinkQT(queuePath)
@@ -506,6 +482,7 @@ }
}
}
}
return appUsersMap
}

func (m *Manager) setUserLimits(user string, limitConfig *LimitConfig, queuePath string) error {
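To make the reworked clearEarlierSetLimits flow above easier to follow, here is a hedged, self-contained sketch of the new cleanup order with the trackers reduced to plain maps (names and data are illustrative only): groups are processed first, and each group that no longer has a limit hands back its appID -> user map so the user side of the linkage can be cleared before the per-user cleanup runs.

package main

import "fmt"

func main() {
	// limits present in the *new* configuration for this queue path
	groupLimits := map[string]bool{"groupA": true} // groupB was dropped

	// group -> (appID -> user) linkage built up while tracking usage
	groupApps := map[string]map[string]string{
		"groupA": {"app-1": "alice"},
		"groupB": {"app-2": "bob"},
	}
	// user -> (appID -> group) linkage, the other side of the same link
	userApps := map[string]map[string]string{
		"alice": {"app-1": "groupA"},
		"bob":   {"app-2": "groupB"},
	}

	// Step 1: groups first. A group without a limit in the new configuration has
	// its usage wiped and returns its appID -> user map (clearEarlierSetGroupLimits).
	for group, apps := range groupApps {
		if groupLimits[group] {
			continue
		}
		for appID, user := range apps {
			// in the real code: ut := m.userTrackers[user]; ut.setGroupForApp(appID, nil)
			delete(userApps[user], appID)
			fmt.Printf("cleared %s/%s, unlinked from user %s\n", group, appID, user)
		}
		delete(groupApps, group)
	}

	// Step 2: users are cleaned up afterwards (clearEarlierSetUserLimits),
	// now that no stale group links remain.
	fmt.Println("remaining user links:", userApps)
}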
56 changes: 46 additions & 10 deletions pkg/scheduler/ugm/queue_tracker.go
@@ -88,7 +88,7 @@ func (qt *QueueTracker) increaseTrackedResource(queuePath string, applicationID
zap.String("queue path", queuePath),
zap.Bool("existing app", existingApp),
zap.Uint64("max running apps", qt.maxRunningApps),
zap.String("max resources", qt.maxResources.String()))
zap.Stringer("max resources", qt.maxResources))
if (!existingApp && len(qt.runningApplications)+1 > int(qt.maxRunningApps)) ||
resources.StrictlyGreaterThan(finalResourceUsage, qt.maxResources) {
log.Log(log.SchedUGM).Warn("Unable to increase resource usage as allowing new application to run would exceed either configured max applications or max resources limit of specific user/group",
@@ -97,8 +97,8 @@ func (qt *QueueTracker) increaseTrackedResource(queuePath string, applicationID
zap.Bool("existing app", existingApp),
zap.Int("current running applications", len(qt.runningApplications)),
zap.Uint64("max running applications", qt.maxRunningApps),
zap.String("current resource usage", qt.resourceUsage.String()),
zap.String("max resource usage", qt.maxResources.String()))
zap.Stringer("current resource usage", qt.resourceUsage),
zap.Stringer("max resource usage", qt.maxResources))
return false
}
}
@@ -118,7 +118,7 @@ func (qt *QueueTracker) increaseTrackedResource(queuePath string, applicationID
zap.String("queue path", queuePath),
zap.Bool("existing app", existingApp),
zap.Uint64("wild card max running apps", config.maxApplications),
zap.String("wild card max resources", config.maxResources.String()),
zap.Stringer("wild card max resources", config.maxResources),
zap.Bool("wild card quota exceeded", wildCardQuotaExceeded))
wildCardQuotaExceeded = (config.maxApplications != 0 && !existingApp && len(qt.runningApplications)+1 > int(config.maxApplications)) ||
(!resources.Equals(resources.NewResource(), config.maxResources) && resources.StrictlyGreaterThan(finalResourceUsage, config.maxResources))
@@ -129,8 +129,8 @@ func (qt *QueueTracker) increaseTrackedResource(queuePath string, applicationID
zap.Bool("existing app", existingApp),
zap.Int("current running applications", len(qt.runningApplications)),
zap.Uint64("max running applications", config.maxApplications),
zap.String("current resource usage", qt.resourceUsage.String()),
zap.String("max resource usage", config.maxResources.String()))
zap.Stringer("current resource usage", qt.resourceUsage),
zap.Stringer("max resource usage", config.maxResources))
return false
}
}
@@ -158,7 +158,7 @@ func (qt *QueueTracker) increaseTrackedResource(queuePath string, applicationID
zap.Bool("existing app", existingApp),
zap.Stringer("resource", usage),
zap.Uint64("max running applications", qt.maxRunningApps),
zap.String("max resource usage", qt.maxResources.String()),
zap.Stringer("max resource usage", qt.maxResources),
zap.Stringer("total resource after increasing", qt.resourceUsage),
zap.Int("total applications after increasing", len(qt.runningApplications)))
return true
@@ -239,7 +239,7 @@ func (qt *QueueTracker) setLimit(queuePath string, maxResource *resources.Resour
log.Log(log.SchedUGM).Debug("Setting limits",
zap.String("queue path", queuePath),
zap.Uint64("max applications", maxApps),
zap.String("max resources", maxResource.String()))
zap.Stringer("max resources", maxResource))
childQueueTracker := qt.getChildQueueTracker(queuePath)
childQueueTracker.maxRunningApps = maxApps
childQueueTracker.maxResources = maxResource
@@ -268,8 +268,8 @@ func (qt *QueueTracker) headroom(queuePath string) *resources.Resource {
log.Log(log.SchedUGM).Debug("Calculated headroom",
zap.String("queue path", queuePath),
zap.String("queue", qt.queueName),
zap.String("max resource", qt.maxResources.String()),
zap.String("headroom", headroom.String()))
zap.Stringer("max resource", qt.maxResources),
zap.Stringer("headroom", headroom))
return headroom
}
return nil
@@ -364,3 +364,39 @@ func (qt *QueueTracker) UnlinkQT(queuePath string) bool {
}
return false
}

// decreaseTrackedResourceUsageDownwards queuePath could be either a parent or a leaf queue path. If it is a parent queue path, traverse down to the end leaf
// recursively for all child queues, resetting resourceUsage and runningApplications to their default values.
// Once the downward traversal has been completed, traverse upwards using decreaseTrackedResourceUsageUpwards.
func (qt *QueueTracker) decreaseTrackedResourceUsageDownwards(queuePath string) map[string]bool {
childQueueTracker := qt.getChildQueueTracker(queuePath)
childQueueTrackers := childQueueTracker.childQueueTrackers
removedApplications := make(map[string]bool)
for _, childQT := range childQueueTrackers {
if len(childQT.runningApplications) > 0 && childQT.resourceUsage != resources.NewResource() {
removedApplications = childQT.runningApplications
childQT.resourceUsage = resources.NewResource()
childQT.runningApplications = make(map[string]bool)
childQT.decreaseTrackedResourceUsageDownwards(childQT.queuePath)
}
}
qt.decreaseTrackedResourceUsageUpwards(queuePath)
return removedApplications
}

// decreaseTrackedResourceUsageUpwards traverses upwards all the way up to the root, starting from the last queue in queuePath,
// resetting resourceUsage and runningApplications to their default values.
func (qt *QueueTracker) decreaseTrackedResourceUsageUpwards(queuePath string) {
childQueuePath, immediateChildQueueName := getChildQueuePath(queuePath)
if childQueuePath != common.Empty {
if qt.childQueueTrackers[immediateChildQueueName] == nil {
log.Log(log.SchedUGM).Error("Child queueTracker tracker must be available in child queues map",
zap.String("child queueTracker name", immediateChildQueueName))
}
qt.childQueueTrackers[immediateChildQueueName].decreaseTrackedResourceUsageUpwards(childQueuePath)
}
if len(qt.runningApplications) > 0 && qt.resourceUsage != resources.NewResource() {
qt.resourceUsage = resources.NewResource()
qt.runningApplications = make(map[string]bool)
}
}
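As a rough illustration of the downward reset added above (not the committed implementation, which also walks back up to the root via decreaseTrackedResourceUsageUpwards and deals with locking and wildcard limits), the recursion over a simplified queue tree looks like this:

package main

import "fmt"

type queueTracker struct {
	queueName           string
	resourceUsage       int // the real tracker uses *resources.Resource
	runningApplications map[string]bool
	childQueueTrackers  map[string]*queueTracker
}

// resetDownwards clears usage for this queue and every queue below it and
// collects the application IDs that were still tracked.
func (qt *queueTracker) resetDownwards() map[string]bool {
	removed := map[string]bool{}
	for app := range qt.runningApplications {
		removed[app] = true
	}
	qt.resourceUsage = 0
	qt.runningApplications = map[string]bool{}
	for _, child := range qt.childQueueTrackers {
		for app := range child.resetDownwards() {
			removed[app] = true
		}
	}
	return removed
}

func main() {
	leaf := &queueTracker{queueName: "leaf", resourceUsage: 5,
		runningApplications: map[string]bool{"app-1": true},
		childQueueTrackers:  map[string]*queueTracker{}}
	parent := &queueTracker{queueName: "parent", resourceUsage: 5,
		runningApplications: map[string]bool{"app-1": true},
		childQueueTrackers:  map[string]*queueTracker{"leaf": leaf}}

	fmt.Println(parent.resetDownwards())                  // map[app-1:true]
	fmt.Println(parent.resourceUsage, leaf.resourceUsage) // 0 0
}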
2 changes: 1 addition & 1 deletion pkg/scheduler/ugm/user_tracker.go
@@ -63,7 +63,7 @@ func (ut *UserTracker) increaseTrackedResource(queuePath, applicationID string,
zap.String("queue path", queuePath),
zap.String("application", applicationID),
zap.Stringer("resource", usage))
increasedGroupUsage := gt.increaseTrackedResource(queuePath, applicationID, usage)
increasedGroupUsage := gt.increaseTrackedResource(queuePath, applicationID, usage, ut.userName)
if !increasedGroupUsage {
_, decreased := ut.queueTracker.decreaseTrackedResource(queuePath, applicationID, usage, false)
if !decreased {