Skip to content

Commit

Permalink
[SPARK-49246][SQL][FOLLOW-UP] The behavior of SaveAsTable should not …
Browse files Browse the repository at this point in the history
…be changed by falling back to v1 command

This is a followup of #47772 . The behavior of SaveAsTable should not be changed by switching v1 to v2 command. This is similar to #47995. For the case of `DelegatingCatalogExtension` we need it goes to V1 commands to be consistent with previous behavior.

Behavior regression.

No

UT

No

Closes #48019 from amaliujia/regress_v2.

Lead-authored-by: Wenchen Fan <[email protected]>
Co-authored-by: Rui Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 37b39b4)
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
cloud-fan and amaliujia committed Sep 9, 2024
1 parent 46214da commit 3f22ef1
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -565,8 +565,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

val session = df.sparkSession
val canUseV2 = lookupV2Provider().isDefined ||
df.sparkSession.sessionState.conf.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isDefined
val canUseV2 = lookupV2Provider().isDefined || (df.sparkSession.sessionState.conf.getConf(
SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isDefined &&
!df.sparkSession.sessionState.catalogManager.catalog(CatalogManager.SESSION_CATALOG_NAME)
.isInstanceOf[DelegatingCatalogExtension])

session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
case nameParts @ NonSessionCatalogAndIdentifier(catalog, ident) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ class DataSourceV2DataFrameSessionCatalogSuite
"and a same-name temp view exist") {
withTable("same_name") {
withTempView("same_name") {
val format = spark.sessionState.conf.defaultDataSourceName
sql(s"CREATE TABLE same_name(id LONG) USING $format")
sql(s"CREATE TABLE same_name(id LONG) USING $v2Format")
spark.range(10).createTempView("same_name")
spark.range(20).write.format(v2Format).mode(SaveMode.Append).saveAsTable("same_name")
checkAnswer(spark.table("same_name"), spark.range(10).toDF())
Expand Down Expand Up @@ -88,6 +87,15 @@ class DataSourceV2DataFrameSessionCatalogSuite
assert(tableInfo.properties().get("provider") === v2Format)
}
}

test("SPARK-49246: saveAsTable with v1 format") {
withTable("t") {
sql("CREATE TABLE t(c INT) USING csv")
val df = spark.range(10).toDF()
df.write.mode(SaveMode.Overwrite).format("csv").saveAsTable("t")
verifyTable("t", df)
}
}
}

class InMemoryTableSessionCatalog extends TestV2SessionCatalogBase[InMemoryTable] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, DelegatingCatalogExtension, Identifier, Table, TableCatalog, V1Table}
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, DelegatingCatalogExtension, Identifier, Table, TableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType

Expand Down Expand Up @@ -53,14 +52,10 @@ private[connector] trait TestV2SessionCatalogBase[T <: Table] extends Delegating
if (tables.containsKey(ident)) {
tables.get(ident)
} else {
// Table was created through the built-in catalog
super.loadTable(ident) match {
case v1Table: V1Table if v1Table.v1Table.tableType == CatalogTableType.VIEW => v1Table
case t =>
val table = newTable(t.name(), t.schema(), t.partitioning(), t.properties())
addTable(ident, table)
table
}
// Table was created through the built-in catalog via v1 command, this is OK as the
// `loadTable` should always be invoked, and we set the `tableCreated` to pass validation.
tableCreated.set(true)
super.loadTable(ident)
}
}

Expand Down

0 comments on commit 3f22ef1

Please sign in to comment.