[SPARK-49804][K8S] Fix to use the exit code of executor container always
### What changes were proposed in this pull request?

When deploying Spark pods on Kubernetes with sidecar containers, the reported executor exit code may be incorrect.
For example, the reported exit code is 0 (success), while the actual exit code is 52 (OOM):
```
2024-09-25 02:35:29,383 ERROR TaskSchedulerImpl.logExecutorLoss - Lost executor 1 on XXXXX: The executor with
 id 1 exited with exit code 0(success).

The API gave the following container statuses:

     container name: fluentd
     container image: docker-images-release.XXXXX.com/XXXXX/fluentd:XXXXX
     container state: terminated
     container started at: 2024-09-25T02:32:17Z
     container finished at: 2024-09-25T02:34:52Z
     exit code: 0
     termination reason: Completed

     container name: istio-proxy
     container image: docker-images-release.XXXXX.com/XXXXX-istio/proxyv2:XXXXX
     container state: running
     container started at: 2024-09-25T02:32:16Z

     container name: spark-kubernetes-executor
     container image: docker-dev-artifactory.XXXXX.com/XXXXX/spark-XXXXX:XXXXX
     container state: terminated
     container started at: 2024-09-25T02:32:17Z
     container finished at: 2024-09-25T02:35:28Z
     exit code: 52
     termination reason: Error
```
`ExecutorPodsLifecycleManager.findExitCode()` looks for any terminated container and may pick a sidecar instead of the main executor container. I'm changing it to always look up the executor container by name.
Note that the pod may also fail because a sidecar container fails while the executor's container is still running; with my changes, the reported exit code in that case will be -1 (`UNKNOWN_EXIT_CODE`).
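
A minimal sketch of the fixed lookup (simplified from the diff below; in the actual change, `sparkContainerName` comes from `spark.kubernetes.executor.podTemplateContainerName`, falling back to the default executor container name):

```scala
import io.fabric8.kubernetes.api.model.ContainerStatus

val UNKNOWN_EXIT_CODE = -1

// Only the named executor container's terminated state is consulted, so a
// terminated sidecar no longer shadows the executor's exit code.
def findExitCode(statuses: Seq[ContainerStatus], sparkContainerName: String): Int = {
  statuses.find { containerStatus =>
    containerStatus.getName == sparkContainerName &&
      containerStatus.getState.getTerminated != null
  }.map(_.getState.getTerminated.getExitCode.toInt)
    .getOrElse(UNKNOWN_EXIT_CODE)
}
```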

### Why are the changes needed?

To correctly report the executor failure reason in the UI, in the logs, and to event listeners via `SparkListener.onExecutorRemoved()`.
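
For example, a listener consuming the corrected reason (a hedged sketch; the listener class and log line are illustrative, but `SparkListener.onExecutorRemoved()` and `SparkListenerExecutorRemoved` are the real API):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorRemoved}

// With this fix, `event.reason` carries the main executor container's exit
// code (e.g. 52 for a JVM OOM) instead of a sidecar's.
class ExecutorExitLogger extends SparkListener {
  override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
    println(s"Executor ${event.executorId} removed: ${event.reason}")
  }
}
```

Such a listener can be registered with `sparkContext.addSparkListener(new ExecutorExitLogger)`.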

### Does this PR introduce _any_ user-facing change?

Yes, the executor's exit code is now taken from the main executor container instead of a sidecar container.
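
When a custom pod template defines multiple containers, the container the fix inspects is the one named by the existing setting the change reads (a sketch; the value shown is just the default executor container name):

```scala
import org.apache.spark.SparkConf

// `spark.kubernetes.executor.podTemplateContainerName` tells Spark which
// container in a multi-container pod template is the executor; the fix uses
// the same name (default "spark-kubernetes-executor") to read the exit code.
val conf = new SparkConf()
  .set("spark.kubernetes.executor.podTemplateContainerName", "spark-kubernetes-executor")
```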

### How was this patch tested?

Added a unit test and tested manually on a Kubernetes cluster by simulating different types of executor failure (JVM OOM and container eviction due to disk pressure on the node).
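
A hedged sketch of one way to reproduce the JVM OOM case manually (illustrative only, assuming a spark-shell `sc`; any allocation-heavy task works). The executor JVM should die with exit code 52, which the fix now reports instead of a sidecar's code:

```scala
sc.parallelize(1 to 2, 1).foreach { _ =>
  // Keep allocating 64 MiB chunks until the executor JVM runs out of heap.
  val leak = scala.collection.mutable.ArrayBuffer.empty[Array[Byte]]
  while (true) leak += new Array[Byte](64 * 1024 * 1024)
}
```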

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #48275 from fe2s/SPARK-49804-fix-exit-code.

Lead-authored-by: oleksii.diagiliev <[email protected]>
Co-authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
oleksii.diagiliev and dongjoon-hyun committed Sep 27, 2024
1 parent 27d4a77 commit 5d701f2
Showing 3 changed files with 53 additions and 4 deletions.
ExecutorPodsLifecycleManager.scala
@@ -62,6 +62,9 @@ private[spark] class ExecutorPodsLifecycleManager(

private val namespace = conf.get(KUBERNETES_NAMESPACE)

private val sparkContainerName = conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_CONTAINER_NAME)
.getOrElse(DEFAULT_EXECUTOR_CONTAINER_NAME)

def start(schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
val eventProcessingInterval = conf.get(KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL)
snapshotsStore.addSubscriber(eventProcessingInterval) {
@@ -246,7 +249,8 @@ private[spark] class ExecutorPodsLifecycleManager(

private def findExitCode(podState: FinalPodState): Int = {
podState.pod.getStatus.getContainerStatuses.asScala.find { containerStatus =>
containerStatus.getState.getTerminated != null
containerStatus.getName == sparkContainerName &&
containerStatus.getState.getTerminated != null
}.map { terminatedContainer =>
terminatedContainer.getState.getTerminated.getExitCode.toInt
}.getOrElse(UNKNOWN_EXIT_CODE)
ExecutorLifecycleTestUtils.scala
Expand Up @@ -29,6 +29,7 @@ import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
object ExecutorLifecycleTestUtils {

val TEST_SPARK_APP_ID = "spark-app-id"
val TEST_SPARK_EXECUTOR_CONTAINER_NAME = "spark-executor"

def failedExecutorWithoutDeletion(
executorId: Long, rpId: Int = DEFAULT_RESOURCE_PROFILE_ID): Pod = {
@@ -37,7 +38,7 @@ object ExecutorLifecycleTestUtils {
.withPhase("failed")
.withStartTime(Instant.now.toString)
.addNewContainerStatus()
.withName("spark-executor")
.withName(TEST_SPARK_EXECUTOR_CONTAINER_NAME)
.withImage("k8s-spark")
.withNewState()
.withNewTerminated()
@@ -49,6 +50,38 @@
.addNewContainerStatus()
.withName("spark-executor-sidecar")
.withImage("k8s-spark-sidecar")
.withNewState()
.withNewTerminated()
.withMessage("Failed")
.withExitCode(2)
.endTerminated()
.endState()
.endContainerStatus()
.withMessage("Executor failed.")
.withReason("Executor failed because of a thrown error.")
.endStatus()
.build()
}

def failedExecutorWithSidecarStatusListedFirst(
executorId: Long, rpId: Int = DEFAULT_RESOURCE_PROFILE_ID): Pod = {
new PodBuilder(podWithAttachedContainerForId(executorId, rpId))
.editOrNewStatus()
.withPhase("failed")
.withStartTime(Instant.now.toString)
.addNewContainerStatus() // sidecar status listed before executor's container status
.withName("spark-executor-sidecar")
.withImage("k8s-spark-sidecar")
.withNewState()
.withNewTerminated()
.withMessage("Failed")
.withExitCode(2)
.endTerminated()
.endState()
.endContainerStatus()
.addNewContainerStatus()
.withName(TEST_SPARK_EXECUTOR_CONTAINER_NAME)
.withImage("k8s-spark")
.withNewState()
.withNewTerminated()
.withMessage("Failed")
@@ -200,7 +233,7 @@ object ExecutorLifecycleTestUtils {
.endSpec()
.build()
val container = new ContainerBuilder()
.withName("spark-executor")
.withName(TEST_SPARK_EXECUTOR_CONTAINER_NAME)
.withImage("k8s-spark")
.build()
SparkPod(pod, container)
ExecutorPodsLifecycleManagerSuite.scala
@@ -33,6 +33,7 @@ import org.scalatest.BeforeAndAfter

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.Config
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.Fabric8Aliases._
import org.apache.spark.deploy.k8s.KubernetesUtils._
@@ -60,14 +61,16 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfter

before {
MockitoAnnotations.openMocks(this).close()
val sparkConf = new SparkConf()
.set(KUBERNETES_EXECUTOR_PODTEMPLATE_CONTAINER_NAME, TEST_SPARK_EXECUTOR_CONTAINER_NAME)
snapshotsStore = new DeterministicExecutorPodsSnapshotsStore()
namedExecutorPods = mutable.Map.empty[String, PodResource]
when(schedulerBackend.getExecutorsWithRegistrationTs()).thenReturn(Map.empty[String, Long])
when(kubernetesClient.pods()).thenReturn(podOperations)
when(podOperations.inNamespace(anyString())).thenReturn(podsWithNamespace)
when(podsWithNamespace.withName(any(classOf[String]))).thenAnswer(namedPodsAnswer())
eventHandlerUnderTest = new ExecutorPodsLifecycleManager(
new SparkConf(),
sparkConf,
kubernetesClient,
snapshotsStore)
eventHandlerUnderTest.start(schedulerBackend)
@@ -162,6 +165,15 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfter
.edit(any[UnaryOperator[Pod]]())
}

test("SPARK-49804: Use the exit code of executor container always") {
val failedPod = failedExecutorWithSidecarStatusListedFirst(1)
snapshotsStore.updatePod(failedPod)
snapshotsStore.notifySubscribers()
val msg = exitReasonMessage(1, failedPod, 1)
val expectedLossReason = ExecutorExited(1, exitCausedByApp = true, msg)
verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
}

private def exitReasonMessage(execId: Int, failedPod: Pod, exitCode: Int): String = {
val reason = Option(failedPod.getStatus.getReason)
val message = Option(failedPod.getStatus.getMessage)
