diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 0489619da40fb..ed766e7050d1c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -173,7 +173,7 @@ private[spark] object Config extends Logging { object ExecutorRollPolicy extends Enumeration { val ID, ADD_TIME, TOTAL_GC_TIME, TOTAL_DURATION, AVERAGE_DURATION, FAILED_TASKS, - PEAK_JVM_ONHEAP_MEMORY, PEAK_JVM_OFFHEAP_MEMORY, DISK_USED, + PEAK_JVM_ONHEAP_MEMORY, PEAK_JVM_OFFHEAP_MEMORY, TOTAL_SHUFFLE_WRITE, DISK_USED, OUTLIER, OUTLIER_NO_FALLBACK = Value } @@ -193,6 +193,7 @@ private[spark] object Config extends Logging { "PEAK_JVM_ONHEAP_MEMORY policy chooses an executor with the biggest peak JVM on-heap " + "memory. PEAK_JVM_OFFHEAP_MEMORY policy chooses an executor with the biggest peak JVM " + "off-heap memory. " + + "TOTAL_SHUFFLE_WRITE policy chooses an executor with the biggest total shuffle write. " + "DISK_USED policy chooses an executor with the biggest used disk size. " + "OUTLIER policy chooses an executor with outstanding statistics which is bigger than" + "at least two standard deviation from the mean in average task time, " + diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala index d0d30980fc1cc..e1a9a1f7abe28 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala @@ -128,6 +128,8 @@ class ExecutorRollDriverPlugin extends DriverPlugin with Logging { listWithoutDriver.sortBy(getPeakMetrics(_, "JVMHeapMemory")).reverse case ExecutorRollPolicy.PEAK_JVM_OFFHEAP_MEMORY => listWithoutDriver.sortBy(getPeakMetrics(_, "JVMOffHeapMemory")).reverse + case ExecutorRollPolicy.TOTAL_SHUFFLE_WRITE => + listWithoutDriver.sortBy(_.totalShuffleWrite).reverse case ExecutorRollPolicy.DISK_USED => listWithoutDriver.sortBy(_.diskUsed).reverse case ExecutorRollPolicy.OUTLIER => @@ -144,7 +146,7 @@ class ExecutorRollDriverPlugin extends DriverPlugin with Logging { * We build multiple outlier lists and concat in the following importance order to find * outliers in various perspective: * AVERAGE_DURATION > TOTAL_DURATION > TOTAL_GC_TIME > FAILED_TASKS > - * PEAK_JVM_ONHEAP_MEMORY > PEAK_JVM_OFFHEAP_MEMORY + * PEAK_JVM_ONHEAP_MEMORY > PEAK_JVM_OFFHEAP_MEMORY > TOTAL_SHUFFLE_WRITE > DISK_USED * Since we will choose only first item, the duplication is okay. */ private def outliersFromMultipleDimensions(listWithoutDriver: Seq[v1.ExecutorSummary]) = @@ -154,6 +156,7 @@ class ExecutorRollDriverPlugin extends DriverPlugin with Logging { outliers(listWithoutDriver, e => e.failedTasks) ++ outliers(listWithoutDriver, e => getPeakMetrics(e, "JVMHeapMemory")) ++ outliers(listWithoutDriver, e => getPeakMetrics(e, "JVMOffHeapMemory")) ++ + outliers(listWithoutDriver, e => e.totalShuffleWrite) ++ outliers(listWithoutDriver, e => e.diskUsed) /** diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPluginSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPluginSuite.scala index 5600efa0a69e5..d28487bedf8f4 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPluginSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPluginSuite.scala @@ -149,11 +149,19 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester { Option.empty, Option.empty, Map(), Option.empty, Set(), metrics, Map(), Map(), 1, false, Set()) + val execWithBiggestTotalShuffleWrite = new ExecutorSummary("14", "host:port", true, 1, + 10, 10, 1, 1, 1, + 4, 0, 2, 280, + 30, 100, 100, + 15, false, 20, new Date(1639300001000L), + Option.empty, Option.empty, Map(), Option.empty, Set(), + metrics, Map(), Map(), 1, false, Set()) + val list = Seq(driverSummary, execWithSmallestID, execWithSmallestAddTime, execWithBiggestTotalGCTime, execWithBiggestTotalDuration, execWithBiggestFailedTasks, execWithBiggestAverageDuration, execWithoutTasks, execNormal, execWithTwoDigitID, execWithBiggestPeakJVMOnHeapMemory, execWithBiggestPeakJVMOffHeapMemory, - execWithBiggestDiskUsed) + execWithBiggestDiskUsed, execWithBiggestTotalShuffleWrite) override def beforeEach(): Unit = { super.beforeEach() @@ -227,6 +235,11 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester { assert(plugin.invokePrivate(_choose(list, ExecutorRollPolicy.DISK_USED)).contains("13")) } + test("Policy: TOTAL_SHUFFLE_WRITE") { + assert(plugin.invokePrivate( + _choose(list, ExecutorRollPolicy.TOTAL_SHUFFLE_WRITE)).contains("14")) + } + test("Policy: OUTLIER - Work like TOTAL_DURATION if there is no outlier") { assert( plugin.invokePrivate(_choose(list, ExecutorRollPolicy.TOTAL_DURATION)) == @@ -388,4 +401,17 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester { plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.DISK_USED)) == plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.OUTLIER_NO_FALLBACK))) } + + test("Policy: OUTLIER_NO_FALLBACK - Detect a total shuffle write outlier") { + val outlier = new ExecutorSummary("9999", "host:port", true, 1, + 0, 10, 1, 0, 0, + 3, 0, 1, 100, + 0, 0, 0, + 1000, false, 0, new Date(1639300001000L), + Option.empty, Option.empty, Map(), Option.empty, Set(), + metrics, Map(), Map(), 1, false, Set()) + assert( + plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.TOTAL_SHUFFLE_WRITE)) == + plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.OUTLIER_NO_FALLBACK))) + } }