From 3f22ef1721738ebacba8a27854ea8f24e0c6e5b9 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 9 Sep 2024 10:45:14 +0800 Subject: [PATCH] [SPARK-49246][SQL][FOLLOW-UP] The behavior of SaveAsTable should not be changed by falling back to v1 command This is a followup of https://github.com/apache/spark/pull/47772 . The behavior of SaveAsTable should not be changed by switching v1 to v2 command. This is similar to https://github.com/apache/spark/pull/47995. For the case of `DelegatingCatalogExtension` we need it goes to V1 commands to be consistent with previous behavior. Behavior regression. No UT No Closes #48019 from amaliujia/regress_v2. Lead-authored-by: Wenchen Fan Co-authored-by: Rui Wang Signed-off-by: Wenchen Fan (cherry picked from commit 37b39b41d07cf8f39dd54cc18342e4d7b8bc71a3) Signed-off-by: Wenchen Fan --- .../org/apache/spark/sql/DataFrameWriter.scala | 6 ++++-- ...DataSourceV2DataFrameSessionCatalogSuite.scala | 12 ++++++++++-- .../sql/connector/TestV2SessionCatalogBase.scala | 15 +++++---------- 3 files changed, 19 insertions(+), 14 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 84f02c723136b..8c945ef0dbcba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -565,8 +565,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ val session = df.sparkSession - val canUseV2 = lookupV2Provider().isDefined || - df.sparkSession.sessionState.conf.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isDefined + val canUseV2 = lookupV2Provider().isDefined || (df.sparkSession.sessionState.conf.getConf( + SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isDefined && + !df.sparkSession.sessionState.catalogManager.catalog(CatalogManager.SESSION_CATALOG_NAME) + .isInstanceOf[DelegatingCatalogExtension]) session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match { case nameParts @ NonSessionCatalogAndIdentifier(catalog, ident) => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala index 79fbabbeacaa6..9dd20c906535e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala @@ -55,8 +55,7 @@ class DataSourceV2DataFrameSessionCatalogSuite "and a same-name temp view exist") { withTable("same_name") { withTempView("same_name") { - val format = spark.sessionState.conf.defaultDataSourceName - sql(s"CREATE TABLE same_name(id LONG) USING $format") + sql(s"CREATE TABLE same_name(id LONG) USING $v2Format") spark.range(10).createTempView("same_name") spark.range(20).write.format(v2Format).mode(SaveMode.Append).saveAsTable("same_name") checkAnswer(spark.table("same_name"), spark.range(10).toDF()) @@ -88,6 +87,15 @@ class DataSourceV2DataFrameSessionCatalogSuite assert(tableInfo.properties().get("provider") === v2Format) } } + + test("SPARK-49246: saveAsTable with v1 format") { + withTable("t") { + sql("CREATE TABLE t(c INT) USING csv") + val df = spark.range(10).toDF() + df.write.mode(SaveMode.Overwrite).format("csv").saveAsTable("t") + verifyTable("t", df) + } + } } class InMemoryTableSessionCatalog extends TestV2SessionCatalogBase[InMemoryTable] { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala index 9144fb9390454..bd13123d587f9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala @@ -22,8 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean import scala.collection.JavaConverters._ -import org.apache.spark.sql.catalyst.catalog.CatalogTableType -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, DelegatingCatalogExtension, Identifier, Table, TableCatalog, V1Table} +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, DelegatingCatalogExtension, Identifier, Table, TableCatalog} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.types.StructType @@ -53,14 +52,10 @@ private[connector] trait TestV2SessionCatalogBase[T <: Table] extends Delegating if (tables.containsKey(ident)) { tables.get(ident) } else { - // Table was created through the built-in catalog - super.loadTable(ident) match { - case v1Table: V1Table if v1Table.v1Table.tableType == CatalogTableType.VIEW => v1Table - case t => - val table = newTable(t.name(), t.schema(), t.partitioning(), t.properties()) - addTable(ident, table) - table - } + // Table was created through the built-in catalog via v1 command, this is OK as the + // `loadTable` should always be invoked, and we set the `tableCreated` to pass validation. + tableCreated.set(true) + super.loadTable(ident) } }