[SPARK-41192][CORE] Remove unscheduled speculative tasks when task finished to obtain better dynamic

### What changes were proposed in this pull request?
`ExecutorAllocationManager` only records a count of speculative tasks: `stageAttemptToNumSpeculativeTasks` is incremented when a speculative task is submitted and only decremented when a speculative task ends.
If a task finishes before its speculative copy starts, that speculative task will never be scheduled, which leaks entries in `stageAttemptToNumSpeculativeTasks` and misleads the calculation of the target number of executors.
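For example (numbers illustrative; the real computation is in `ExecutorAllocationManager.maxNumExecutorsNeededPerResourceProfile`, roughly `ceil((pending + running tasks) / tasks per executor)`): with 4 cores per executor and 4 tasks genuinely outstanding, the target is `ceil(4 / 4) = 1` executor, but if 10 speculative tasks were submitted for tasks that have since finished and their count is never decremented, the manager computes `ceil((4 + 10) / 4) = 4` and holds 3 executors idle.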

This PR fixes the issue by adding the task index to the `SparkListenerSpeculativeTaskSubmitted` event and recording submitted speculative tasks by task index; the index is removed when the speculative task starts or when the task finishes (whether it is speculative or not).
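A simplified sketch of the new bookkeeping (the class and method names below are illustrative stand-ins, not the actual `ExecutorAllocationManager` listener API; the real change is in the diff further down):

```scala
import scala.collection.mutable

case class StageAttempt(stageId: Int, stageAttemptId: Int)

// Illustrative stand-in for the accounting this PR moves from a raw counter
// (stageAttemptToNumSpeculativeTasks) to per-index sets.
class SpeculationBookkeeping {
  // Speculative tasks submitted but not yet started, keyed by task index.
  private val pending = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
  // Speculative tasks currently running, keyed by task index.
  private val running = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]

  def onSpeculativeTaskSubmitted(attempt: StageAttempt, taskIndex: Int): Unit =
    pending.getOrElseUpdate(attempt, new mutable.HashSet[Int]) += taskIndex

  def onSpeculativeTaskStart(attempt: StageAttempt, taskIndex: Int): Unit = {
    pending.get(attempt).foreach(_.remove(taskIndex))
    running.getOrElseUpdate(attempt, new mutable.HashSet[Int]) += taskIndex
  }

  // Called on any successful task end, speculative or not: once the index has
  // finished, it no longer needs a speculative copy, so drop it from pending too.
  def onTaskSucceeded(attempt: StageAttempt, taskIndex: Int, speculative: Boolean): Unit = {
    if (speculative) running.get(attempt).foreach(_.remove(taskIndex))
    pending.get(attempt).foreach(_.remove(taskIndex))
  }

  // The pending count now comes straight from the set, so indexes that will
  // never be scheduled cannot inflate the executor target.
  def numPendingSpeculative(attempt: StageAttempt): Int =
    pending.get(attempt).map(_.size).getOrElse(0)
}
```

The key difference from the old counter is the last step of `onTaskSucceeded`: a successful non-speculative finish also clears the matching pending index, so it can no longer leak.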

### Why are the changes needed?
To fix executors sitting idle because of pending speculative tasks whose original tasks have already finished.

### Does this PR introduce _any_ user-facing change?
Yes: the DeveloperApi event `SparkListenerSpeculativeTaskSubmitted` adds a `taskIndex` field with a default value of -1.
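A hypothetical listener showing how the new field can be consumed (`SparkListener.onSpeculativeTaskSubmitted` is the existing callback; only the `taskIndex`/`partitionId` accessors and their -1 defaults come from this change):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerSpeculativeTaskSubmitted}

// Hypothetical listener that consumes the new taskIndex field.
class SpeculationLoggingListener extends SparkListener {
  override def onSpeculativeTaskSubmitted(
      speculativeTask: SparkListenerSpeculativeTaskSubmitted): Unit = {
    // taskIndex (and partitionId) stay at -1 when the event was built with the
    // old two-argument constructor, so guard before relying on them.
    if (speculativeTask.taskIndex >= 0) {
      println(s"Speculative task submitted: stage ${speculativeTask.stageId}, " +
        s"attempt ${speculativeTask.stageAttemptId}, task index ${speculativeTask.taskIndex}")
    }
  }
}
```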

### How was this patch tested?
Added a unit test covering this scenario.
Passes the GA checks.

Closes apache#38711 from toujours33/SPARK-41192.

Lead-authored-by: wangyazhi <[email protected]>
Co-authored-by: toujours33 <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
toujours33 authored and Mridul Muralidharan committed Dec 21, 2022
1 parent fd6d226 commit 801e079
Showing 7 changed files with 139 additions and 32 deletions.
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -643,10 +643,12 @@ private[spark] class ExecutorAllocationManager(
// Should be 0 when no stages are active.
private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
// Number of speculative tasks pending/running in each stageAttempt
private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
// The speculative tasks started in each stageAttempt
// Map from each stageAttempt to a set of running speculative task indexes
// TODO(SPARK-41192): We simply need an Int for this.
private val stageAttemptToSpeculativeTaskIndices =
new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]()
// Map from each stageAttempt to a set of pending speculative task indexes
private val stageAttemptToPendingSpeculativeTasks =
new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]

private val resourceProfileIdToStageAttempt =
@@ -722,7 +724,7 @@ private[spark] class ExecutorAllocationManager(
// because the attempt may still have running tasks,
// even after another attempt for the stage is submitted.
stageAttemptToNumTasks -= stageAttempt
stageAttemptToNumSpeculativeTasks -= stageAttempt
stageAttemptToPendingSpeculativeTasks -= stageAttempt
stageAttemptToTaskIndices -= stageAttempt
stageAttemptToSpeculativeTaskIndices -= stageAttempt
stageAttemptToExecutorPlacementHints -= stageAttempt
@@ -733,7 +735,9 @@

// If this is the last stage with pending tasks, mark the scheduler queue as empty
// This is needed in case the stage is aborted for any reason
if (stageAttemptToNumTasks.isEmpty && stageAttemptToNumSpeculativeTasks.isEmpty) {
if (stageAttemptToNumTasks.isEmpty
&& stageAttemptToPendingSpeculativeTasks.isEmpty
&& stageAttemptToSpeculativeTaskIndices.isEmpty) {
allocationManager.onSchedulerQueueEmpty()
}
}
@@ -751,6 +755,8 @@
if (taskStart.taskInfo.speculative) {
stageAttemptToSpeculativeTaskIndices.getOrElseUpdate(stageAttempt,
new mutable.HashSet[Int]) += taskIndex
stageAttemptToPendingSpeculativeTasks
.get(stageAttempt).foreach(_.remove(taskIndex))
} else {
stageAttemptToTaskIndices.getOrElseUpdate(stageAttempt,
new mutable.HashSet[Int]) += taskIndex
@@ -776,15 +782,14 @@
}
if (taskEnd.taskInfo.speculative) {
stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach {_.remove{taskIndex}}
// If the previous task attempt succeeded first and it was the last task in a stage,
// the stage may have been removed before handing this speculative TaskEnd event.
if (stageAttemptToNumSpeculativeTasks.contains(stageAttempt)) {
stageAttemptToNumSpeculativeTasks(stageAttempt) -= 1
}
}

taskEnd.reason match {
case Success | _: TaskKilled =>
case Success =>
// Remove pending speculative task in case the normal task
// is finished before starting the speculative task
stageAttemptToPendingSpeculativeTasks.get(stageAttempt).foreach(_.remove(taskIndex))
case _: TaskKilled =>
case _ =>
if (!hasPendingTasks) {
// If the task failed (not intentionally killed), we expect it to be resubmitted
@@ -810,9 +815,10 @@
val stageId = speculativeTask.stageId
val stageAttemptId = speculativeTask.stageAttemptId
val stageAttempt = StageAttempt(stageId, stageAttemptId)
val taskIndex = speculativeTask.taskIndex
allocationManager.synchronized {
stageAttemptToNumSpeculativeTasks(stageAttempt) =
stageAttemptToNumSpeculativeTasks.getOrElse(stageAttempt, 0) + 1
stageAttemptToPendingSpeculativeTasks.getOrElseUpdate(stageAttempt,
new mutable.HashSet[Int]).add(taskIndex)
allocationManager.onSchedulerBacklogged()
}
}
@@ -843,7 +849,7 @@
def removeStageFromResourceProfileIfUnused(stageAttempt: StageAttempt): Unit = {
if (!stageAttemptToNumRunningTask.contains(stageAttempt) &&
!stageAttemptToNumTasks.contains(stageAttempt) &&
!stageAttemptToNumSpeculativeTasks.contains(stageAttempt) &&
!stageAttemptToPendingSpeculativeTasks.contains(stageAttempt) &&
!stageAttemptToTaskIndices.contains(stageAttempt) &&
!stageAttemptToSpeculativeTaskIndices.contains(stageAttempt)
) {
@@ -896,9 +902,7 @@ private[spark] class ExecutorAllocationManager(
}

private def getPendingSpeculativeTaskSum(attempt: StageAttempt): Int = {
val numTotalTasks = stageAttemptToNumSpeculativeTasks.getOrElse(attempt, 0)
val numRunning = stageAttemptToSpeculativeTaskIndices.get(attempt).map(_.size).getOrElse(0)
numTotalTasks - numRunning
stageAttemptToPendingSpeculativeTasks.get(attempt).map(_.size).getOrElse(0)
}

/**
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -383,8 +383,8 @@ private[spark] class DAGScheduler(
/**
* Called by the TaskSetManager when it decides a speculative task is needed.
*/
def speculativeTaskSubmitted(task: Task[_]): Unit = {
eventProcessLoop.post(SpeculativeTaskSubmitted(task))
def speculativeTaskSubmitted(task: Task[_], taskIndex: Int): Unit = {
eventProcessLoop.post(SpeculativeTaskSubmitted(task, taskIndex))
}

/**
@@ -1178,8 +1178,10 @@
listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
}

private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_]): Unit = {
listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId))
private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_], taskIndex: Int): Unit = {
val speculativeTaskSubmittedEvent = new SparkListenerSpeculativeTaskSubmitted(
task.stageId, task.stageAttemptId, taskIndex, task.partitionId)
listenerBus.post(speculativeTaskSubmittedEvent)
}

private[scheduler] def handleUnschedulableTaskSetAdded(
Expand Down Expand Up @@ -2962,8 +2964,8 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
case BeginEvent(task, taskInfo) =>
dagScheduler.handleBeginEvent(task, taskInfo)

case SpeculativeTaskSubmitted(task) =>
dagScheduler.handleSpeculativeTaskSubmitted(task)
case SpeculativeTaskSubmitted(task, taskIndex) =>
dagScheduler.handleSpeculativeTaskSubmitted(task, taskIndex)

case UnschedulableTaskSetAdded(stageId, stageAttemptId) =>
dagScheduler.handleUnschedulableTaskSetAdded(stageId, stageAttemptId)
core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -99,7 +99,7 @@ case class TaskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Thr
private[scheduler] case object ResubmitFailedStages extends DAGSchedulerEvent

private[scheduler]
case class SpeculativeTaskSubmitted(task: Task[_]) extends DAGSchedulerEvent
case class SpeculativeTaskSubmitted(task: Task[_], taskIndex: Int = -1) extends DAGSchedulerEvent

private[scheduler]
case class UnschedulableTaskSetAdded(stageId: Int, stageAttemptId: Int)
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -56,7 +56,21 @@ case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListe
case class SparkListenerSpeculativeTaskSubmitted(
stageId: Int,
stageAttemptId: Int = 0)
extends SparkListenerEvent
extends SparkListenerEvent {
// Note: this is here for backwards-compatibility with older versions of this event which
// didn't store taskIndex
private var _taskIndex: Int = -1
private var _partitionId: Int = -1

def taskIndex: Int = _taskIndex
def partitionId: Int = _partitionId

def this(stageId: Int, stageAttemptId: Int, taskIndex: Int, partitionId: Int) = {
this(stageId, stageAttemptId)
_partitionId = partitionId
_taskIndex = taskIndex
}
}

@DeveloperApi
case class SparkListenerTaskEnd(
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -1143,7 +1143,7 @@ private[spark] class TaskSetManager(
" than %.0f ms(%d speculatable tasks in this taskset now)")
.format(index, taskSet.id, info.host, threshold, speculatableTasks.size + 1))
speculatableTasks += index
sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
sched.dagScheduler.speculativeTaskSubmitted(tasks(index), index)
}
foundTasksResult |= speculated
}
core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -461,6 +461,16 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
assert(numExecutorsTargetForDefaultProfileId(manager) === 10)
}

private def speculativeTaskSubmitEventFromTaskIndex(
stageId: Int,
stageAttemptId: Int = 0,
taskIndex: Int = -1,
partitionId: Int = -1): SparkListenerSpeculativeTaskSubmitted = {
val event = new SparkListenerSpeculativeTaskSubmitted(stageId, stageAttemptId,
taskIndex = taskIndex, partitionId = partitionId)
event
}

test("add executors when speculative tasks added") {
val manager = createManager(createConf(0, 10, 0))

@@ -469,13 +479,13 @@

post(SparkListenerStageSubmitted(createStageInfo(1, 2)))
// Verify that we're capped at number of tasks including the speculative ones in the stage
post(SparkListenerSpeculativeTaskSubmitted(1))
post(speculativeTaskSubmitEventFromTaskIndex(1, taskIndex = 0))
assert(numExecutorsTargetForDefaultProfileId(manager) === 0)
assert(numExecutorsToAddForDefaultProfile(manager) === 1)
assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
post(SparkListenerSpeculativeTaskSubmitted(1))
post(SparkListenerSpeculativeTaskSubmitted(1))
post(speculativeTaskSubmitEventFromTaskIndex(1, taskIndex = 1))
post(speculativeTaskSubmitEventFromTaskIndex(1, taskIndex = 2))
assert(numExecutorsTargetForDefaultProfileId(manager) === 1)
assert(numExecutorsToAddForDefaultProfile(manager) === 2)
assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2)
@@ -671,6 +681,83 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
onExecutorRemoved(manager, "5")
}

test("SPARK-41192: remove executors when task finished before speculative task scheduled") {
val clock = new ManualClock()
val stage = createStageInfo(0, 40)
val conf = createConf(0, 10, 0).set(config.EXECUTOR_CORES, 4)
val manager = createManager(conf, clock = clock)
val updatesNeeded =
new mutable.HashMap[ResourceProfile, ExecutorAllocationManager.TargetNumUpdates]

// submit 40 tasks, total executors needed = 40/4 = 10
post(SparkListenerStageSubmitted(stage))
assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2)
doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 4)
doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 3)
doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())

(0 until 10).foreach(execId => onExecutorAddedDefaultProfile(manager, execId.toString))
(0 until 40).map { i => createTaskInfo(i, i, executorId = s"${i / 4}")}.foreach {
info => post(SparkListenerTaskStart(0, 0, info))
}
assert(numExecutorsTarget(manager, defaultProfile.id) === 10)
assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 10)
// 30 tasks (0 - 29) finished
(0 until 30).map { i => createTaskInfo(i, i, executorId = s"${i / 4}")}.foreach {
info => post(SparkListenerTaskEnd(0, 0, null, Success, info, new ExecutorMetrics, null)) }
// 10 speculative tasks (30 - 39) launch for the remaining tasks
(30 until 40).foreach { index =>
post(speculativeTaskSubmitEventFromTaskIndex(0, taskIndex = index))}
clock.advance(1000)
manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
assert(numExecutorsTarget(manager, defaultProfile.id) === 5)
assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 5)
(0 until 5).foreach { i => assert(removeExecutorDefaultProfile(manager, i.toString))}
(0 until 5).foreach { i => onExecutorRemoved(manager, i.toString)}

// 5 original tasks (30 - 34) finished before speculative task start,
// the speculative task will be removed from pending tasks
// executors needed = (5 + 5) / 4 + 1
(30 until 35).map { i =>
createTaskInfo(i, i, executorId = s"${i / 4}")}
.foreach { info => post(
SparkListenerTaskEnd(0, 0, null, Success, info, new ExecutorMetrics, null))}
clock.advance(1000)
manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
assert(numExecutorsTarget(manager, defaultProfile.id) === 3)
assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 3)

(40 until 45).map { i =>
createTaskInfo(i, i - 5, executorId = s"${i / 4}", speculative = true)
}.foreach {
info => post(SparkListenerTaskStart(0, 0, info))
}
clock.advance(1000)
manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
assert(numExecutorsTarget(manager, defaultProfile.id) === 3)
assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 3)

(35 until 39).map { i =>
createTaskInfo(i, i, executorId = s"${i / 4}")
}.foreach {
info => post(SparkListenerTaskEnd(0, 0, null, Success, info, new ExecutorMetrics, null))
}
(35 until 39).map { i =>
createTaskInfo(i + 5, i, executorId = s"${(i + 5) / 4}", speculative = true)
}.foreach {
info => post(SparkListenerTaskEnd(0, 0, null, TaskKilled("attempt"),
info, new ExecutorMetrics, null))
}
clock.advance(1000)
manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
assert(numExecutorsTarget(manager, defaultProfile.id) === 1)
assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 1)
}

test("SPARK-30511 remove executors when speculative tasks end") {
val clock = new ManualClock()
val stage = createStageInfo(0, 40)
@@ -707,7 +794,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
(0 to 6).foreach { i => onExecutorRemoved(manager, i.toString)}

// 10 speculative tasks (30 - 39) launch for the remaining tasks
(30 to 39).foreach { _ => post(SparkListenerSpeculativeTaskSubmitted(0))}
(30 to 39).foreach { i => post(speculativeTaskSubmitEventFromTaskIndex(0, taskIndex = i))}
assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
@@ -785,7 +872,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
createTaskInfo(38, 38, executorId = "9"), new ExecutorMetrics, null))
post(SparkListenerTaskEnd(0, 0, null, UnknownReason,
createTaskInfo(49, 39, executorId = "12", speculative = true), new ExecutorMetrics, null))
post(SparkListenerSpeculativeTaskSubmitted(0))
post(speculativeTaskSubmitEventFromTaskIndex(0, taskIndex = 39))
clock.advance(1000)
manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
// maxNeeded = 1, allocate one more to satisfy speculation locality requirement
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -74,7 +74,7 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
taskScheduler.taskSetsFailed += taskSet.id
}

override def speculativeTaskSubmitted(task: Task[_]): Unit = {
override def speculativeTaskSubmitted(task: Task[_], taskIndex: Int): Unit = {
taskScheduler.speculativeTasks += task.partitionId
}
}
