
Commit a439136

[SPARK-50247][CORE] Define BLOCK_MANAGER_REREGISTRATION_FAILED as `ExecutorExitCode`

### What changes were proposed in this pull request?

This PR aims to define a new error code, `BLOCK_MANAGER_REREGISTRATION_FAILED`, as an official `ExecutorExitCode` starting with Apache Spark 4.0, like the existing `HEARTBEAT_FAILURE`.

https://github.com/apache/spark/blob/0cb7a42731076b9960eb9ad27067cab7ae570356/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala#L46
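
For illustration, here is a minimal, self-contained Scala sketch of the `ExecutorExitCode` pattern (not Spark's actual source: the object name, the `main` method, and the value `56` for `HEARTBEAT_FAILURE` are assumptions here, while `57` and `58` match the diff below):

```scala
// Self-contained sketch mirroring the ExecutorExitCode pattern; not Spark's source.
object ExecutorExitCodeSketch {
  // 56 for HEARTBEAT_FAILURE is an assumption; 57 and 58 match this commit's diff.
  val HEARTBEAT_FAILURE = 56
  val KILLED_BY_TASK_REAPER = 57
  // New in this PR: a dedicated code replaces the former generic -1.
  val BLOCK_MANAGER_REREGISTRATION_FAILED = 58

  // Uppercase vals act as stable identifier patterns in the match below.
  def explainExitCode(exitCode: Int): String = exitCode match {
    case HEARTBEAT_FAILURE => "Unable to send heartbeats to driver."
    case BLOCK_MANAGER_REREGISTRATION_FAILED =>
      "Executor killed due to a failure of block manager re-registration."
    case KILLED_BY_TASK_REAPER => "Executor killed by TaskReaper."
    case other => s"Unknown executor exit code: $other"
  }

  def main(args: Array[String]): Unit =
    // Prints the human-readable reason for exit status 58.
    println(explainExitCode(BLOCK_MANAGER_REREGISTRATION_FAILED))
}
```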

### Why are the changes needed?

Up to Spark 3, the executor exits with `-1`, as shown below, without providing a way to handle this specific error.

https://github.com/apache/spark/blob/0cb7a42731076b9960eb9ad27067cab7ae570356/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L673-L674
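
To see why a dedicated code matters downstream, here is an illustrative sketch (the names `LossReason` and `classifyExit` are hypothetical, not Spark's API) of how a scheduler backend can now mark this loss as an infrastructure failure instead of an application failure:

```scala
// Illustrative sketch, not Spark's API: classifying an executor's exit status.
// With the old generic -1, this loss was indistinguishable from an
// application-caused crash; with a dedicated code it can be treated as an
// infrastructure failure that should not count against the application.
final case class LossReason(exitCausedByApp: Boolean, message: String)

object ExitClassifierSketch {
  val BLOCK_MANAGER_REREGISTRATION_FAILED = 58

  def classifyExit(exitStatus: Option[Int]): LossReason = exitStatus match {
    case Some(BLOCK_MANAGER_REREGISTRATION_FAILED) =>
      // Infrastructure failure: do not blame the application.
      LossReason(exitCausedByApp = false,
        "Executor killed due to a failure of block manager re-registration.")
    case Some(code) =>
      LossReason(exitCausedByApp = true, s"Executor exited with code $code")
    case None =>
      LossReason(exitCausedByApp = false, "Executor lost for unknown reason")
  }
}
```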

### Does this PR introduce _any_ user-facing change?

Yes. Executors now exit with the dedicated code `58` (`BLOCK_MANAGER_REREGISTRATION_FAILED`) instead of `-1`, so this executor failure reason can be handled properly.

### How was this patch tested?

Passes with the newly added test cases.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #48776 from dongjoon-hyun/SPARK-50247.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
dongjoon-hyun authored and yaooqinn committed Nov 7, 2024
1 parent f2d39b9 commit a439136
Showing 4 changed files with 47 additions and 2 deletions.
core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala

@@ -49,6 +49,9 @@ object ExecutorExitCode {
    * TaskReaper. */
   val KILLED_BY_TASK_REAPER = 57
 
+  /** Executor is unable to re-register BlockManager. */
+  val BLOCK_MANAGER_REREGISTRATION_FAILED = 58
+
   def explainExitCode(exitCode: Int): String = {
     exitCode match {
       case UNCAUGHT_EXCEPTION => "Uncaught exception"
@@ -63,6 +66,8 @@ object ExecutorExitCode {
         "ExternalBlockStore failed to create a local temporary directory."
       case HEARTBEAT_FAILURE =>
         "Unable to send heartbeats to driver."
+      case BLOCK_MANAGER_REREGISTRATION_FAILED =>
+        "Executor killed due to a failure of block manager re-registration."
       case KILLED_BY_TASK_REAPER =>
         "Executor killed by TaskReaper."
       case _ =>
core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala

@@ -186,6 +186,9 @@ private[spark] class StandaloneSchedulerBackend(
     val reason: ExecutorLossReason = exitStatus match {
       case Some(ExecutorExitCode.HEARTBEAT_FAILURE) =>
         ExecutorExited(ExecutorExitCode.HEARTBEAT_FAILURE, exitCausedByApp = false, message)
+      case Some(ExecutorExitCode.BLOCK_MANAGER_REREGISTRATION_FAILED) =>
+        ExecutorExited(ExecutorExitCode.BLOCK_MANAGER_REREGISTRATION_FAILED,
+          exitCausedByApp = false, message)
       case Some(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR) =>
         ExecutorExited(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR,
           exitCausedByApp = false, message)
core/src/main/scala/org/apache/spark/storage/BlockManager.scala

@@ -40,7 +40,7 @@ import org.apache.commons.io.IOUtils
 
 import org.apache.spark._
 import org.apache.spark.errors.SparkCoreErrors
-import org.apache.spark.executor.DataReadMethod
+import org.apache.spark.executor.{DataReadMethod, ExecutorExitCode}
 import org.apache.spark.internal.{config, Logging, MDC}
 import org.apache.spark.internal.LogKeys._
 import org.apache.spark.internal.config.{Network, RDD_CACHE_VISIBILITY_TRACKING_ENABLED, Tests}
@@ -671,7 +671,7 @@ private[spark] class BlockManager(
       reportAllBlocks()
     } else {
       logError("Exiting executor due to block manager re-registration failure")
-      System.exit(-1)
+      System.exit(ExecutorExitCode.BLOCK_MANAGER_REREGISTRATION_FAILED)
     }
   }
 
core/src/test/scala/org/apache/spark/SparkContextSuite.scala (37 additions & 0 deletions)

@@ -1423,6 +1423,43 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually
     sc = new SparkContext(conf)
     sc.stop()
   }
+
+  test("SPARK-50247: BLOCK_MANAGER_REREGISTRATION_FAILED should be counted as network failure") {
+    // This test case follows the test structure of HEARTBEAT_FAILURE error code (SPARK-39957)
+    val conf = new SparkConf().set(TASK_MAX_FAILURES, 1)
+    val sc = new SparkContext("local-cluster[1, 1, 1024]", "test-exit-code", conf)
+    val result = sc.parallelize(1 to 10, 1).map { x =>
+      val context = org.apache.spark.TaskContext.get()
+      if (context.taskAttemptId() == 0) {
+        System.exit(ExecutorExitCode.BLOCK_MANAGER_REREGISTRATION_FAILED)
+      } else {
+        x
+      }
+    }.count()
+    assert(result == 10L)
+    sc.stop()
+  }
+
+  test("SPARK-50247: BLOCK_MANAGER_REREGISTRATION_FAILED will be counted as task failure when " +
+    "EXECUTOR_REMOVE_DELAY is disabled") {
+    // This test case follows the test structure of HEARTBEAT_FAILURE error code (SPARK-39957)
+    val conf = new SparkConf().set(TASK_MAX_FAILURES, 1).set(EXECUTOR_REMOVE_DELAY.key, "0s")
+    val sc = new SparkContext("local-cluster[1, 1, 1024]", "test-exit-code", conf)
+    eventually(timeout(30.seconds), interval(1.seconds)) {
+      val e = intercept[SparkException] {
+        sc.parallelize(1 to 10, 1).map { x =>
+          val context = org.apache.spark.TaskContext.get()
+          if (context.taskAttemptId() == 0) {
+            System.exit(ExecutorExitCode.BLOCK_MANAGER_REREGISTRATION_FAILED)
+          } else {
+            x
+          }
+        }.count()
+      }
+      assert(e.getMessage.contains("Remote RPC client disassociated"))
+    }
+    sc.stop()
+  }
 }
 
 object SparkContextSuite {
