diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 0852e773c87b4..560b35f40c4e7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -2067,7 +2067,7 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
         "autoBroadcastJoinThreshold" -> SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key,
         "driverMemory" -> SparkLauncher.DRIVER_MEMORY,
         "analyzeTblMsg" -> analyzeTblMsg),
-      cause = oe.getCause)
+      cause = oe)
   }
 
   def executeCodePathUnsupportedError(execName: String): SparkUnsupportedOperationException = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index ad28fd5176d99..d2725bb7d1ae5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -927,6 +927,9 @@ class AdaptiveQueryExecSuite
     })
 
     try {
+      object TestProblematicCoalesceStrategy extends BaseTestProblematicCoalesceStrategy {
+        override val throwable = new RuntimeException("coalesce test error")
+      }
       spark.experimental.extraStrategies = TestProblematicCoalesceStrategy :: Nil
       withSQLConf(
         SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
@@ -3107,6 +3110,69 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("SPARK-50386: Check SparkFatalException message details when OutOfMemoryError " +
+    "occurs on BroadcastExchange execution") {
+    withTempView("t1", "t2") {
+      try {
+        object TestProblematicCoalesceStrategy extends BaseTestProblematicCoalesceStrategy {
+          override val throwable = new OutOfMemoryError("Java heap space")
+        }
+        spark.experimental.extraStrategies = TestProblematicCoalesceStrategy :: Nil
+        withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+          val joinedDF = createJoinedDF()
+
+          val error = intercept[SparkException] {
+            joinedDF.collect()
+          }
+
+          // error.getMessage was non-null even before this change
+          assert(error.getMessage contains
+            "Not enough memory to build and broadcast the table to all worker nodes.")
+          // error.getCause was null before this change
+          assert(error.getCause != null, "SparkException's cause property has to be non-null")
+          assert(error.getCause.toString == "java.lang.OutOfMemoryError: Java heap space")
+          assert(error.getCause.getMessage == "Java heap space")
+        }
+      } finally {
+        spark.experimental.extraStrategies = Nil
+      }
+    }
+  }
+
+  test("SPARK-50386: Check SparkFatalException message details when InterruptedException " +
+    "occurs on BroadcastExchange execution") {
+    withTempView("t1", "t2") {
+      try {
+        object TestProblematicCoalesceStrategy extends BaseTestProblematicCoalesceStrategy {
+          override val throwable = new InterruptedException()
+        }
+        spark.experimental.extraStrategies = TestProblematicCoalesceStrategy :: Nil
+        withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+          val joinedDF = createJoinedDF()
+          val error = intercept[java.util.concurrent.ExecutionException] {
+            joinedDF.collect()
+          }
+          val errorMessage =
+            "org.apache.spark.util.SparkFatalException: java.lang.InterruptedException"
+          // All of the following properties were non-null even before this change
+          assert(error.getMessage == errorMessage)
+          assert(error.getCause != null, "SparkFatalException's cause has to be non-null")
+          assert(error.getCause.toString == errorMessage)
+          assert(error.getCause.getMessage == "java.lang.InterruptedException")
+        }
+      } finally {
+        spark.experimental.extraStrategies = Nil
+      }
+    }
+  }
+
+  private def createJoinedDF(): DataFrame = {
+    spark.range(10).toDF("col1").createTempView("t1")
+    spark.range(5).coalesce(2).toDF("col2").createTempView("t2")
+    sql("SELECT /*+ BROADCAST(t2) */ * FROM t1 INNER JOIN t2 ON t1.col1 = t2.col2;")
+  }
+
 }
 
 /**
@@ -3130,25 +3196,32 @@ private case class SimpleShuffleSortCostEvaluator() extends CostEvaluator {
 /**
  * Helps to simulate ExchangeQueryStageExec materialization failure.
  */
-private object TestProblematicCoalesceStrategy extends Strategy {
-  private case class TestProblematicCoalesceExec(numPartitions: Int, child: SparkPlan)
-    extends UnaryExecNode {
-    override protected def doExecute(): RDD[InternalRow] = {
-      child.execute().mapPartitions { _ =>
-        throw new RuntimeException("coalesce test error")
-      }
-    }
-    override def output: Seq[Attribute] = child.output
-    override protected def withNewChildInternal(newChild: SparkPlan): TestProblematicCoalesceExec =
-      copy(child = newChild)
-  }
+private trait BaseTestProblematicCoalesceStrategy extends Strategy {
+
+  val throwable: Throwable
 
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
     plan match {
       case org.apache.spark.sql.catalyst.plans.logical.Repartition(
       numPartitions, false, child) =>
-        TestProblematicCoalesceExec(numPartitions, planLater(child)) :: Nil
+        TestProblematicCoalesceExec(numPartitions, planLater(child), throwable) :: Nil
       case _ => Nil
     }
   }
 }
+
+private case class TestProblematicCoalesceExec(
+    numPartitions: Int, child: SparkPlan, throwable: Throwable)
+  extends UnaryExecNode {
+  override protected def doExecute(): RDD[InternalRow] = {
+    throwable match {
+      case _: RuntimeException => child.execute().mapPartitions { _ =>
+        throw throwable
+      }
+      case _ => throw throwable
+    }
+  }
+  override def output: Seq[Attribute] = child.output
+  override protected def withNewChildInternal(newChild: SparkPlan): TestProblematicCoalesceExec =
+    copy(child = newChild)
+}
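
Not part of the patch above, just an illustrative sketch of why the first hunk swaps oe.getCause for oe: a java.lang.OutOfMemoryError raised with only a message has no cause of its own, so chaining oe.getCause leaves callers with a null cause, while chaining oe keeps the original error reachable, which is what the new tests assert. The names below (CauseChainSketch, wrapDroppingCause, wrapKeepingCause) are hypothetical stand-ins for the QueryExecutionErrors factory method touched in the first hunk, not Spark APIs.

// Hypothetical stand-in, not Spark code: shows that `cause = oe` preserves the
// OutOfMemoryError in the exception chain while `cause = oe.getCause` drops it.
object CauseChainSketch {
  def wrapDroppingCause(oe: OutOfMemoryError): RuntimeException =
    new RuntimeException("Not enough memory to build and broadcast the table", oe.getCause)

  def wrapKeepingCause(oe: OutOfMemoryError): RuntimeException =
    new RuntimeException("Not enough memory to build and broadcast the table", oe)

  def main(args: Array[String]): Unit = {
    val oe = new OutOfMemoryError("Java heap space")
    // An OOM constructed from a message alone has a null cause.
    assert(oe.getCause == null)
    // Old behavior: the wrapper's cause is null, so the OOM is lost to callers.
    assert(wrapDroppingCause(oe).getCause == null)
    // New behavior: the OOM itself is the cause, as the new tests expect.
    assert(wrapKeepingCause(oe).getCause eq oe)
  }
}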