From 85c9cf8d6a688e4d6f9b295a3c9705a5314c49c6 Mon Sep 17 00:00:00 2001 From: Milan Dankovic Date: Mon, 25 Nov 2024 16:09:53 +0100 Subject: [PATCH] Fix withErrorHandling and remove writing to sink for already executed commands --- .../scala/org/apache/spark/sql/SparkSession.scala | 2 +- .../spark/sql/scripting/SqlScriptingExecution.scala | 12 +++++------- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 0f3c0b7a02be2..dd3b8ce4967ef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -420,7 +420,7 @@ class SparkSession private( var result: Option[Seq[Row]] = None while (sse.hasNext) { - sse.withErrorHandling() { + sse.withErrorHandling { val df = sse.next() if (sse.hasNext) { df.write.format("noop").mode("overwrite").save() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecution.scala index 755d901e24f0c..427d3bbf8d04a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecution.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.scripting import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, CompoundBody} +import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, CompoundBody, MultiResult} /** * SQL scripting executor - executes script and returns result statements. @@ -53,12 +53,10 @@ class SqlScriptingExecution( while (currentStatement.isDefined) { currentStatement match { case Some(stmt: SingleStatementExec) if !stmt.isExecuted => - withErrorHandling() { + withErrorHandling { val df = stmt.buildDataFrame(session) - if (df.logicalPlan.isInstanceOf[CommandResult]) { - // If the statement is not a result, we need to write it to a noop sink to execute it - df.write.format("noop").mode("overwrite").save() - } else { + if (!df.logicalPlan.isInstanceOf[CommandResult] + && !df.logicalPlan.isInstanceOf[MultiResult]) { // If the statement is a result, we need to return it to the caller return Some(df) } @@ -76,7 +74,7 @@ class SqlScriptingExecution( throw e } - def withErrorHandling()(f: => Unit): Unit = { + def withErrorHandling(f: => Unit): Unit = { try { f } catch {