Skip to content

Commit

Permalink
[SPARK-41543][K8S] Add TOTAL_SHUFFLE_WRITE executor roll policy
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

This PR aims to add new roll policy, `TOTAL_SHUFFLE_WRITE`.

### Why are the changes needed?

To provide a new built-in policy to the users.

### Does this PR introduce _any_ user-facing change?

No. This only adds a new opt-in built-in policy; the behavior of existing policies is unchanged.

### How was this patch tested?

Pass the CIs.

Closes apache#39088 from dongjoon-hyun/SPARK-41543.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
dongjoon-hyun committed Dec 16, 2022
1 parent 9244015 commit 5aca8b4
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ private[spark] object Config extends Logging {

/**
 * Built-in policies for selecting which executor to roll (restart) on K8s.
 * Backed by [[scala.Enumeration]]; the declaration order fixes each value's id,
 * so new policies should be appended where the commit placed them
 * (TOTAL_SHUFFLE_WRITE sits between PEAK_JVM_OFFHEAP_MEMORY and DISK_USED).
 * NOTE: the scraped diff showed both the pre- and post-change line of the
 * value list; the duplicate pre-change line is dropped here.
 */
object ExecutorRollPolicy extends Enumeration {
  val ID, ADD_TIME, TOTAL_GC_TIME, TOTAL_DURATION, AVERAGE_DURATION, FAILED_TASKS,
    PEAK_JVM_ONHEAP_MEMORY, PEAK_JVM_OFFHEAP_MEMORY, TOTAL_SHUFFLE_WRITE, DISK_USED,
    OUTLIER, OUTLIER_NO_FALLBACK = Value
}

Expand All @@ -193,6 +193,7 @@ private[spark] object Config extends Logging {
"PEAK_JVM_ONHEAP_MEMORY policy chooses an executor with the biggest peak JVM on-heap " +
"memory. PEAK_JVM_OFFHEAP_MEMORY policy chooses an executor with the biggest peak JVM " +
"off-heap memory. " +
"TOTAL_SHUFFLE_WRITE policy chooses an executor with the biggest total shuffle write. " +
"DISK_USED policy chooses an executor with the biggest used disk size. " +
"OUTLIER policy chooses an executor with outstanding statistics which is bigger than" +
"at least two standard deviation from the mean in average task time, " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ class ExecutorRollDriverPlugin extends DriverPlugin with Logging {
listWithoutDriver.sortBy(getPeakMetrics(_, "JVMHeapMemory")).reverse
case ExecutorRollPolicy.PEAK_JVM_OFFHEAP_MEMORY =>
listWithoutDriver.sortBy(getPeakMetrics(_, "JVMOffHeapMemory")).reverse
case ExecutorRollPolicy.TOTAL_SHUFFLE_WRITE =>
listWithoutDriver.sortBy(_.totalShuffleWrite).reverse
case ExecutorRollPolicy.DISK_USED =>
listWithoutDriver.sortBy(_.diskUsed).reverse
case ExecutorRollPolicy.OUTLIER =>
Expand All @@ -144,7 +146,7 @@ class ExecutorRollDriverPlugin extends DriverPlugin with Logging {
* We build multiple outlier lists and concat in the following importance order to find
* outliers in various perspective:
* AVERAGE_DURATION > TOTAL_DURATION > TOTAL_GC_TIME > FAILED_TASKS >
* PEAK_JVM_ONHEAP_MEMORY > PEAK_JVM_OFFHEAP_MEMORY
* PEAK_JVM_ONHEAP_MEMORY > PEAK_JVM_OFFHEAP_MEMORY > TOTAL_SHUFFLE_WRITE > DISK_USED
* Since we will choose only first item, the duplication is okay.
*/
private def outliersFromMultipleDimensions(listWithoutDriver: Seq[v1.ExecutorSummary]) =
Expand All @@ -154,6 +156,7 @@ class ExecutorRollDriverPlugin extends DriverPlugin with Logging {
outliers(listWithoutDriver, e => e.failedTasks) ++
outliers(listWithoutDriver, e => getPeakMetrics(e, "JVMHeapMemory")) ++
outliers(listWithoutDriver, e => getPeakMetrics(e, "JVMOffHeapMemory")) ++
outliers(listWithoutDriver, e => e.totalShuffleWrite) ++
outliers(listWithoutDriver, e => e.diskUsed)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,19 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
Option.empty, Option.empty, Map(), Option.empty, Set(),
metrics, Map(), Map(), 1, false, Set())

// Fixture executor "14": the one the TOTAL_SHUFFLE_WRITE policy test expects
// to be chosen, i.e. it carries the largest totalShuffleWrite among the base
// fixtures in `list`.
// NOTE(review): arguments are positional; by analogy with the outlier fixture
// below (where 1000 plays this role), the `15` before `false, 20` is
// presumably totalShuffleWrite — confirm against the v1.ExecutorSummary
// constructor signature.
val execWithBiggestTotalShuffleWrite = new ExecutorSummary("14", "host:port", true, 1,
10, 10, 1, 1, 1,
4, 0, 2, 280,
30, 100, 100,
15, false, 20, new Date(1639300001000L),
Option.empty, Option.empty, Map(), Option.empty, Set(),
metrics, Map(), Map(), 1, false, Set())

// Shared fixture: the driver plus every per-policy "extreme" executor summary.
// Each policy test picks the executor that is extreme for its own metric.
// NOTE(review): relative order may matter for equal sort keys (Scala's sortBy
// is stable), so keep new fixtures appended at the end as done here.
val list = Seq(driverSummary, execWithSmallestID, execWithSmallestAddTime,
execWithBiggestTotalGCTime, execWithBiggestTotalDuration, execWithBiggestFailedTasks,
execWithBiggestAverageDuration, execWithoutTasks, execNormal, execWithTwoDigitID,
execWithBiggestPeakJVMOnHeapMemory, execWithBiggestPeakJVMOffHeapMemory,
execWithBiggestDiskUsed, execWithBiggestTotalShuffleWrite)

override def beforeEach(): Unit = {
super.beforeEach()
Expand Down Expand Up @@ -227,6 +235,11 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
assert(plugin.invokePrivate(_choose(list, ExecutorRollPolicy.DISK_USED)).contains("13"))
}

test("Policy: TOTAL_SHUFFLE_WRITE") {
  // Executor "14" is the fixture with the largest total shuffle write.
  val selected = plugin.invokePrivate(_choose(list, ExecutorRollPolicy.TOTAL_SHUFFLE_WRITE))
  assert(selected.contains("14"))
}

test("Policy: OUTLIER - Work like TOTAL_DURATION if there is no outlier") {
assert(
plugin.invokePrivate(_choose(list, ExecutorRollPolicy.TOTAL_DURATION)) ==
Expand Down Expand Up @@ -388,4 +401,17 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.DISK_USED)) ==
plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.OUTLIER_NO_FALLBACK)))
}

test("Policy: OUTLIER_NO_FALLBACK - Detect a total shuffle write outlier") {
  // An executor whose shuffle-write figure (1000) dwarfs every base fixture.
  val shuffleWriteOutlier = new ExecutorSummary("9999", "host:port", true, 1,
    0, 10, 1, 0, 0,
    3, 0, 1, 100,
    0, 0, 0,
    1000, false, 0, new Date(1639300001000L),
    Option.empty, Option.empty, Map(), Option.empty, Set(),
    metrics, Map(), Map(), 1, false, Set())
  val augmented = list :+ shuffleWriteOutlier
  // When such an outlier exists, OUTLIER_NO_FALLBACK must pick the same
  // executor as the plain TOTAL_SHUFFLE_WRITE policy.
  val byShuffleWrite = plugin.invokePrivate(_choose(augmented, ExecutorRollPolicy.TOTAL_SHUFFLE_WRITE))
  val byOutlierNoFallback = plugin.invokePrivate(_choose(augmented, ExecutorRollPolicy.OUTLIER_NO_FALLBACK))
  assert(byShuffleWrite == byOutlierNoFallback)
}
}

0 comments on commit 5aca8b4

Please sign in to comment.