[CELEBORN-1675] User with zero quota should not be allowed to submit any shuffle to cluster. #2857

Open
wants to merge 3 commits into
base: main
Original file line number Diff line number Diff line change
@@ -5302,7 +5302,7 @@ object CelebornConf extends Logging {
buildConf("celeborn.quota.tenant.diskBytesWritten")
.categories("quota")
.dynamic
- .doc("Quota dynamic configuration for written disk bytes.")
+ .doc("Quota dynamic configuration for written disk bytes. 0 means that no shuffle submissions are allowed.")
.version("0.5.0")
.longConf
.createWithDefault(Long.MaxValue)
@@ -5311,7 +5311,7 @@ object CelebornConf extends Logging {
buildConf("celeborn.quota.tenant.diskFileCount")
.categories("quota")
.dynamic
- .doc("Quota dynamic configuration for written disk file count.")
+ .doc("Quota dynamic configuration for written disk file count. 0 means that no shuffle submissions are allowed.")
.version("0.5.0")
.longConf
.createWithDefault(Long.MaxValue)
@@ -5320,7 +5320,7 @@ object CelebornConf extends Logging {
buildConf("celeborn.quota.tenant.hdfsBytesWritten")
.categories("quota")
.dynamic
- .doc("Quota dynamic configuration for written hdfs bytes.")
+ .doc("Quota dynamic configuration for written hdfs bytes. 0 means that no shuffle submissions are allowed.")
.version("0.5.0")
.longConf
.createWithDefault(Long.MaxValue)
@@ -5329,7 +5329,7 @@ object CelebornConf extends Logging {
buildConf("celeborn.quota.tenant.hdfsFileCount")
.categories("quota")
.dynamic
- .doc("Quota dynamic configuration for written hdfs file count.")
+ .doc("Quota dynamic configuration for written hdfs file count. 0 means that no shuffle submissions are allowed.")
.version("0.5.0")
.longConf
.createWithDefault(Long.MaxValue)
12 changes: 6 additions & 6 deletions docs/configuration/quota.md
@@ -22,10 +22,10 @@ license: |
| celeborn.quota.enabled | true | false | When Master side sets to true, the master will enable to check the quota via QuotaManager. When Client side sets to true, LifecycleManager will request Master side to check whether the current user has enough quota before registration of shuffle. Fallback to the default shuffle service of Spark when Master side checks that there is no enough quota for current user. | 0.2.0 | |
| celeborn.quota.identity.provider | org.apache.celeborn.common.identity.DefaultIdentityProvider | false | IdentityProvider class name. Default class is `org.apache.celeborn.common.identity.DefaultIdentityProvider`. Optional values: org.apache.celeborn.common.identity.HadoopBasedIdentityProvider user name will be obtained by UserGroupInformation.getUserName; org.apache.celeborn.common.identity.DefaultIdentityProvider user name and tenant id are default values or user-specific values. | 0.2.0 | |
| celeborn.quota.identity.user-specific.tenant | default | false | Tenant id if celeborn.quota.identity.provider is org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.3.0 | |
- | celeborn.quota.identity.user-specific.userName | default | false | User name if celeborn.quota.identity.provider is org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.3.0 | |
- | celeborn.quota.interruptShuffle.enabled | false | false | Whether to enable interrupt shuffle when quota exceeds. | 0.6.0 | |
- | celeborn.quota.tenant.diskBytesWritten | 9223372036854775807 | true | Quota dynamic configuration for written disk bytes. | 0.5.0 | |
- | celeborn.quota.tenant.diskFileCount | 9223372036854775807 | true | Quota dynamic configuration for written disk file count. | 0.5.0 | |
- | celeborn.quota.tenant.hdfsBytesWritten | 9223372036854775807 | true | Quota dynamic configuration for written hdfs bytes. | 0.5.0 | |
- | celeborn.quota.tenant.hdfsFileCount | 9223372036854775807 | true | Quota dynamic configuration for written hdfs file count. | 0.5.0 | |
+ | celeborn.quota.identity.user-specific.userName | default | false | User name if celeborn.quota.identity.provider is org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.3.0 | |
+ | celeborn.quota.interruptShuffle.enabled | false | false | Whether to enable interrupt shuffle when quota exceeds. | 0.6.0 | |
+ | celeborn.quota.tenant.diskBytesWritten | 9223372036854775807 | true | Quota dynamic configuration for written disk bytes. 0 means that no shuffle submissions are allowed. | 0.5.0 | |
+ | celeborn.quota.tenant.diskFileCount | 9223372036854775807 | true | Quota dynamic configuration for written disk file count. 0 means that no shuffle submissions are allowed. | 0.5.0 | |
+ | celeborn.quota.tenant.hdfsBytesWritten | 9223372036854775807 | true | Quota dynamic configuration for written hdfs bytes. 0 means that no shuffle submissions are allowed. | 0.5.0 | |
+ | celeborn.quota.tenant.hdfsFileCount | 9223372036854775807 | true | Quota dynamic configuration for written hdfs file count. 0 means that no shuffle submissions are allowed. | 0.5.0 | |
<!--end-include-->
@@ -57,7 +57,7 @@ class QuotaManager(celebornConf: CelebornConf, configService: ConfigService) ext
userIdentifier: UserIdentifier,
value: Long,
quota: Quota): (Boolean, String) = {
- val exceed = (quota.diskBytesWritten > 0 && value >= quota.diskBytesWritten)
+ val exceed = (quota.diskBytesWritten >= 0 && value >= quota.diskBytesWritten)
Review comment (Contributor):

With this check, a negative value means the quota is unlimited, which feels counterintuitive. It seems better to treat any value less than or equal to 0 as having no quota.
cc @leixm @AngersZhuuuu

var reason = ""
if (exceed) {
reason = s"User $userIdentifier used diskBytesWritten (${Utils.bytesToString(value)}) " +
@@ -71,11 +71,11 @@ class QuotaManager(celebornConf: CelebornConf, configService: ConfigService) ext
userIdentifier: UserIdentifier,
value: Long,
quota: Quota): (Boolean, String) = {
- val exceed = (quota.diskFileCount > 0 && value >= quota.diskFileCount)
+ val exceed = (quota.diskFileCount >= 0 && value >= quota.diskFileCount)
var reason = ""
if (exceed) {
reason =
- s"User $userIdentifier used diskFileCount($value) exceeds quota(${quota.diskFileCount}). "
+ s"User $userIdentifier used diskFileCount ($value) exceeds quota (${quota.diskFileCount}). "
logWarning(reason)
}
(exceed, reason)
@@ -85,11 +85,11 @@ class QuotaManager(celebornConf: CelebornConf, configService: ConfigService) ext
userIdentifier: UserIdentifier,
value: Long,
quota: Quota): (Boolean, String) = {
- val exceed = (quota.hdfsBytesWritten > 0 && value >= quota.hdfsBytesWritten)
+ val exceed = (quota.hdfsBytesWritten >= 0 && value >= quota.hdfsBytesWritten)
var reason = ""
if (exceed) {
- reason = s"User $userIdentifier used hdfsBytesWritten(${Utils.bytesToString(value)}) " +
- s"exceeds quota(${Utils.bytesToString(quota.hdfsBytesWritten)}). "
+ reason = s"User $userIdentifier used hdfsBytesWritten (${Utils.bytesToString(value)}) " +
+ s"exceeds quota (${Utils.bytesToString(quota.hdfsBytesWritten)}). "
logWarning(reason)
}
(exceed, reason)
@@ -99,11 +99,11 @@ class QuotaManager(celebornConf: CelebornConf, configService: ConfigService) ext
userIdentifier: UserIdentifier,
value: Long,
quota: Quota): (Boolean, String) = {
- val exceed = (quota.hdfsFileCount > 0 && value >= quota.hdfsFileCount)
+ val exceed = (quota.hdfsFileCount >= 0 && value >= quota.hdfsFileCount)
var reason = ""
if (exceed) {
reason =
- s"User $userIdentifier used hdfsFileCount($value) exceeds quota(${quota.hdfsFileCount}). "
+ s"User $userIdentifier used hdfsFileCount ($value) exceeds quota (${quota.hdfsFileCount}). "
logWarning(reason)
}
(exceed, reason)
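
To make the boundary semantics raised in the review comment concrete, here is a minimal, self-contained Scala sketch. It is not the QuotaManager code: `QuotaCheckSketch`, `exceedsOld`, `exceedsNew`, and `exceedsSuggested` are hypothetical names, and `exceedsSuggested` is only one possible reading of the reviewer's proposal.

```scala
object QuotaCheckSketch {
  // Condition before this change: a quota of 0 or below disables the check entirely.
  def exceedsOld(quota: Long, value: Long): Boolean =
    quota > 0 && value >= quota

  // Condition in this change: 0 is a real limit; only negative quotas disable the check.
  def exceedsNew(quota: Long, value: Long): Boolean =
    quota >= 0 && value >= quota

  // Hypothetical reading of the review suggestion: any quota <= 0 means "no quota at all".
  def exceedsSuggested(quota: Long, value: Long): Boolean =
    quota <= 0 || value >= quota

  def main(args: Array[String]): Unit = {
    // (quota, value) pairs covering zero, negative, partial use, exact limit, and the default.
    val cases = Seq((0L, 0L), (-1L, 0L), (100L, 50L), (100L, 100L), (Long.MaxValue, 50L))
    for ((quota, value) <- cases) {
      println(
        s"quota=$quota value=$value old=${exceedsOld(quota, value)} " +
          s"new=${exceedsNew(quota, value)} suggested=${exceedsSuggested(quota, value)}")
    }
  }
}
```

Under these definitions, a quota of 0 is ignored by the old condition and rejects every submission under the new one, while a negative quota stays unlimited unless something like the suggested variant is adopted.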
5 changes: 5 additions & 0 deletions master/src/test/resources/dynamicConfig-quota.yaml
@@ -32,4 +32,9 @@
celeborn.quota.tenant.diskBytesWritten: 100G
celeborn.quota.tenant.diskFileCount: 10000

+ - name: Tom
+ config:
+ celeborn.quota.tenant.diskBytesWritten: 0G
+ celeborn.quota.tenant.diskFileCount: 0
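
The test configuration writes the zero byte quota as `0G`, while the new test passes `"0"` to `Utils.byteStringAsBytes`. As a rough illustration of why both forms resolve to zero bytes, here is a small stand-alone parser. It is a hypothetical simplification: `ByteStringSketch.toBytes` is not Celeborn's `Utils` implementation, and it assumes binary units, which is consistent with the `GiB` values in the expected test messages.

```scala
object ByteStringSketch {
  // Digits, optional whitespace, optional unit letter, optional trailing "b".
  private val Pattern = """(\d+)\s*([kmgt]?)b?""".r

  // Binary multipliers (1k = 1024 bytes); this is an assumption of the sketch.
  private val Multipliers: Map[String, Long] = Map(
    "" -> 1L,
    "k" -> (1L << 10),
    "m" -> (1L << 20),
    "g" -> (1L << 30),
    "t" -> (1L << 40))

  // Parses strings such as "0", "0G", or "100g" into a byte count.
  def toBytes(s: String): Long = s.trim.toLowerCase match {
    case Pattern(num, unit) => Math.multiplyExact(num.toLong, Multipliers(unit))
    case other => throw new IllegalArgumentException(s"Cannot parse byte string: $other")
  }
}
```

With this sketch, `ByteStringSketch.toBytes("0G")` and `ByteStringSketch.toBytes("0")` both yield 0, so either spelling of a zero quota produces the same limit.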


@@ -75,15 +75,29 @@ class QuotaManagerSuite extends CelebornFunSuite {
val exp1 = (true, "")
val exp2 = (
false,
- s"User $user used hdfsBytesWritten(30.0 GiB) exceeds quota(10.0 GiB). ")
+ s"User $user used hdfsBytesWritten (30.0 GiB) exceeds quota (10.0 GiB). ")
val exp3 = (
false,
s"User $user used diskBytesWritten (200.0 GiB) exceeds quota (100.0 GiB). " +
- s"User $user used diskFileCount(20000) exceeds quota(10000). " +
- s"User $user used hdfsBytesWritten(30.0 GiB) exceeds quota(10.0 GiB). ")
+ s"User $user used diskFileCount (20000) exceeds quota (10000). " +
+ s"User $user used hdfsBytesWritten (30.0 GiB) exceeds quota (10.0 GiB). ")

assert(res1 == exp1)
assert(res2 == exp2)
assert(res3 == exp3)
}

+ test("User with zero quota should not be allowed to submit any shuffle to cluster") {
+ val user = UserIdentifier("tenant_01", "Tom")
+ val rc1 =
+ ResourceConsumption(Utils.byteStringAsBytes("0"), 0, Utils.byteStringAsBytes("0"), 0)
+
+ val res1 = quotaManager.checkQuotaSpaceAvailable(user, rc1)
+
+ val exp1 = (
+ false,
+ s"User $user used diskBytesWritten (0.0 B) exceeds quota (0.0 B). ")
+
+ assert(res1 == exp1)
+ }
}
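
The test expectations above pair an availability flag with a concatenated reason string. The following is a simplified, self-contained model of that result shape, assuming each limit is checked with the `>= 0` condition and the reasons are joined in order. `QuotaAggregationSketch`, `Limits`, `Usage`, and `checkAvailable` are hypothetical stand-ins, not the Celeborn classes, and the message text omits the user identifier and byte formatting.

```scala
object QuotaAggregationSketch {
  // Simplified local stand-ins for the quota and the consumed resources.
  final case class Limits(diskBytesWritten: Long, diskFileCount: Long)
  final case class Usage(diskBytesWritten: Long, diskFileCount: Long)

  // Checks one resource; a negative limit disables the check, 0 rejects everything.
  private def check(name: String, value: Long, limit: Long): (Boolean, String) = {
    val exceed = limit >= 0 && value >= limit
    val reason = if (exceed) s"used $name ($value) exceeds quota ($limit). " else ""
    (exceed, reason)
  }

  // Returns (available, reasons): available is true only when no limit is exceeded.
  def checkAvailable(usage: Usage, limits: Limits): (Boolean, String) = {
    val results = Seq(
      check("diskBytesWritten", usage.diskBytesWritten, limits.diskBytesWritten),
      check("diskFileCount", usage.diskFileCount, limits.diskFileCount))
    val exceeded = results.exists(_._1)
    (!exceeded, results.map(_._2).mkString)
  }
}
```

In this model, a user with no consumption and default limits gets `(true, "")`, a single zero limit yields one reason, and setting both limits to 0 yields two reasons, one per exceeded limit. How the real QuotaManager combines its checks beyond what the test expectations show is not covered by this sketch.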