From f7cfeb534d9285df381d147e01de47ec439c082e Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Thu, 5 Sep 2024 21:02:20 +0800
Subject: [PATCH] [SPARK-49152][SQL][FOLLOWUP] DelegatingCatalogExtension should also use V1 commands

### What changes were proposed in this pull request?

This is a follow-up of https://github.com/apache/spark/pull/47660 . If users override `spark_catalog` with a `DelegatingCatalogExtension`, we should still use v1 commands, because `DelegatingCatalogExtension` forwards requests to the Hive Metastore (HMS) and there are still behavior differences between v1 and v2 commands targeting HMS. This PR also forces the use of v1 commands for certain commands that do not have a v2 version.

### Why are the changes needed?

To avoid introducing behavior changes to Spark plugins that implement `DelegatingCatalogExtension` to override `spark_catalog`.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

A new test case.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #47995 from amaliujia/fix_catalog_v2.

Lead-authored-by: Wenchen Fan
Co-authored-by: Rui Wang
Co-authored-by: Wenchen Fan
Signed-off-by: Wenchen Fan
---
 .../analysis/ResolveSessionCatalog.scala      | 41 ++++++++++++++-----
 .../DataSourceV2SQLSessionCatalogSuite.scala  |  8 ++++
 .../sql/connector/DataSourceV2SQLSuite.scala  | 16 ++++++--
 3 files changed, 51 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index d569f1ed484cc..02ad2e79a5645 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, toPrettySQL, ResolveDefaultColumns => DefaultCols}
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, LookupCatalog, SupportsNamespaces, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, DelegatingCatalogExtension, LookupCatalog, SupportsNamespaces, V1Table}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.command._
@@ -284,10 +284,20 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     case AnalyzeColumn(ResolvedV1TableOrViewIdentifier(ident), columnNames, allColumns) =>
       AnalyzeColumnCommand(ident, columnNames, allColumns)
 
-    case RepairTable(ResolvedV1TableIdentifier(ident), addPartitions, dropPartitions) =>
+    // V2 catalog doesn't support REPAIR TABLE yet, we must use v1 command here.
+    case RepairTable(
+        ResolvedV1TableIdentifierInSessionCatalog(ident),
+        addPartitions,
+        dropPartitions) =>
       RepairTableCommand(ident, addPartitions, dropPartitions)
 
-    case LoadData(ResolvedV1TableIdentifier(ident), path, isLocal, isOverwrite, partition) =>
+    // V2 catalog doesn't support LOAD DATA yet, we must use v1 command here.
+    case LoadData(
+        ResolvedV1TableIdentifierInSessionCatalog(ident),
+        path,
+        isLocal,
+        isOverwrite,
+        partition) =>
       LoadDataCommand(
         ident,
         path,
@@ -336,7 +346,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
       }
       ShowColumnsCommand(db, v1TableName, output)
 
-    case RecoverPartitions(ResolvedV1TableIdentifier(ident)) =>
+    // V2 catalog doesn't support RECOVER PARTITIONS yet, we must use v1 command here.
+    case RecoverPartitions(ResolvedV1TableIdentifierInSessionCatalog(ident)) =>
       RepairTableCommand(
         ident,
         enableAddPartitions = true,
@@ -364,8 +375,9 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         purge,
         retainData = false)
 
+    // V2 catalog doesn't support setting serde properties yet, we must use v1 command here.
     case SetTableSerDeProperties(
-        ResolvedV1TableIdentifier(ident),
+        ResolvedV1TableIdentifierInSessionCatalog(ident),
         serdeClassName,
         serdeProperties,
         partitionSpec) =>
@@ -380,10 +392,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
 
     // V2 catalog doesn't support setting partition location yet, we must use v1 command here.
     case SetTableLocation(
-        ResolvedTable(catalog, _, t: V1Table, _),
+        ResolvedV1TableIdentifierInSessionCatalog(ident),
         Some(partitionSpec),
-        location) if isSessionCatalog(catalog) =>
-      AlterTableSetLocationCommand(t.v1Table.identifier, Some(partitionSpec), location)
+        location) =>
+      AlterTableSetLocationCommand(ident, Some(partitionSpec), location)
 
     case AlterViewAs(ResolvedViewIdentifier(ident), originalText, query) =>
       AlterViewAsCommand(ident, originalText, query)
@@ -600,6 +612,14 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     }
   }
 
+  object ResolvedV1TableIdentifierInSessionCatalog {
+    def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
+      case ResolvedTable(catalog, _, t: V1Table, _) if isSessionCatalog(catalog) =>
+        Some(t.catalogTable.identifier)
+      case _ => None
+    }
+  }
+
   object ResolvedV1TableOrViewIdentifier {
     def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
       case ResolvedV1TableIdentifier(ident) => Some(ident)
@@ -684,7 +704,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
   }
 
   private def supportsV1Command(catalog: CatalogPlugin): Boolean = {
-    isSessionCatalog(catalog) &&
-      SQLConf.get.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isEmpty
+    isSessionCatalog(catalog) && (
+      SQLConf.get.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isEmpty ||
+        catalog.isInstanceOf[DelegatingCatalogExtension])
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSessionCatalogSuite.scala
index 95624f3f61c5c..7463eb34d17ff 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSessionCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSessionCatalogSuite.scala
@@ -71,4 +71,12 @@ class DataSourceV2SQLSessionCatalogSuite
       sql(s"CREATE EXTERNAL TABLE t (i INT) USING $v2Format TBLPROPERTIES($prop)")
     }
   }
+
+  test("SPARK-49152: partition columns should be put at the end") {
+    withTable("t") {
+      sql("CREATE TABLE t (c1 INT, c2 INT) USING json PARTITIONED BY (c1)")
+      // partition columns should be put at the end.
+      assert(getTableMetadata("default.t").columns().map(_.name()) === Seq("c2", "c1"))
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 1d37c6aa4eb7f..922bf01b541a1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2125,10 +2125,18 @@ class DataSourceV2SQLSuiteV1Filter
   }
 
   test("REPLACE TABLE: v1 table") {
-    sql(s"CREATE OR REPLACE TABLE tbl (a int) USING ${classOf[SimpleScanSource].getName}")
-    val v2Catalog = catalog("spark_catalog").asTableCatalog
-    val table = v2Catalog.loadTable(Identifier.of(Array("default"), "tbl"))
-    assert(table.properties().get(TableCatalog.PROP_PROVIDER) == classOf[SimpleScanSource].getName)
+    val e = intercept[AnalysisException] {
+      sql(s"CREATE OR REPLACE TABLE tbl (a int) USING ${classOf[SimpleScanSource].getName}")
+    }
+    checkError(
+      exception = e,
+      errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
+      sqlState = "0A000",
+      parameters = Map(
+        "tableName" -> "`spark_catalog`.`default`.`tbl`",
+        "operation" -> "REPLACE TABLE"
+      )
+    )
   }
 
   test("DeleteFrom: - delete with invalid predicate") {