diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 204ffc39a1108..f06312c15cf39 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -643,10 +643,12 @@ private[spark] class ExecutorAllocationManager(
     // Should be 0 when no stages are active.
     private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
     private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
-    // Number of speculative tasks pending/running in each stageAttempt
-    private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
-    // The speculative tasks started in each stageAttempt
+    // Map from each stageAttempt to a set of running speculative task indexes
+    // TODO(SPARK-41192): We simply need an Int for this.
     private val stageAttemptToSpeculativeTaskIndices =
+      new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]()
+    // Map from each stageAttempt to a set of pending speculative task indexes
+    private val stageAttemptToPendingSpeculativeTasks =
       new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
 
     private val resourceProfileIdToStageAttempt =
@@ -722,7 +724,7 @@ private[spark] class ExecutorAllocationManager(
         // because the attempt may still have running tasks,
         // even after another attempt for the stage is submitted.
         stageAttemptToNumTasks -= stageAttempt
-        stageAttemptToNumSpeculativeTasks -= stageAttempt
+        stageAttemptToPendingSpeculativeTasks -= stageAttempt
         stageAttemptToTaskIndices -= stageAttempt
         stageAttemptToSpeculativeTaskIndices -= stageAttempt
         stageAttemptToExecutorPlacementHints -= stageAttempt
@@ -733,7 +735,9 @@ private[spark] class ExecutorAllocationManager(
 
         // If this is the last stage with pending tasks, mark the scheduler queue as empty
        // This is needed in case the stage is aborted for any reason
-        if (stageAttemptToNumTasks.isEmpty && stageAttemptToNumSpeculativeTasks.isEmpty) {
+        if (stageAttemptToNumTasks.isEmpty
+          && stageAttemptToPendingSpeculativeTasks.isEmpty
+          && stageAttemptToSpeculativeTaskIndices.isEmpty) {
           allocationManager.onSchedulerQueueEmpty()
         }
       }
@@ -751,6 +755,8 @@ private[spark] class ExecutorAllocationManager(
         if (taskStart.taskInfo.speculative) {
           stageAttemptToSpeculativeTaskIndices.getOrElseUpdate(stageAttempt,
             new mutable.HashSet[Int]) += taskIndex
+          stageAttemptToPendingSpeculativeTasks
+            .get(stageAttempt).foreach(_.remove(taskIndex))
         } else {
           stageAttemptToTaskIndices.getOrElseUpdate(stageAttempt,
             new mutable.HashSet[Int]) += taskIndex
@@ -776,15 +782,14 @@ private[spark] class ExecutorAllocationManager(
         }
         if (taskEnd.taskInfo.speculative) {
           stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach {_.remove{taskIndex}}
-          // If the previous task attempt succeeded first and it was the last task in a stage,
-          // the stage may have been removed before handing this speculative TaskEnd event.
-          if (stageAttemptToNumSpeculativeTasks.contains(stageAttempt)) {
-            stageAttemptToNumSpeculativeTasks(stageAttempt) -= 1
-          }
         }
 
         taskEnd.reason match {
-          case Success | _: TaskKilled =>
+          case Success =>
+            // Remove pending speculative task in case the normal task
+            // is finished before starting the speculative task
+            stageAttemptToPendingSpeculativeTasks.get(stageAttempt).foreach(_.remove(taskIndex))
+          case _: TaskKilled =>
           case _ =>
             if (!hasPendingTasks) {
               // If the task failed (not intentionally killed), we expect it to be resubmitted
@@ -810,9 +815,10 @@ private[spark] class ExecutorAllocationManager(
       val stageId = speculativeTask.stageId
       val stageAttemptId = speculativeTask.stageAttemptId
       val stageAttempt = StageAttempt(stageId, stageAttemptId)
+      val taskIndex = speculativeTask.taskIndex
       allocationManager.synchronized {
-        stageAttemptToNumSpeculativeTasks(stageAttempt) =
-          stageAttemptToNumSpeculativeTasks.getOrElse(stageAttempt, 0) + 1
+        stageAttemptToPendingSpeculativeTasks.getOrElseUpdate(stageAttempt,
+          new mutable.HashSet[Int]).add(taskIndex)
         allocationManager.onSchedulerBacklogged()
       }
     }
@@ -843,7 +849,7 @@ private[spark] class ExecutorAllocationManager(
     def removeStageFromResourceProfileIfUnused(stageAttempt: StageAttempt): Unit = {
       if (!stageAttemptToNumRunningTask.contains(stageAttempt) &&
         !stageAttemptToNumTasks.contains(stageAttempt) &&
-        !stageAttemptToNumSpeculativeTasks.contains(stageAttempt) &&
+        !stageAttemptToPendingSpeculativeTasks.contains(stageAttempt) &&
         !stageAttemptToTaskIndices.contains(stageAttempt) &&
         !stageAttemptToSpeculativeTaskIndices.contains(stageAttempt)
       ) {
@@ -896,9 +902,7 @@ private[spark] class ExecutorAllocationManager(
     }
 
     private def getPendingSpeculativeTaskSum(attempt: StageAttempt): Int = {
-      val numTotalTasks = stageAttemptToNumSpeculativeTasks.getOrElse(attempt, 0)
-      val numRunning = stageAttemptToSpeculativeTaskIndices.get(attempt).map(_.size).getOrElse(0)
-      numTotalTasks - numRunning
+      stageAttemptToPendingSpeculativeTasks.get(attempt).map(_.size).getOrElse(0)
     }
 
     /**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c55d44dfd593c..bb17a987717fa 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -383,8 +383,8 @@ private[spark] class DAGScheduler(
   /**
    * Called by the TaskSetManager when it decides a speculative task is needed.
   */
-  def speculativeTaskSubmitted(task: Task[_]): Unit = {
-    eventProcessLoop.post(SpeculativeTaskSubmitted(task))
+  def speculativeTaskSubmitted(task: Task[_], taskIndex: Int): Unit = {
+    eventProcessLoop.post(SpeculativeTaskSubmitted(task, taskIndex))
   }
 
   /**
@@ -1178,8 +1178,10 @@ private[spark] class DAGScheduler(
     listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
   }
 
-  private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_]): Unit = {
-    listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId))
+  private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_], taskIndex: Int): Unit = {
+    val speculativeTaskSubmittedEvent = new SparkListenerSpeculativeTaskSubmitted(
+      task.stageId, task.stageAttemptId, taskIndex, task.partitionId)
+    listenerBus.post(speculativeTaskSubmittedEvent)
   }
 
   private[scheduler] def handleUnschedulableTaskSetAdded(
@@ -2962,8 +2964,8 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
     case BeginEvent(task, taskInfo) =>
       dagScheduler.handleBeginEvent(task, taskInfo)
 
-    case SpeculativeTaskSubmitted(task) =>
-      dagScheduler.handleSpeculativeTaskSubmitted(task)
+    case SpeculativeTaskSubmitted(task, taskIndex) =>
+      dagScheduler.handleSpeculativeTaskSubmitted(task, taskIndex)
 
     case UnschedulableTaskSetAdded(stageId, stageAttemptId) =>
       dagScheduler.handleUnschedulableTaskSetAdded(stageId, stageAttemptId)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index f9df8de620ff8..c16e5ea03d7c9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -99,7 +99,7 @@ case class TaskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Thr
 private[scheduler] case object ResubmitFailedStages extends DAGSchedulerEvent
 
 private[scheduler]
-case class SpeculativeTaskSubmitted(task: Task[_]) extends DAGSchedulerEvent
+case class SpeculativeTaskSubmitted(task: Task[_], taskIndex: Int = -1) extends DAGSchedulerEvent
 
 private[scheduler]
 case class UnschedulableTaskSetAdded(stageId: Int, stageAttemptId: Int)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index a9d8634794028..d3bbbaffd59a9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -56,7 +56,21 @@ case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListe
 case class SparkListenerSpeculativeTaskSubmitted(
     stageId: Int,
     stageAttemptId: Int = 0)
-  extends SparkListenerEvent
+  extends SparkListenerEvent {
+  // Note: this is here for backwards-compatibility with older versions of this event which
+  // didn't store taskIndex
+  private var _taskIndex: Int = -1
+  private var _partitionId: Int = -1
+
+  def taskIndex: Int = _taskIndex
+  def partitionId: Int = _partitionId
+
+  def this(stageId: Int, stageAttemptId: Int, taskIndex: Int, partitionId: Int) = {
+    this(stageId, stageAttemptId)
+    _partitionId = partitionId
+    _taskIndex = taskIndex
+  }
+}
 
 @DeveloperApi
 case class SparkListenerTaskEnd(
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 2ed1ce1a49e30..cbb8fd0a33436 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -1143,7 +1143,7 @@ private[spark] class TaskSetManager(
             " than %.0f ms(%d speculatable tasks in this taskset now)")
             .format(index, taskSet.id, info.host, threshold, speculatableTasks.size + 1))
           speculatableTasks += index
-          sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
+          sched.dagScheduler.speculativeTaskSubmitted(tasks(index), index)
         }
         foundTasksResult |= speculated
       }
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index c616c43fe1b1e..1cb913b248f7e 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -461,6 +461,16 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     assert(numExecutorsTargetForDefaultProfileId(manager) === 10)
   }
 
+  private def speculativeTaskSubmitEventFromTaskIndex(
+      stageId: Int,
+      stageAttemptId: Int = 0,
+      taskIndex: Int = -1,
+      partitionId: Int = -1): SparkListenerSpeculativeTaskSubmitted = {
+    val event = new SparkListenerSpeculativeTaskSubmitted(stageId, stageAttemptId,
+      taskIndex = taskIndex, partitionId = partitionId)
+    event
+  }
+
   test("add executors when speculative tasks added") {
     val manager = createManager(createConf(0, 10, 0))
 
@@ -469,13 +479,13 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     post(SparkListenerStageSubmitted(createStageInfo(1, 2)))
 
     // Verify that we're capped at number of tasks including the speculative ones in the stage
-    post(SparkListenerSpeculativeTaskSubmitted(1))
+    post(speculativeTaskSubmitEventFromTaskIndex(1, taskIndex = 0))
     assert(numExecutorsTargetForDefaultProfileId(manager) === 0)
     assert(numExecutorsToAddForDefaultProfile(manager) === 1)
     assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
     doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
-    post(SparkListenerSpeculativeTaskSubmitted(1))
-    post(SparkListenerSpeculativeTaskSubmitted(1))
+    post(speculativeTaskSubmitEventFromTaskIndex(1, taskIndex = 1))
+    post(speculativeTaskSubmitEventFromTaskIndex(1, taskIndex = 2))
     assert(numExecutorsTargetForDefaultProfileId(manager) === 1)
     assert(numExecutorsToAddForDefaultProfile(manager) === 2)
     assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2)
@@ -671,6 +681,83 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     onExecutorRemoved(manager, "5")
   }
 
+  test("SPARK-41192: remove executors when task finished before speculative task scheduled") {
+    val clock = new ManualClock()
+    val stage = createStageInfo(0, 40)
+    val conf = createConf(0, 10, 0).set(config.EXECUTOR_CORES, 4)
+    val manager = createManager(conf, clock = clock)
+    val updatesNeeded =
+      new mutable.HashMap[ResourceProfile, ExecutorAllocationManager.TargetNumUpdates]
+
+    // submit 40 tasks, total executors needed = 40/4 = 10
+    post(SparkListenerStageSubmitted(stage))
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 4)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 3)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+
+    (0 until 10).foreach(execId => onExecutorAddedDefaultProfile(manager, execId.toString))
+    (0 until 40).map { i => createTaskInfo(i, i, executorId = s"${i / 4}")}.foreach {
+      info => post(SparkListenerTaskStart(0, 0, info))
+    }
+    assert(numExecutorsTarget(manager, defaultProfile.id) === 10)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 10)
+    // 30 tasks (0 - 29) finished
+    (0 until 30).map { i => createTaskInfo(i, i, executorId = s"${i / 4}")}.foreach {
+      info => post(SparkListenerTaskEnd(0, 0, null, Success, info, new ExecutorMetrics, null)) }
+    // 10 speculative tasks (30 - 39) launch for the remaining tasks
+    (30 until 40).foreach { index =>
+      post(speculativeTaskSubmitEventFromTaskIndex(0, taskIndex = index))}
+    clock.advance(1000)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+    assert(numExecutorsTarget(manager, defaultProfile.id) === 5)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 5)
+    (0 until 5).foreach { i => assert(removeExecutorDefaultProfile(manager, i.toString))}
+    (0 until 5).foreach { i => onExecutorRemoved(manager, i.toString)}
+
+    // 5 original tasks (30 - 34) finished before speculative task start,
+    // the speculative task will be removed from pending tasks
+    // executors needed = (5 + 5) / 4 + 1
+    (30 until 35).map { i =>
+      createTaskInfo(i, i, executorId = s"${i / 4}")}
+      .foreach { info => post(
+        SparkListenerTaskEnd(0, 0, null, Success, info, new ExecutorMetrics, null))}
+    clock.advance(1000)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+    assert(numExecutorsTarget(manager, defaultProfile.id) === 3)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 3)
+
+    (40 until 45).map { i =>
+      createTaskInfo(i, i - 5, executorId = s"${i / 4}", speculative = true)
+    }.foreach {
+      info => post(SparkListenerTaskStart(0, 0, info))
+    }
+    clock.advance(1000)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+    assert(numExecutorsTarget(manager, defaultProfile.id) === 3)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 3)
+
+    (35 until 39).map { i =>
+      createTaskInfo(i, i, executorId = s"${i / 4}")
+    }.foreach {
+      info => post(SparkListenerTaskEnd(0, 0, null, Success, info, new ExecutorMetrics, null))
+    }
+    (35 until 39).map { i =>
+      createTaskInfo(i + 5, i, executorId = s"${(i + 5) / 4}", speculative = true)
+    }.foreach {
+      info => post(SparkListenerTaskEnd(0, 0, null, TaskKilled("attempt"),
+        info, new ExecutorMetrics, null))
+    }
+    clock.advance(1000)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+    assert(numExecutorsTarget(manager, defaultProfile.id) === 1)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 1)
+  }
+
   test("SPARK-30511 remove executors when speculative tasks end") {
     val clock = new ManualClock()
     val stage = createStageInfo(0, 40)
@@ -707,7 +794,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     (0 to 6).foreach { i => onExecutorRemoved(manager, i.toString)}
 
     // 10 speculative tasks (30 - 39) launch for the remaining tasks
-    (30 to 39).foreach { _ => post(SparkListenerSpeculativeTaskSubmitted(0))}
+    (30 to 39).foreach { i => post(speculativeTaskSubmitEventFromTaskIndex(0, taskIndex = i))}
     assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
     doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
     assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
@@ -785,7 +872,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
       createTaskInfo(38, 38, executorId = "9"), new ExecutorMetrics, null))
     post(SparkListenerTaskEnd(0, 0, null, UnknownReason,
       createTaskInfo(49, 39, executorId = "12", speculative = true), new ExecutorMetrics, null))
-    post(SparkListenerSpeculativeTaskSubmitted(0))
+    post(speculativeTaskSubmitEventFromTaskIndex(0, taskIndex = 39))
     clock.advance(1000)
     manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
     // maxNeeded = 1, allocate one more to satisfy speculation locality requirement
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 2dc7f0d0dfaf3..a3b9eff808422 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -74,7 +74,7 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
     taskScheduler.taskSetsFailed += taskSet.id
   }
 
-  override def speculativeTaskSubmitted(task: Task[_]): Unit = {
+  override def speculativeTaskSubmitted(task: Task[_], taskIndex: Int): Unit = {
    taskScheduler.speculativeTasks += task.partitionId
   }
 }
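
For reviewers skimming the allocation-manager change: the patch replaces the bare pending/running counter (stageAttemptToNumSpeculativeTasks) with an explicit set of pending speculative task indexes, so a specific index can be dropped when its original task finishes first. Below is a minimal standalone sketch of that bookkeeping with simplified names and no locking; it is an illustration of the idea, not the actual ExecutorAllocationListener code.

```scala
import scala.collection.mutable

// Illustration only: mirrors the accounting the patch introduces in
// ExecutorAllocationListener, with simplified names and no synchronization.
object PendingSpeculativeAccountingSketch {
  case class StageAttempt(stageId: Int, stageAttemptId: Int)

  private val pendingSpeculative = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
  private val runningSpeculative = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]

  // SparkListenerSpeculativeTaskSubmitted: the task index becomes pending.
  def onSpeculativeTaskSubmitted(attempt: StageAttempt, taskIndex: Int): Unit =
    pendingSpeculative.getOrElseUpdate(attempt, new mutable.HashSet[Int]) += taskIndex

  // The speculative copy starts running: move the index out of the pending set.
  def onSpeculativeTaskStart(attempt: StageAttempt, taskIndex: Int): Unit = {
    runningSpeculative.getOrElseUpdate(attempt, new mutable.HashSet[Int]) += taskIndex
    pendingSpeculative.get(attempt).foreach(_.remove(taskIndex))
  }

  // The original task succeeds before its speculative copy ever started:
  // the copy will never run, so drop it from the pending set as well.
  def onOriginalTaskSuccess(attempt: StageAttempt, taskIndex: Int): Unit =
    pendingSpeculative.get(attempt).foreach(_.remove(taskIndex))

  // What getPendingSpeculativeTaskSum reduces to after the change.
  def pendingSpeculativeTaskSum(attempt: StageAttempt): Int =
    pendingSpeculative.get(attempt).map(_.size).getOrElse(0)
}
```

Tracking indexes rather than a count is what lets the Success branch of onTaskEnd cancel exactly the speculative copy of the task that just finished, which is the over-allocation scenario the new SPARK-41192 test exercises.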
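
Since SparkListenerSpeculativeTaskSubmitted now exposes taskIndex and partitionId through a backwards-compatible auxiliary constructor, external listeners can consume the new fields too. A minimal sketch follows; the SpeculationAuditListener class name is made up for illustration.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerSpeculativeTaskSubmitted}

// Hypothetical listener that logs which task triggered speculation.
class SpeculationAuditListener extends SparkListener {
  override def onSpeculativeTaskSubmitted(
      speculativeTask: SparkListenerSpeculativeTaskSubmitted): Unit = {
    // Events built with the original two-argument constructor keep the
    // -1 defaults, so guard before trusting the new fields.
    if (speculativeTask.taskIndex >= 0) {
      println(s"Speculation requested for stage ${speculativeTask.stageId}." +
        s"${speculativeTask.stageAttemptId}: task index ${speculativeTask.taskIndex}, " +
        s"partition ${speculativeTask.partitionId}")
    }
  }
}
```

Such a listener would be registered the usual way, e.g. sc.addSparkListener(new SpeculationAuditListener) or via the spark.extraListeners configuration.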