Skip to content

Commit

Permalink
[CELEBORN-1719] Introduce celeborn.client.spark.stageRerun.enabled wi…
Browse files Browse the repository at this point in the history
…th alternative celeborn.client.spark.fetch.throwsFetchFailure to enable spark stage rerun

### What changes were proposed in this pull request?

1. Introduce `celeborn.client.spark.stageRerun.enabled` with alternative `celeborn.client.spark.fetch.throwsFetchFailure` to enable spark stage rerun.
2. Change the default value of `celeborn.client.spark.fetch.throwsFetchFailure` from `false` to `true`, which enables spark stage rerun at default.

### Why are the changes needed?

User could not directly understand the meaning of `celeborn.client.spark.fetch.throwsFetchFailure` as whether to enable stage rerun, which means that client throws `FetchFailedException` instead of `CelebornIOException`. It's recommended to introduce `celeborn.client.spark.stageRerun.enabled` with alternative `celeborn.client.spark.fetch.throwsFetchFailure` to enable spark stage rerun.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #2920 from SteNicholas/CELEBORN-1719.

Authored-by: SteNicholas <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
  • Loading branch information
SteNicholas committed Nov 20, 2024
1 parent 6fc50e3 commit 1d0032b
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ private void initializeLifecycleManager(String appId) {
appUniqueId = celebornConf.appUniqueIdWithUUIDSuffix(appId);
lifecycleManager = new LifecycleManager(appUniqueId, celebornConf);
lifecycleManager.registerCancelShuffleCallback(SparkUtils::cancelShuffle);
if (celebornConf.clientFetchThrowsFetchFailure()) {
if (celebornConf.clientStageRerunEnabled()) {
MapOutputTrackerMaster mapOutputTracker =
(MapOutputTrackerMaster) SparkEnv.get().mapOutputTracker();
lifecycleManager.registerShuffleTrackerCallback(
Expand Down Expand Up @@ -135,7 +135,7 @@ public <K, V, C> ShuffleHandle registerShuffle(
lifecycleManager.getPort(),
lifecycleManager.getUserIdentifier(),
shuffleId,
celebornConf.clientFetchThrowsFetchFailure(),
celebornConf.clientStageRerunEnabled(),
numMaps,
dependency);
}
Expand All @@ -148,8 +148,7 @@ public boolean unregisterShuffle(int appShuffleId) {
}
// For Spark driver side trigger unregister shuffle.
if (lifecycleManager != null) {
lifecycleManager.unregisterAppShuffle(
appShuffleId, celebornConf.clientFetchThrowsFetchFailure());
lifecycleManager.unregisterAppShuffle(appShuffleId, celebornConf.clientStageRerunEnabled());
}
// For Spark executor side cleanup shuffle related info.
if (shuffleClient != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private void initializeLifecycleManager(String appId) {
appUniqueId = celebornConf.appUniqueIdWithUUIDSuffix(appId);
lifecycleManager = new LifecycleManager(appUniqueId, celebornConf);
lifecycleManager.registerCancelShuffleCallback(SparkUtils::cancelShuffle);
if (celebornConf.clientFetchThrowsFetchFailure()) {
if (celebornConf.clientStageRerunEnabled()) {
MapOutputTrackerMaster mapOutputTracker =
(MapOutputTrackerMaster) SparkEnv.get().mapOutputTracker();

Expand Down Expand Up @@ -187,7 +187,7 @@ public <K, V, C> ShuffleHandle registerShuffle(
lifecycleManager.getPort(),
lifecycleManager.getUserIdentifier(),
shuffleId,
celebornConf.clientFetchThrowsFetchFailure(),
celebornConf.clientStageRerunEnabled(),
dependency.rdd().getNumPartitions(),
dependency);
}
Expand All @@ -200,8 +200,7 @@ public boolean unregisterShuffle(int appShuffleId) {
}
// For Spark driver side trigger unregister shuffle.
if (lifecycleManager != null) {
lifecycleManager.unregisterAppShuffle(
appShuffleId, celebornConf.clientFetchThrowsFetchFailure());
lifecycleManager.unregisterAppShuffle(appShuffleId, celebornConf.clientStageRerunEnabled());
}
// For Spark executor side cleanup shuffle related info.
if (shuffleClient != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -973,7 +973,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def clientFetchBufferSize: Int = get(CLIENT_FETCH_BUFFER_SIZE).toInt
def clientFetchMaxReqsInFlight: Int = get(CLIENT_FETCH_MAX_REQS_IN_FLIGHT)
def clientFetchMaxRetriesForEachReplica: Int = get(CLIENT_FETCH_MAX_RETRIES_FOR_EACH_REPLICA)
def clientFetchThrowsFetchFailure: Boolean = get(CLIENT_FETCH_THROWS_FETCH_FAILURE)
def clientStageRerunEnabled: Boolean = get(CLIENT_STAGE_RERUN_ENABLED)
def clientFetchExcludeWorkerOnFailureEnabled: Boolean =
get(CLIENT_FETCH_EXCLUDE_WORKER_ON_FAILURE_ENABLED)
def clientFetchExcludedWorkerExpireTimeout: Long =
Expand Down Expand Up @@ -4586,13 +4586,14 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(3)

val CLIENT_FETCH_THROWS_FETCH_FAILURE: ConfigEntry[Boolean] =
buildConf("celeborn.client.spark.fetch.throwsFetchFailure")
val CLIENT_STAGE_RERUN_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.client.spark.stageRerun.enabled")
.withAlternative("celeborn.client.spark.fetch.throwsFetchFailure")
.categories("client")
.version("0.4.0")
.doc("client throws FetchFailedException instead of CelebornIOException")
.doc("Whether to enable stage rerun. If true, client throws FetchFailedException instead of CelebornIOException.")
.booleanConf
.createWithDefault(false)
.createWithDefault(true)

val CLIENT_FETCH_EXCLUDE_WORKER_ON_FAILURE_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.client.fetch.excludeWorkerOnFailure.enabled")
Expand Down
2 changes: 1 addition & 1 deletion docs/configuration/client.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ license: |
| celeborn.client.shuffle.register.filterExcludedWorker.enabled | false | false | Whether to filter excluded worker when register shuffle. | 0.4.0 | |
| celeborn.client.shuffle.reviseLostShuffles.enabled | false | false | Whether to revise lost shuffles. | 0.6.0 | |
| celeborn.client.slot.assign.maxWorkers | 10000 | false | Max workers that slots of one shuffle can be allocated on. Will choose the smaller positive one from Master side and Client side, see `celeborn.master.slot.assign.maxWorkers`. | 0.3.1 | |
| celeborn.client.spark.fetch.throwsFetchFailure | false | false | client throws FetchFailedException instead of CelebornIOException | 0.4.0 | |
| celeborn.client.spark.push.dynamicWriteMode.enabled | false | false | Whether to dynamically switch push write mode based on conditions.If true, shuffle mode will be only determined by partition count | 0.5.0 | |
| celeborn.client.spark.push.dynamicWriteMode.partitionNum.threshold | 2000 | false | Threshold of shuffle partition number for dynamically switching push writer mode. When the shuffle partition number is greater than this value, use the sort-based shuffle writer for memory efficiency; otherwise use the hash-based shuffle writer for speed. This configuration only takes effect when celeborn.client.spark.push.dynamicWriteMode.enabled is true. | 0.5.0 | |
| celeborn.client.spark.push.sort.memory.maxMemoryFactor | 0.4 | false | the max portion of executor memory which can be used for SortBasedWriter buffer (only valid when celeborn.client.spark.push.sort.memory.useAdaptiveThreshold is enabled | 0.5.0 | |
Expand All @@ -120,6 +119,7 @@ license: |
| celeborn.client.spark.shuffle.fallback.policy | AUTO | false | Celeborn supports the following kind of fallback policies. 1. ALWAYS: always use spark built-in shuffle implementation; 2. AUTO: prefer to use celeborn shuffle implementation, and fallback to use spark built-in shuffle implementation based on certain factors, e.g. availability of enough workers and quota, shuffle partition number; 3. NEVER: always use celeborn shuffle implementation, and fail fast when it it is concluded that fallback is required based on factors above. | 0.5.0 | |
| celeborn.client.spark.shuffle.forceFallback.enabled | false | false | Always use spark built-in shuffle implementation. This configuration is deprecated, consider configuring `celeborn.client.spark.shuffle.fallback.policy` instead. | 0.3.0 | celeborn.shuffle.forceFallback.enabled |
| celeborn.client.spark.shuffle.writer | HASH | false | Celeborn supports the following kind of shuffle writers. 1. hash: hash-based shuffle writer works fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer works fine when memory pressure is high or shuffle partition count is huge. This configuration only takes effect when celeborn.client.spark.push.dynamicWriteMode.enabled is false. | 0.3.0 | celeborn.shuffle.writer |
| celeborn.client.spark.stageRerun.enabled | true | false | Whether to enable stage rerun. If true, client throws FetchFailedException instead of CelebornIOException. | 0.4.0 | celeborn.client.spark.fetch.throwsFetchFailure |
| celeborn.client.tagsExpr | | false | Expression to filter workers by tags. The expression is a comma-separated list of tags. The expression is evaluated as a logical AND of all tags. For example, `prod,high-io` filters workers that have both the `prod` and `high-io` tags. | 0.6.0 | |
| celeborn.master.endpoints | &lt;localhost&gt;:9097 | false | Endpoints of master nodes for celeborn clients to connect. Client uses resolver provided by celeborn.master.endpoints.resolver to resolve the master endpoints. By default Celeborn uses `org.apache.celeborn.common.client.StaticMasterEndpointResolver` which take static master endpoints as input. Allowed pattern: `<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If the port is omitted, 9097 will be used. If the master endpoints are not static then users can pass custom resolver implementation to discover master endpoints actively using celeborn.master.endpoints.resolver. | 0.2.0 | |
| celeborn.master.endpoints.resolver | org.apache.celeborn.common.client.StaticMasterEndpointResolver | false | Resolver class that can be used for discovering and updating the master endpoints. This allows users to provide a custom master endpoint resolver implementation. This is useful in environments where the master nodes might change due to scaling operations or infrastructure updates. Clients need to ensure that provided resolver class should be present in the classpath. | 0.6.0 | |
Expand Down
2 changes: 2 additions & 0 deletions docs/migration.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ license: |
# Migration Guide

# Upgrading from 0.5 to 0.6

- Since 0.6.0, Celeborn deprecate `celeborn.worker.congestionControl.low.watermark`. Please use `celeborn.worker.congestionControl.diskBuffer.low.watermark` instead.

- Since 0.6.0, Celeborn deprecate `celeborn.worker.congestionControl.high.watermark`. Please use `celeborn.worker.congestionControl.diskBuffer.high.watermark` instead.

- Since 0.6.0, Celeborn changed the default value of `celeborn.client.spark.fetch.throwsFetchFailure` from `false` to `true`, which means Celeborn will enable spark stage rerun at default.

- Since 0.6.0, Celeborn has introduced a new RESTful API namespace: /api/v1, which uses the application/json media type for requests and responses.
The `celeborn-openapi-client` SDK is also available to help users interact with the new RESTful APIs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class CelebornFetchFailureSuite extends AnyFunSuite
.config(updateSparkConf(sparkConf, ShuffleMode.HASH))
.config("spark.sql.shuffle.partitions", 2)
.config("spark.celeborn.shuffle.forceFallback.partition.enabled", false)
.config("spark.celeborn.client.spark.fetch.throwsFetchFailure", "true")
.config("spark.celeborn.client.spark.stageRerun.enabled", "true")
.config(
"spark.shuffle.manager",
"org.apache.spark.shuffle.celeborn.TestCelebornShuffleManager")
Expand Down Expand Up @@ -145,7 +145,7 @@ class CelebornFetchFailureSuite extends AnyFunSuite
.config(updateSparkConf(sparkConf, ShuffleMode.HASH))
.config("spark.sql.shuffle.partitions", 2)
.config("spark.celeborn.shuffle.forceFallback.partition.enabled", false)
.config("spark.celeborn.client.spark.fetch.throwsFetchFailure", "false")
.config("spark.celeborn.client.spark.stageRerun.enabled", "false")
.getOrCreate()

val value = Range(1, 10000).mkString(",")
Expand Down Expand Up @@ -177,7 +177,7 @@ class CelebornFetchFailureSuite extends AnyFunSuite
.config(updateSparkConf(sparkConf, ShuffleMode.HASH))
.config("spark.sql.shuffle.partitions", 2)
.config("spark.celeborn.shuffle.forceFallback.partition.enabled", false)
.config("spark.celeborn.client.spark.fetch.throwsFetchFailure", "true")
.config("spark.celeborn.client.spark.stageRerun.enabled", "true")
.config(
"spark.shuffle.manager",
"org.apache.spark.shuffle.celeborn.TestCelebornShuffleManager")
Expand Down Expand Up @@ -208,7 +208,7 @@ class CelebornFetchFailureSuite extends AnyFunSuite
.config(updateSparkConf(sparkConf, ShuffleMode.HASH))
.config("spark.sql.shuffle.partitions", 2)
.config("spark.celeborn.shuffle.forceFallback.partition.enabled", false)
.config("spark.celeborn.client.spark.fetch.throwsFetchFailure", "true")
.config("spark.celeborn.client.spark.stageRerun.enabled", "true")
.config(
"spark.shuffle.manager",
"org.apache.spark.shuffle.celeborn.TestCelebornShuffleManager")
Expand Down Expand Up @@ -248,7 +248,7 @@ class CelebornFetchFailureSuite extends AnyFunSuite
.config(updateSparkConf(sparkConf, ShuffleMode.HASH))
.config("spark.sql.shuffle.partitions", 2)
.config("spark.celeborn.shuffle.forceFallback.partition.enabled", false)
.config("spark.celeborn.client.spark.fetch.throwsFetchFailure", "true")
.config("spark.celeborn.client.spark.stageRerun.enabled", "true")
.config(
"spark.shuffle.manager",
"org.apache.spark.shuffle.celeborn.TestCelebornShuffleManager")
Expand Down Expand Up @@ -279,7 +279,7 @@ class CelebornFetchFailureSuite extends AnyFunSuite
.config(updateSparkConf(sparkConf, ShuffleMode.HASH))
.config("spark.sql.shuffle.partitions", 2)
.config("spark.celeborn.shuffle.forceFallback.partition.enabled", false)
.config("spark.celeborn.client.spark.fetch.throwsFetchFailure", "true")
.config("spark.celeborn.client.spark.stageRerun.enabled", "true")
.getOrCreate()

sparkSession.sql("create table if not exists t_1 (a bigint) using parquet")
Expand All @@ -300,7 +300,7 @@ class CelebornFetchFailureSuite extends AnyFunSuite
.config(updateSparkConf(sparkConf, ShuffleMode.HASH))
.config("spark.sql.shuffle.partitions", 2)
.config("spark.celeborn.shuffle.forceFallback.partition.enabled", false)
.config("spark.celeborn.client.spark.fetch.throwsFetchFailure", "true")
.config("spark.celeborn.client.spark.stageRerun.enabled", "true")
.config("spark.celeborn.client.push.buffer.max.size", 0)
.config(
"spark.shuffle.manager",
Expand Down Expand Up @@ -344,7 +344,7 @@ class CelebornFetchFailureSuite extends AnyFunSuite
.config(updateSparkConf(sparkConf, ShuffleMode.HASH))
.config("spark.sql.shuffle.partitions", 2)
.config("spark.celeborn.shuffle.forceFallback.partition.enabled", false)
.config("spark.celeborn.client.spark.fetch.throwsFetchFailure", "true")
.config("spark.celeborn.client.spark.stageRerun.enabled", "true")
.config("spark.celeborn.client.push.buffer.max.size", 0)
.config(
"spark.shuffle.manager",
Expand Down Expand Up @@ -390,7 +390,7 @@ class CelebornFetchFailureSuite extends AnyFunSuite
.config(updateSparkConf(sparkConf, ShuffleMode.HASH))
.config("spark.sql.shuffle.partitions", 2)
.config("spark.celeborn.shuffle.forceFallback.partition.enabled", false)
.config("spark.celeborn.client.spark.fetch.throwsFetchFailure", "true")
.config("spark.celeborn.client.spark.stageRerun.enabled", "true")
.config("spark.celeborn.client.push.buffer.max.size", 0)
.config("spark.stage.maxConsecutiveAttempts", "1")
.config("spark.stage.maxAttempts", "1")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class CelebornShuffleLostSuite extends AnyFunSuite
sparkSession.stop()

val conf = updateSparkConf(sparkConf, ShuffleMode.HASH)
conf.set("spark.celeborn.client.spark.fetch.throwsFetchFailure", "true")
conf.set("spark.celeborn.client.spark.stageRerun.enabled", "true")
conf.set("spark.celeborn.test.client.mockShuffleLost", "true")

val celebornSparkSession = SparkSession.builder()
Expand Down

0 comments on commit 1d0032b

Please sign in to comment.