[YUNIKORN-1917] enforce ugm maxapplications rule (#621)
Closes: #621

Signed-off-by: Manikandan R <[email protected]>
FrankYang0529 authored and manirajv06 committed Aug 22, 2023
1 parent fa43406 commit ca58806
Showing 7 changed files with 378 additions and 1 deletion.
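In short, this change makes the allocation cycle respect the per-user and per-group maxApplications limits tracked by the user group manager (UGM): in TryAllocate an accepted application is now skipped when either the queue's own running-application limit or the UGM limit for the application's user or group has been reached. For orientation, a minimal example of the kind of limit being enforced, mirroring the test configuration added below (values are illustrative only):

// Illustrative only, mirroring the tests in this commit: a queue-level limit
// that allows "testuser" at most one running application in the queue.
var exampleLimit = configs.Limit{
	Limit:           "specific user limit",
	Users:           []string{"testuser"},
	MaxResources:    map[string]string{"memory": "5", "vcores": "5"},
	MaxApplications: 1,
}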
3 changes: 2 additions & 1 deletion pkg/scheduler/objects/queue.go
@@ -37,6 +37,7 @@ import (
"github.com/apache/yunikorn-core/pkg/metrics"
"github.com/apache/yunikorn-core/pkg/scheduler/objects/template"
"github.com/apache/yunikorn-core/pkg/scheduler/policies"
"github.com/apache/yunikorn-core/pkg/scheduler/ugm"
"github.com/apache/yunikorn-core/pkg/webservice/dao"
"github.com/apache/yunikorn-scheduler-interface/lib/go/common"
)
@@ -1297,7 +1298,7 @@ func (sq *Queue) TryAllocate(iterator func() NodeIterator, fullIterator func() N

// process the apps (filters out app without pending requests)
for _, app := range sq.sortApplications(true, false) {
if app.IsAccepted() && !sq.canRunApp(app.ApplicationID) {
if app.IsAccepted() && (!sq.canRunApp(app.ApplicationID) || !ugm.GetUserManager().CanRunApp(sq.QueuePath, app.ApplicationID, app.user)) {
continue
}
alloc := app.tryAllocate(headRoom, preemptionDelay, &preemptAttemptsRemaining, iterator, fullIterator, getnode)
155 changes: 155 additions & 0 deletions pkg/scheduler/partition_test.go
@@ -3642,3 +3642,158 @@ func TestReservationTracking(t *testing.T) {
assert.Equal(t, 0, partition.getReservationCount())
assert.Equal(t, "alloc-2", alloc.GetAllocationKey())
}

//nolint:funlen
func TestLimitMaxApplications(t *testing.T) {
testCases := []struct {
name string
limits []configs.Limit
}{
{
name: "specific user",
limits: []configs.Limit{
{
Limit: "specific user limit",
Users: []string{"testuser"},
MaxResources: map[string]string{"memory": "5", "vcores": "5"},
MaxApplications: 1,
},
},
},
{
name: "specific group",
limits: []configs.Limit{
{
Limit: "specific group limit",
Groups: []string{"testgroup"},
MaxResources: map[string]string{"memory": "5", "vcores": "5"},
MaxApplications: 1,
},
},
},
{
name: "wildcard user",
limits: []configs.Limit{
{
Limit: "wildcard user limit",
Users: []string{"*"},
MaxResources: map[string]string{"memory": "5", "vcores": "5"},
MaxApplications: 1,
},
},
},
{
name: "wildcard group",
limits: []configs.Limit{
{
Limit: "specific group limit",
Groups: []string{"nonexistent-group"},
MaxResources: map[string]string{"memory": "500", "vcores": "500"},
MaxApplications: 100,
},
{
Limit: "wildcard group limit",
Groups: []string{"*"},
MaxResources: map[string]string{"memory": "5", "vcores": "5"},
MaxApplications: 1,
},
},
},
{
name: "specific user lower than specific group limit",
limits: []configs.Limit{
{
Limit: "specific user limit",
Users: []string{"testuser"},
MaxResources: map[string]string{"memory": "5", "vcores": "5"},
MaxApplications: 1,
},
{
Limit: "specific user limit",
Groups: []string{"testgroup"},
MaxResources: map[string]string{"memory": "5", "vcores": "5"},
MaxApplications: 100,
},
},
},
{
name: "specific group lower than specific user limit",
limits: []configs.Limit{
{
Limit: "specific user limit",
Users: []string{"testuser"},
MaxResources: map[string]string{"memory": "5", "vcores": "5"},
MaxApplications: 100,
},
{
Limit: "specific group limit",
Groups: []string{"testgroup"},
MaxResources: map[string]string{"memory": "5", "vcores": "5"},
MaxApplications: 1,
},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
setupUGM()
conf := configs.PartitionConfig{
Name: "default",
Queues: []configs.QueueConfig{
{
Name: "root",
Parent: true,
SubmitACL: "*",
Queues: []configs.QueueConfig{
{
Name: "default",
Parent: false,
Limits: tc.limits,
},
},
},
},
NodeSortPolicy: configs.NodeSortingPolicy{},
}

partition, err := newPartitionContext(conf, rmID, nil)
assert.NilError(t, err, "partition create failed")

// add node1
nodeRes, err := resources.NewResourceFromConf(map[string]string{"memory": "10", "vcores": "10"})
assert.NilError(t, err, "failed to create basic resource")
err = partition.AddNode(newNodeMaxResource("node-1", nodeRes), nil)
assert.NilError(t, err, "test node1 add failed unexpected")

resMap := map[string]string{"memory": "2", "vcores": "2"}
res, err := resources.NewResourceFromConf(resMap)
assert.NilError(t, err, "Unexpected error when creating resource from map")

// add app1
app1 := newApplication(appID1, "default", defQueue)
err = partition.AddApplication(app1)
assert.NilError(t, err, "add application to partition should not have failed")
err = app1.AddAllocationAsk(newAllocationAsk(allocID, appID1, res))
assert.NilError(t, err, "failed to add ask alloc-1 to app-1")

alloc := partition.tryAllocate()
if alloc == nil {
t.Fatal("allocation did not return any allocation")
}
assert.Equal(t, alloc.GetResult(), objects.Allocated, "result is not the expected allocated")
assert.Equal(t, alloc.GetApplicationID(), appID1, "expected application app-1 to be allocated")
assert.Equal(t, alloc.GetAllocationKey(), allocID, "expected ask alloc-1 to be allocated")

// add app2
app2 := newApplication(appID2, "default", defQueue)
err = partition.AddApplication(app2)
assert.NilError(t, err, "add application to partition should not have failed")
err = app2.AddAllocationAsk(newAllocationAsk(allocID2, appID2, res))
assert.NilError(t, err, "failed to add ask alloc-2 to app-1")
assert.Equal(t, app2.CurrentState(), objects.Accepted.String(), "application should have moved to accepted state")

alloc = partition.tryAllocate()
assert.Equal(t, alloc == nil, true, "allocation should not have happened as max apps reached")
})
}
}
6 changes: 6 additions & 0 deletions pkg/scheduler/ugm/group_tracker.go
@@ -145,3 +145,9 @@ func (gt *GroupTracker) decreaseAllTrackedResourceUsage(queuePath string) map[st
}
return removedApplications
}

func (gt *GroupTracker) canRunApp(queuePath, applicationID string) bool {
gt.Lock()
defer gt.Unlock()
return gt.queueTracker.canRunApp(queuePath, applicationID, group)
}
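
The queueTracker side of this check is not visible in this excerpt (the remaining changed files did not load on this page). As a rough, self-contained illustration of the kind of hierarchical per-queue maxApplications check being delegated to, where every type, field, and helper name is an assumption made for illustration rather than YuniKorn's actual implementation:

package main

import (
	"fmt"
	"strings"
)

// queueNode is a hypothetical stand-in for a per-queue tracker node.
type queueNode struct {
	maxRunningApps uint64                // 0 means "no limit configured"
	runningApps    map[string]bool       // applications currently counted as running here
	children       map[string]*queueNode // child queues by name
}

// canRunApp walks a "root.child.leaf" queue path and rejects the new
// application at any level whose running-application count has already
// reached its configured limit.
func (q *queueNode) canRunApp(queuePath, appID string) bool {
	if q.maxRunningApps != 0 && uint64(len(q.runningApps))+1 > q.maxRunningApps {
		return false
	}
	parts := strings.SplitN(queuePath, ".", 2)
	if len(parts) == 2 {
		childName := strings.SplitN(parts[1], ".", 2)[0]
		if child, ok := q.children[childName]; ok {
			return child.canRunApp(parts[1], appID)
		}
	}
	return true
}

func main() {
	root := &queueNode{children: map[string]*queueNode{
		"default": {maxRunningApps: 1, runningApps: map[string]bool{"app-1": true}},
	}}
	// app-1 already counts against the leaf's limit of 1, so app-2 is rejected.
	fmt.Println(root.canRunApp("root.default", "app-2")) // prints: false
}

The real tracker also distinguishes user and group tracking (the group constant passed above) and handles wildcard limits; the sketch only shows the counting idea.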
29 changes: 29 additions & 0 deletions pkg/scheduler/ugm/manager.go
@@ -584,6 +584,35 @@ func (m *Manager) Headroom(queuePath, applicationID string, user security.UserGr
return resources.ComponentWiseMinPermissive(userHeadroom, groupHeadroom)
}

// CanRunApp checks whether the user and the group this application runs as are still within their maxApplications limits for the given queue path.
func (m *Manager) CanRunApp(queuePath, applicationID string, user security.UserGroup) bool {
userTracker := m.getUserTracker(user.User)
userCanRunApp := userTracker.canRunApp(queuePath, applicationID)
log.Log(log.SchedUGM).Debug("Check whether user can run app",
zap.String("user", user.User),
zap.String("queue path", queuePath),
zap.Bool("can run app", userCanRunApp))
// make sure the user has a groupTracker for this application; if not, add it
if !userTracker.hasGroupForApp(applicationID) {
m.ensureGroupTrackerForApp(queuePath, applicationID, user)
}
// check if this application now has group tracking, if not we're done
appGroup := userTracker.getGroupForApp(applicationID)
if appGroup == common.Empty {
return userCanRunApp
}
groupTracker := m.GetGroupTracker(appGroup)
if groupTracker == nil {
return userCanRunApp
}
groupCanRunApp := groupTracker.canRunApp(queuePath, applicationID)
log.Log(log.SchedUGM).Debug("Check whether group can run app",
zap.String("group", appGroup),
zap.String("queue path", queuePath),
zap.Bool("can run app", groupCanRunApp))
return userCanRunApp && groupCanRunApp
}

// ClearUserTrackers only for tests
func (m *Manager) ClearUserTrackers() {
m.Lock()
128 changes: 128 additions & 0 deletions pkg/scheduler/ugm/manager_test.go
@@ -19,6 +19,7 @@
package ugm

import (
"fmt"
"strconv"
"testing"

@@ -674,6 +675,133 @@ func TestDecreaseTrackedResourceForGroupTracker(t *testing.T) {
assert.Equal(t, resources.Equals(groupTracker.queueTracker.childQueueTrackers["parent"].resourceUsage, resources.Zero), true)
}

//nolint:funlen
func TestCanRunApp(t *testing.T) {
testCases := []struct {
name string
limits []configs.Limit
}{
{
name: "specific user limit",
limits: []configs.Limit{
{
Limit: "specific user limit",
Users: []string{"user1"},
MaxApplications: 1,
},
},
},
{
name: "specific group limit",
limits: []configs.Limit{
{
Limit: "specific group limit",
Groups: []string{"group1"},
MaxApplications: 1,
},
},
},
{
name: "wildcard user limit",
limits: []configs.Limit{
{
Limit: "wildcard user limit",
Users: []string{"*"},
MaxApplications: 1,
},
},
},
{
name: "wildcard group limit",
limits: []configs.Limit{
{
Limit: "specific group limit",
Groups: []string{"nonexistent-group"},
MaxApplications: 100,
},
{
Limit: "wildcard group limit",
Groups: []string{"*"},
MaxApplications: 1,
},
},
},
{
name: "specific user lower than specific group limit",
limits: []configs.Limit{
{
Limit: "specific user limit",
Users: []string{"user1"},
MaxApplications: 1,
},
{
Limit: "specific group limit",
Groups: []string{"group1"},
MaxApplications: 100,
},
},
},
{
name: "specific group lower than specific user limit",
limits: []configs.Limit{
{
Limit: "specific user limit",
Users: []string{"user1"},
MaxApplications: 100,
},
{
Limit: "specific group limit",
Groups: []string{"group1"},
MaxApplications: 1,
},
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
setupUGM()
// Queue setup:
// root->default
conf := configs.PartitionConfig{
Name: "default",
Queues: []configs.QueueConfig{
{
Name: "root",
Parent: true,
SubmitACL: "*",
Queues: []configs.QueueConfig{
{
Name: "default",
Parent: false,
SubmitACL: "*",
Limits: tc.limits,
},
},
},
},
}
manager := GetUserManager()
assert.NilError(t, manager.UpdateConfig(conf.Queues[0], "root"))

user := security.UserGroup{User: "user1", Groups: []string{"group1"}}
usage, err := resources.NewResourceFromConf(map[string]string{"memory": "50"})
if err != nil {
t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, usage)
}

canRunApp := manager.CanRunApp("root.default", TestApp1, user)
assert.Equal(t, canRunApp, true, fmt.Sprintf("user %s should be able to run app %s", user.User, TestApp1))

increased := manager.IncreaseTrackedResource("root.default", TestApp1, usage, user)
assert.Equal(t, increased, true, "unable to increase tracked resource: queuepath root.default, app "+TestApp1+", res "+usage.String())

canRunApp = manager.CanRunApp("root.default", TestApp2, user)
assert.Equal(t, canRunApp, false, fmt.Sprintf("user %s shouldn't be able to run app %s", user.User, TestApp2))
})
}
}

func createUpdateConfigWithWildCardUsersAndGroups(user string, group string, wildUser string, wildGroup string, memory string, vcores string) configs.PartitionConfig {
conf := configs.PartitionConfig{
Name: "test",
(The remaining 2 changed files are not shown in this view.)
