Skip to content

Commit

Permalink
[SPARK-50262][SQL] Forbid specification complex types during altering…
Browse files Browse the repository at this point in the history
… collation

### What changes were proposed in this pull request?

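[SPARK-48413](https://issues.apache.org/jira/browse/SPARK-48413) added the ability to change the collation of table columns.
This PR restricts that feature to leaf string types only: the collation of a string nested inside an array, map, or struct can no longer be changed by re-specifying the whole complex type.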

### Why are the changes needed?

Altering a collation currently fails when the table schema carries field metadata: the type supplied by the user has no metadata, so it does not match the existing schema.
Users can still change any collation, but for nested fields they must use the full column path (for example, `ALTER COLUMN c2.element TYPE STRING COLLATE UNICODE_CI`).

### Does this PR introduce _any_ user-facing change?

Yes, but the affected feature has not been released yet.

### How was this patch tested?

Added a new test and updated the existing ones.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #48803 from Alexvsalexvsalex/SPARK-50262_forbid_complex_types_during_collation_change.

Authored-by: Alexvsalexvsalex <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
Alexvsalexvsalex authored and cloud-fan committed Nov 11, 2024
1 parent 7dbd17d commit a088597
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 47 deletions.
21 changes: 0 additions & 21 deletions sql/api/src/main/scala/org/apache/spark/sql/types/DataType.scala
Original file line number Diff line number Diff line change
Expand Up @@ -460,27 +460,6 @@ object DataType {
// String types with possibly different collations are compatible.
case (_: StringType, _: StringType) => true

case (ArrayType(fromElement, fromContainsNull), ArrayType(toElement, toContainsNull)) =>
(fromContainsNull == toContainsNull) &&
equalsIgnoreCompatibleCollation(fromElement, toElement)

case (
MapType(fromKey, fromValue, fromContainsNull),
MapType(toKey, toValue, toContainsNull)) =>
fromContainsNull == toContainsNull &&
// Map keys cannot change collation.
fromKey == toKey &&
equalsIgnoreCompatibleCollation(fromValue, toValue)

case (StructType(fromFields), StructType(toFields)) =>
fromFields.length == toFields.length &&
fromFields.zip(toFields).forall { case (fromField, toField) =>
fromField.name == toField.name &&
fromField.nullable == toField.nullable &&
fromField.metadata == toField.metadata &&
equalsIgnoreCompatibleCollation(fromField.dataType, toField.dataType)
}

case (fromDataType, toDataType) => fromDataType == toDataType
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1546,10 +1546,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
.map(dt => col.field.copy(dataType = dt))
.getOrElse(col.field)
val newDataType = a.dataType.get
val sameTypeExceptCollations =
DataType.equalsIgnoreCompatibleCollation(field.dataType, newDataType)
newDataType match {
case _ if sameTypeExceptCollations => // Allow changing type collations.
case _: StructType => alter.failAnalysis(
"CANNOT_UPDATE_FIELD.STRUCT_TYPE",
Map("table" -> toSQLId(table.name), "fieldName" -> toSQLId(fieldName)))
Expand All @@ -1576,10 +1573,11 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
case (CharType(l1), CharType(l2)) => l1 == l2
case (CharType(l1), VarcharType(l2)) => l1 <= l2
case (VarcharType(l1), VarcharType(l2)) => l1 <= l2
case _ => Cast.canUpCast(from, to)
case _ =>
Cast.canUpCast(from, to) ||
DataType.equalsIgnoreCompatibleCollation(field.dataType, newDataType)
}

if (!sameTypeExceptCollations && !canAlterColumnType(field.dataType, newDataType)) {
if (!canAlterColumnType(field.dataType, newDataType)) {
alter.failAnalysis(
errorClass = "NOT_SUPPORTED_CHANGE_COLUMN",
messageParameters = Map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,7 @@ class DataTypeSuite extends SparkFunSuite with SQLHelper {
checkEqualsIgnoreCompatibleCollation(
ArrayType(StringType),
ArrayType(StringType("UTF8_LCASE")),
expected = true
expected = false
)
checkEqualsIgnoreCompatibleCollation(
ArrayType(StringType),
Expand All @@ -732,12 +732,12 @@ class DataTypeSuite extends SparkFunSuite with SQLHelper {
checkEqualsIgnoreCompatibleCollation(
ArrayType(ArrayType(StringType)),
ArrayType(ArrayType(StringType("UTF8_LCASE"))),
expected = true
expected = false
)
checkEqualsIgnoreCompatibleCollation(
MapType(StringType, StringType),
MapType(StringType, StringType("UTF8_LCASE")),
expected = true
expected = false
)
checkEqualsIgnoreCompatibleCollation(
MapType(StringType("UTF8_LCASE"), StringType),
Expand All @@ -747,7 +747,7 @@ class DataTypeSuite extends SparkFunSuite with SQLHelper {
checkEqualsIgnoreCompatibleCollation(
MapType(StringType("UTF8_LCASE"), ArrayType(StringType)),
MapType(StringType("UTF8_LCASE"), ArrayType(StringType("UTF8_LCASE"))),
expected = true
expected = false
)
checkEqualsIgnoreCompatibleCollation(
MapType(ArrayType(StringType), IntegerType),
Expand All @@ -762,12 +762,12 @@ class DataTypeSuite extends SparkFunSuite with SQLHelper {
checkEqualsIgnoreCompatibleCollation(
StructType(StructField("a", StringType) :: Nil),
StructType(StructField("a", StringType("UTF8_LCASE")) :: Nil),
expected = true
expected = false
)
checkEqualsIgnoreCompatibleCollation(
StructType(StructField("a", ArrayType(StringType)) :: Nil),
StructType(StructField("a", ArrayType(StringType("UTF8_LCASE"))) :: Nil),
expected = true
expected = false
)
checkEqualsIgnoreCompatibleCollation(
StructType(StructField("a", MapType(StringType, IntegerType)) :: Nil),
Expand Down
48 changes: 44 additions & 4 deletions sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAg
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.internal.{SqlApiConf, SQLConf}
import org.apache.spark.sql.types.{ArrayType, MapType, StringType, StructField, StructType}
import org.apache.spark.sql.types.{ArrayType, IntegerType, MapType, Metadata, MetadataBuilder, StringType, StructField, StructType}

class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
protected val v2Source = classOf[FakeV2ProviderWithCustomSchema].getName
Expand Down Expand Up @@ -529,15 +529,55 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
|""".stripMargin)
sql(s"INSERT INTO $tableName VALUES ('a', array('b'), map(1, 'c'), struct('d'))")
sql(s"ALTER TABLE $tableName ALTER COLUMN c1 TYPE STRING COLLATE UTF8_LCASE")
sql(s"ALTER TABLE $tableName ALTER COLUMN c2 TYPE ARRAY<STRING COLLATE UNICODE_CI>")
sql(s"ALTER TABLE $tableName ALTER COLUMN c3 TYPE MAP<INT, STRING COLLATE UTF8_BINARY>")
sql(s"ALTER TABLE $tableName ALTER COLUMN c4 TYPE STRUCT<t: STRING COLLATE UNICODE>")
sql(s"ALTER TABLE $tableName ALTER COLUMN c2.element TYPE STRING COLLATE UNICODE_CI")
sql(s"ALTER TABLE $tableName ALTER COLUMN c3.value TYPE STRING COLLATE UTF8_BINARY")
sql(s"ALTER TABLE $tableName ALTER COLUMN c4.t TYPE STRING COLLATE UNICODE")
checkAnswer(sql(s"SELECT collation(c1), collation(c2[0]), " +
s"collation(c3[1]), collation(c4.t) FROM $tableName"),
Seq(Row("UTF8_LCASE", "UNICODE_CI", "UTF8_BINARY", "UNICODE")))
}
}

// Checks that changing the collation of string leaves (addressed by full column path)
// leaves the metadata attached to every field of the table schema unchanged.
test("SPARK-50262: Alter column with collation preserve metadata") {
// Builds a Metadata object with a single string entry "key" -> `column`, so each field
// gets a distinguishable marker.
def createMetadata(column: String): Metadata =
new MetadataBuilder().putString("key", column).build()

val tableName = "testcat.alter_column_tbl"
withTable(tableName) {
// Empty DataFrame whose schema has a plain string, an array, a map and a struct column;
// every top-level field and the nested struct field `t` carry their own metadata.
val df = spark.createDataFrame(
java.util.List.of[Row](),
StructType(Seq(
StructField("c1", StringType, metadata = createMetadata("c1")),
StructField("c2", ArrayType(StringType), metadata = createMetadata("c2")),
StructField("c3", MapType(IntegerType, StringType), metadata = createMetadata("c3")),
StructField("c4",
StructType(Seq(StructField("t", StringType, metadata = createMetadata("c4t")))),
metadata = createMetadata("c4"))
))
)
df.write.format("parquet").saveAsTable(tableName)

sql(s"INSERT INTO $tableName VALUES ('a', array('b'), map(1, 'c'), struct('d'))")
// Nested strings are altered through their full path (`c2.element`, `c3.value`, `c4.t`)
// rather than by re-specifying the enclosing complex type.
sql(s"ALTER TABLE $tableName ALTER COLUMN c1 TYPE STRING COLLATE UTF8_LCASE")
sql(s"ALTER TABLE $tableName ALTER COLUMN c2.element TYPE STRING COLLATE UNICODE_CI")
sql(s"ALTER TABLE $tableName ALTER COLUMN c3.value TYPE STRING COLLATE UTF8_BINARY")
sql(s"ALTER TABLE $tableName ALTER COLUMN c4.t TYPE STRING COLLATE UNICODE")
// Reload the schema from the catalog and compare each field's metadata with the value
// it was created with.
val testCatalog = catalog("testcat").asTableCatalog
val tableSchema = testCatalog.loadTable(Identifier.of(Array(), "alter_column_tbl")).schema()
val c1Metadata = tableSchema.find(_.name == "c1").get.metadata
assert(c1Metadata === createMetadata("c1"))
val c2Metadata = tableSchema.find(_.name == "c2").get.metadata
assert(c2Metadata === createMetadata("c2"))
val c3Metadata = tableSchema.find(_.name == "c3").get.metadata
assert(c3Metadata === createMetadata("c3"))
val c4Metadata = tableSchema.find(_.name == "c4").get.metadata
assert(c4Metadata === createMetadata("c4"))
// The nested field `t` inside struct column `c4` must keep its own metadata as well.
val c4tMetadata = tableSchema.find(_.name == "c4").get.dataType
.asInstanceOf[StructType].find(_.name == "t").get.metadata
assert(c4tMetadata === createMetadata("c4t"))
}
}

test("SPARK-47210: Implicit casting of collated strings") {
val tableName = "parquet_dummy_implicit_cast_t22"
withTable(tableName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2344,18 +2344,22 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase {
sql("CREATE TABLE t2(col ARRAY<STRING>) USING parquet")
sql("INSERT INTO t2 VALUES (ARRAY('a'))")
checkAnswer(sql("SELECT COLLATION(col[0]) FROM t2"), Row("UTF8_BINARY"))
sql("ALTER TABLE t2 ALTER COLUMN col TYPE ARRAY<STRING COLLATE UTF8_LCASE>")
checkAnswer(sql("SELECT COLLATION(col[0]) FROM t2"), Row("UTF8_LCASE"))
assertThrows[AnalysisException] {
sql("ALTER TABLE t2 ALTER COLUMN col TYPE ARRAY<STRING COLLATE UTF8_LCASE>")
}
checkAnswer(sql("SELECT COLLATION(col[0]) FROM t2"), Row("UTF8_BINARY"))

// `MapType` with collation.
sql("CREATE TABLE t3(col MAP<STRING, STRING>) USING parquet")
sql("INSERT INTO t3 VALUES (MAP('k', 'v'))")
checkAnswer(sql("SELECT COLLATION(col['k']) FROM t3"), Row("UTF8_BINARY"))
sql(
"""
|ALTER TABLE t3 ALTER COLUMN col TYPE
|MAP<STRING, STRING COLLATE UTF8_LCASE>""".stripMargin)
checkAnswer(sql("SELECT COLLATION(col['k']) FROM t3"), Row("UTF8_LCASE"))
assertThrows[AnalysisException] {
sql(
"""
|ALTER TABLE t3 ALTER COLUMN col TYPE
|MAP<STRING, STRING COLLATE UTF8_LCASE>""".stripMargin)
}
checkAnswer(sql("SELECT COLLATION(col['k']) FROM t3"), Row("UTF8_BINARY"))

// Invalid change of map key collation.
val alterMap =
Expand All @@ -2367,7 +2371,7 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase {
},
condition = "NOT_SUPPORTED_CHANGE_COLUMN",
parameters = Map(
"originType" -> "\"MAP<STRING, STRING COLLATE UTF8_LCASE>\"",
"originType" -> "\"MAP<STRING, STRING>\"",
"originName" -> "`col`",
"table" -> "`spark_catalog`.`default`.`t3`",
"newType" -> "\"MAP<STRING COLLATE UTF8_LCASE, STRING>\"",
Expand All @@ -2380,8 +2384,10 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase {
sql("CREATE TABLE t4(col STRUCT<a:STRING>) USING parquet")
sql("INSERT INTO t4 VALUES (NAMED_STRUCT('a', 'value'))")
checkAnswer(sql("SELECT COLLATION(col.a) FROM t4"), Row("UTF8_BINARY"))
sql("ALTER TABLE t4 ALTER COLUMN col TYPE STRUCT<a:STRING COLLATE UTF8_LCASE>")
checkAnswer(sql("SELECT COLLATION(col.a) FROM t4"), Row("UTF8_LCASE"))
assertThrows[AnalysisException] {
sql("ALTER TABLE t4 ALTER COLUMN col TYPE STRUCT<a:STRING COLLATE UTF8_LCASE>")
}
checkAnswer(sql("SELECT COLLATION(col.a) FROM t4"), Row("UTF8_BINARY"))
}
}

Expand Down

0 comments on commit a088597

Please sign in to comment.