[SPARK-50386][SQL] Improve SparkFatalException Propagation when OutOfMemoryError occurs on BroadcastExchangeExec building small table to broadcast #48925


erenavsarogullari

What changes were proposed in this pull request?

When BroadcastHashJoin builds the small table to broadcast via BroadcastExchangeExec and an OutOfMemoryError occurs on the driver, BroadcastExchangeExec throws a SparkFatalException wrapping a SparkException. However, the SparkException's cause property can be null because the actual cause, java.lang.OutOfMemoryError: Java heap space in the following example, is dropped. Propagating the actual cause is also useful so that clients can inspect Throwable.getCause and the related cause properties. A repro test case has been added.

Before Fix:

org.apache.spark.util.SparkFatalException: org.apache.spark.SparkException: Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value.
java.util.concurrent.ExecutionException: org.apache.spark.util.SparkFatalException: org.apache.spark.SparkException: Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value.
	at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:205)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:255)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeBroadcastBcast$1(SparkPlan.scala:204)
	at scala.util.Try$.apply(Try.scala:217)
	at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1375)
	at org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1429)
	at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:200)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:259)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:256)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:196)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:377)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:458)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:427)
	... 400 more
Caused by: org.apache.spark.util.SparkFatalException: org.apache.spark.SparkException: Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value.
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:230)
	... 8 more
Caused by: org.apache.spark.SparkException: Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.notEnoughMemoryToBuildAndBroadcastTableError(QueryExecutionErrors.scala:2070)
	... 10 more

After Fix:

org.apache.spark.util.SparkFatalException: org.apache.spark.SparkException: Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value.
java.util.concurrent.ExecutionException: org.apache.spark.util.SparkFatalException: org.apache.spark.SparkException: Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value.
	at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:205)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:255)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeBroadcastBcast$1(SparkPlan.scala:204)
	at scala.util.Try$.apply(Try.scala:217)
	at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1375)
	at org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1429)
	at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:200)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:259)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:256)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:196)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:377)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:458)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:427)
	... 400 more
Caused by: org.apache.spark.util.SparkFatalException: org.apache.spark.SparkException: Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value.
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:230)
	... 8 more
Caused by: org.apache.spark.SparkException: Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.notEnoughMemoryToBuildAndBroadcastTableError(QueryExecutionErrors.scala:2070)
	... 10 more
Caused by: java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:221)
	... 9 more

Why are the changes needed?

The SparkException's cause property can be null because the actual cause, java.lang.OutOfMemoryError: Java heap space in the example above, is dropped. Propagating the actual cause is also useful so that clients can inspect Throwable.getCause and the related cause properties.
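The cause-chain wiring can be illustrated with a self-contained sketch. Note that SparkExceptionLike and SparkFatalExceptionLike below are hypothetical stand-ins for org.apache.spark.SparkException and org.apache.spark.util.SparkFatalException, not Spark's actual classes; the point is only that passing the caught OutOfMemoryError through as the cause keeps the full chain visible via getCause.

```scala
// Stand-ins (assumptions, not Spark's real classes) for
// org.apache.spark.SparkException and org.apache.spark.util.SparkFatalException.
class SparkExceptionLike(message: String, cause: Throwable)
  extends Exception(message, cause)

class SparkFatalExceptionLike(cause: Throwable) extends Exception(cause)

object CausePropagationDemo {
  // Simulates the broadcast relation build failing on the driver.
  def buildBroadcastRelation(): Unit =
    throw new OutOfMemoryError("Java heap space")

  def relationFuture(): Nothing =
    try {
      buildBroadcastRelation()
      sys.error("unreachable")
    } catch {
      case oom: OutOfMemoryError =>
        // Before the fix: the wrapping exception was built with a null cause,
        // so the OutOfMemoryError was lost. After the fix: pass `oom` through,
        // so getCause on the SparkException returns the OutOfMemoryError.
        throw new SparkFatalExceptionLike(
          new SparkExceptionLike(
            "Not enough memory to build and broadcast the table to all worker nodes.",
            oom))
    }

  def main(args: Array[String]): Unit =
    try relationFuture()
    catch {
      case fatal: SparkFatalExceptionLike =>
        val sparkEx = fatal.getCause
        // Prints: java.lang.OutOfMemoryError: Java heap space
        println(sparkEx.getCause)
    }
}
```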

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added 2 new UTs.

Was this patch authored or co-authored using generative AI tooling?

No
