[SPARK-49152][SQL][FOLLOWUP] DelegatingCatalogExtension should also use V1 commands

### What changes were proposed in this pull request?

This is a follow-up of #47660. If users override `spark_catalog` with a
`DelegatingCatalogExtension`, we should still use v1 commands, as `DelegatingCatalogExtension` forwards requests to HMS and there are still behavior differences between v1 and v2 commands targeting HMS.

This PR also forces the use of v1 commands for certain commands that do not have a v2 version.
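
For context, a minimal sketch (not part of this patch; the class name is illustrative) of the kind of plugin this preserves behavior for: a catalog that extends `DelegatingCatalogExtension` and therefore forwards every call it does not override to the built-in session catalog, and ultimately to HMS.

```scala
import org.apache.spark.sql.connector.catalog.{DelegatingCatalogExtension, Identifier, Table}

// Hypothetical plugin: everything not overridden is delegated to the built-in
// session catalog, so table metadata still lives in the Hive metastore.
class AuditingSessionCatalog extends DelegatingCatalogExtension {
  override def loadTable(ident: Identifier): Table = {
    // custom logic (e.g. auditing) before delegating to the session catalog
    println(s"loading table $ident")
    super.loadTable(ident)
  }
}
```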

### Why are the changes needed?

Avoid introducing behavior changes to Spark plugins that implement `DelegatingCatalogExtension` to override `spark_catalog`.
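
Such a plugin is typically wired in through the session catalog implementation config (a sketch, reusing the hypothetical class above; the package name is a placeholder):

```scala
import org.apache.spark.sql.SparkSession

// Sketch: register the extension as the implementation behind spark_catalog.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.catalog.spark_catalog", "com.example.AuditingSessionCatalog")
  .enableHiveSupport()
  .getOrCreate()
```

With this follow-up, pointing that config at a `DelegatingCatalogExtension` no longer flips session-catalog DDL over to the v2 code paths.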

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

New test case.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #47995 from amaliujia/fix_catalog_v2.

Lead-authored-by: Wenchen Fan <[email protected]>
Co-authored-by: Rui Wang <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
3 people committed Sep 5, 2024
1 parent 9676b1c commit f7cfeb5
Showing 3 changed files with 51 additions and 14 deletions.
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, toPrettySQL, ResolveDefaultColumns => DefaultCols}
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, LookupCatalog, SupportsNamespaces, V1Table}
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, DelegatingCatalogExtension, LookupCatalog, SupportsNamespaces, V1Table}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.command._
@@ -284,10 +284,20 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
case AnalyzeColumn(ResolvedV1TableOrViewIdentifier(ident), columnNames, allColumns) =>
AnalyzeColumnCommand(ident, columnNames, allColumns)

case RepairTable(ResolvedV1TableIdentifier(ident), addPartitions, dropPartitions) =>
// V2 catalog doesn't support REPAIR TABLE yet, we must use v1 command here.
case RepairTable(
ResolvedV1TableIdentifierInSessionCatalog(ident),
addPartitions,
dropPartitions) =>
RepairTableCommand(ident, addPartitions, dropPartitions)

case LoadData(ResolvedV1TableIdentifier(ident), path, isLocal, isOverwrite, partition) =>
// V2 catalog doesn't support LOAD DATA yet, we must use v1 command here.
case LoadData(
ResolvedV1TableIdentifierInSessionCatalog(ident),
path,
isLocal,
isOverwrite,
partition) =>
LoadDataCommand(
ident,
path,
@@ -336,7 +346,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
}
ShowColumnsCommand(db, v1TableName, output)

case RecoverPartitions(ResolvedV1TableIdentifier(ident)) =>
// V2 catalog doesn't support RECOVER PARTITIONS yet, we must use v1 command here.
case RecoverPartitions(ResolvedV1TableIdentifierInSessionCatalog(ident)) =>
RepairTableCommand(
ident,
enableAddPartitions = true,
@@ -364,8 +375,9 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
purge,
retainData = false)

// V2 catalog doesn't support setting serde properties yet, we must use v1 command here.
case SetTableSerDeProperties(
ResolvedV1TableIdentifier(ident),
ResolvedV1TableIdentifierInSessionCatalog(ident),
serdeClassName,
serdeProperties,
partitionSpec) =>
@@ -380,10 +392,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)

// V2 catalog doesn't support setting partition location yet, we must use v1 command here.
case SetTableLocation(
ResolvedTable(catalog, _, t: V1Table, _),
ResolvedV1TableIdentifierInSessionCatalog(ident),
Some(partitionSpec),
location) if isSessionCatalog(catalog) =>
AlterTableSetLocationCommand(t.v1Table.identifier, Some(partitionSpec), location)
location) =>
AlterTableSetLocationCommand(ident, Some(partitionSpec), location)

case AlterViewAs(ResolvedViewIdentifier(ident), originalText, query) =>
AlterViewAsCommand(ident, originalText, query)
@@ -600,6 +612,14 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
}
}

object ResolvedV1TableIdentifierInSessionCatalog {
def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
case ResolvedTable(catalog, _, t: V1Table, _) if isSessionCatalog(catalog) =>
Some(t.catalogTable.identifier)
case _ => None
}
}

object ResolvedV1TableOrViewIdentifier {
def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
case ResolvedV1TableIdentifier(ident) => Some(ident)
@@ -684,7 +704,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
}

private def supportsV1Command(catalog: CatalogPlugin): Boolean = {
isSessionCatalog(catalog) &&
SQLConf.get.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isEmpty
isSessionCatalog(catalog) && (
SQLConf.get.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isEmpty ||
catalog.isInstanceOf[DelegatingCatalogExtension])
}
}
@@ -71,4 +71,12 @@ class DataSourceV2SQLSessionCatalogSuite
sql(s"CREATE EXTERNAL TABLE t (i INT) USING $v2Format TBLPROPERTIES($prop)")
}
}

test("SPARK-49152: partition columns should be put at the end") {
withTable("t") {
sql("CREATE TABLE t (c1 INT, c2 INT) USING json PARTITIONED BY (c1)")
// partition columns should be put at the end.
assert(getTableMetadata("default.t").columns().map(_.name()) === Seq("c2", "c1"))
}
}
}
@@ -2125,10 +2125,18 @@ class DataSourceV2SQLSuiteV1Filter
}

test("REPLACE TABLE: v1 table") {
sql(s"CREATE OR REPLACE TABLE tbl (a int) USING ${classOf[SimpleScanSource].getName}")
val v2Catalog = catalog("spark_catalog").asTableCatalog
val table = v2Catalog.loadTable(Identifier.of(Array("default"), "tbl"))
assert(table.properties().get(TableCatalog.PROP_PROVIDER) == classOf[SimpleScanSource].getName)
val e = intercept[AnalysisException] {
sql(s"CREATE OR REPLACE TABLE tbl (a int) USING ${classOf[SimpleScanSource].getName}")
}
checkError(
exception = e,
errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
sqlState = "0A000",
parameters = Map(
"tableName" -> "`spark_catalog`.`default`.`tbl`",
"operation" -> "REPLACE TABLE"
)
)
}

test("DeleteFrom: - delete with invalid predicate") {
