[SPARK-50386][SQL] Improve SparkFatalException propagation when an OutOfMemoryError occurs while BroadcastExchangeExec builds the small table to broadcast #48925

Open · wants to merge 1 commit into base: master
@@ -2067,7 +2067,7 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionErrors
"autoBroadcastJoinThreshold" -> SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key,
"driverMemory" -> SparkLauncher.DRIVER_MEMORY,
"analyzeTblMsg" -> analyzeTblMsg),
-cause = oe.getCause)
+cause = oe)
}

def executeCodePathUnsupportedError(execName: String): SparkUnsupportedOperationException = {
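
The one-line change above is the core of the fix: an OutOfMemoryError constructed from a message string has no nested cause, so the old cause = oe.getCause handed null to the wrapping SparkException and the original error vanished from the cause chain. A minimal sketch of that plain JVM behavior, independent of Spark (CauseSketch is just an illustrative name):

object CauseSketch extends App {
  val oom = new OutOfMemoryError("Java heap space")
  // An OOM built from a message string carries no nested cause of its own.
  assert(oom.getCause == null) // what the old code passed as the cause
  val wrapped = new RuntimeException("Not enough memory to broadcast", oom)
  assert(wrapped.getCause eq oom) // what the fixed code preserves
}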
@@ -927,6 +927,9 @@ class AdaptiveQueryExecSuite
})

try {
object TestProblematicCoalesceStrategy extends BaseTestProblematicCoalesceStrategy {
override val throwable = new RuntimeException("coalesce test error")
}
spark.experimental.extraStrategies = TestProblematicCoalesceStrategy :: Nil
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
@@ -3107,6 +3110,69 @@ class AdaptiveQueryExecSuite
}
}
}

test("SPARK-50386: Check SparkFatalException message details when OutOfMemoryError " +
"occurs on BroadcastExchange execution") {
withTempView("t1", "t2") {
try {
object TestProblematicCoalesceStrategy extends BaseTestProblematicCoalesceStrategy {
override val throwable = new OutOfMemoryError("Java heap space")
}
spark.experimental.extraStrategies = TestProblematicCoalesceStrategy :: Nil
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
val joinedDF = createJoinedDF()

val error = intercept[SparkException] {
joinedDF.collect()
}

// error.getMessage was already non-null before this change
assert(error.getMessage contains
"Not enough memory to build and broadcast the table to all worker nodes.")
// error.getCause was null before this change; the fix propagates the OOM
assert(error.getCause != null, "SparkException's cause must be non-null")
assert(error.getCause.toString == "java.lang.OutOfMemoryError: Java heap space")
assert(error.getCause.getMessage == "Java heap space")
}
} finally {
spark.experimental.extraStrategies = Nil
}
}
}
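
For callers, the practical effect of the fix is that the root OutOfMemoryError is now reachable by walking the standard Throwable cause chain. A hedged sketch of how user code might dig it out (rootCause is an illustrative helper, not a Spark API):

def rootCause(t: Throwable): Throwable = {
  // Walk the cause chain to its end; before this fix the chain stopped at
  // the SparkException because its cause was null.
  var current = t
  while (current.getCause != null) current = current.getCause
  current
}
// rootCause(error) now yields java.lang.OutOfMemoryError: Java heap space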

test("SPARK-50386: Check SparkFatalException message details when InterruptedException " +
"occurs on BroadcastExchange execution") {
withTempView("t1", "t2") {
try {
object TestProblematicCoalesceStrategy extends BaseTestProblematicCoalesceStrategy {
override val throwable = new InterruptedException()
}
spark.experimental.extraStrategies = TestProblematicCoalesceStrategy :: Nil
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
val joinedDF = createJoinedDF()
val error = intercept[java.util.concurrent.ExecutionException] {
joinedDF.collect()
}
val errorMessage =
"org.apache.spark.util.SparkFatalException: java.lang.InterruptedException"
// All of the following properties were already non-null before this change
assert(error.getMessage == errorMessage)
assert(error.getCause != null, "SparkFatalException's cause must be non-null")
assert(error.getCause.toString == errorMessage)
assert(error.getCause.getMessage == "java.lang.InterruptedException")
}
} finally {
spark.experimental.extraStrategies = Nil
}
}
}
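
The java.util.concurrent.ExecutionException asserted above is standard JDK future behavior rather than anything Spark-specific: Future.get() rethrows whatever the task threw, wrapped exactly one level deep. A minimal self-contained sketch, with a plain RuntimeException standing in for the SparkFatalException raised by the broadcast thread:

import java.util.concurrent.{Callable, ExecutionException, Executors}

val pool = Executors.newSingleThreadExecutor()
val future = pool.submit(new Callable[Unit] {
  override def call(): Unit = throw new RuntimeException("boom") // stand-in failure
})
try future.get()
catch {
  // get() wraps the task's throwable, which is why the test above unwraps
  // one level through getCause.
  case e: ExecutionException => assert(e.getCause.getMessage == "boom")
} finally pool.shutdown()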

private def createJoinedDF(): DataFrame = {
spark.range(10).toDF("col1").createTempView("t1")
spark.range(5).coalesce(2).toDF("col2").createTempView("t2")
sql("SELECT /*+ BROADCAST(t2) */ * FROM t1 INNER JOIN t2 ON t1.col1 = t2.col2;")
}

}
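
As background on why BroadcastExchangeExec wraps fatal errors in SparkFatalException at all: scala.util.control.NonFatal deliberately refuses throwables such as OutOfMemoryError, so an unwrapped OOM would not flow through the promise/future error handling to the caller awaiting the broadcast. A hedged sketch of the pattern, with FatalWrapper as a hypothetical stand-in for org.apache.spark.util.SparkFatalException:

import scala.util.control.NonFatal

// Hypothetical stand-in for org.apache.spark.util.SparkFatalException.
final class FatalWrapper(cause: Throwable) extends Exception(cause)

def buildBroadcast[T](body: => T): T =
  try body
  catch {
    // Re-wrap fatal errors as a plain Exception so machinery filtering on
    // NonFatal can still deliver them to the awaiting caller.
    case oom: OutOfMemoryError => throw new FatalWrapper(oom)
    case NonFatal(e) => throw e // non-fatal errors already propagate
  }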

/**
@@ -3130,25 +3196,32 @@ private case class SimpleShuffleSortCostEvaluator() extends CostEvaluator {
/**
* Helps to simulate ExchangeQueryStageExec materialization failure.
*/
-private object TestProblematicCoalesceStrategy extends Strategy {
-  private case class TestProblematicCoalesceExec(numPartitions: Int, child: SparkPlan)
-    extends UnaryExecNode {
-    override protected def doExecute(): RDD[InternalRow] = {
-      child.execute().mapPartitions { _ =>
-        throw new RuntimeException("coalesce test error")
-      }
-    }
-    override def output: Seq[Attribute] = child.output
-    override protected def withNewChildInternal(newChild: SparkPlan): TestProblematicCoalesceExec =
-      copy(child = newChild)
-  }
private trait BaseTestProblematicCoalesceStrategy extends Strategy {

val throwable: Throwable

override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
plan match {
case org.apache.spark.sql.catalyst.plans.logical.Repartition(
numPartitions, false, child) =>
-TestProblematicCoalesceExec(numPartitions, planLater(child)) :: Nil
+TestProblematicCoalesceExec(numPartitions, planLater(child), throwable) :: Nil
case _ => Nil
}
}
}

private case class TestProblematicCoalesceExec(
numPartitions: Int, child: SparkPlan, throwable: Throwable)
extends UnaryExecNode {
override protected def doExecute(): RDD[InternalRow] = {
throwable match {
// A RuntimeException is thrown task-side, from mapPartitions, while the
// executors compute the child's partitions.
case _: RuntimeException => child.execute().mapPartitions { _ =>
throw throwable
}
// OutOfMemoryError and InterruptedException are thrown directly from
// doExecute, i.e. on the driver's broadcast-build path.
case _ => throw throwable
}
}
override def output: Seq[Attribute] = child.output
override protected def withNewChildInternal(newChild: SparkPlan): TestProblematicCoalesceExec =
copy(child = newChild)
}
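
For reference, a minimal usage sketch of the refactored helper as the new tests employ it (OomCoalesceStrategy and the surrounding session setup are illustrative):

// Illustrative: inject an OOM into planning, as the SPARK-50386 tests do.
object OomCoalesceStrategy extends BaseTestProblematicCoalesceStrategy {
  override val throwable: Throwable = new OutOfMemoryError("Java heap space")
}
spark.experimental.extraStrategies = OomCoalesceStrategy :: Nil
// ... run a query that plans a Repartition(shuffle = false), e.g. coalesce() ...
spark.experimental.extraStrategies = Nil // always reset afterwards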