diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index 18cc579e4f9ea..aaad2a3280919 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -34,6 +34,7 @@ license: |
   - Valid hexadecimal strings should include only allowed symbols (0-9A-Fa-f).
   - Valid values for `fmt` are case-insensitive `hex`, `base64`, `utf-8`, `utf8`.
 - Since Spark 3.4, Spark throws only `PartitionsAlreadyExistException` when it creates partitions but some of them exist already. In Spark 3.3 or earlier, Spark can throw either `PartitionsAlreadyExistException` or `PartitionAlreadyExistsException`.
+- Since Spark 3.4, Spark validates partition values in ALTER PARTITION against the partition column type, following `spark.sql.storeAssignmentPolicy`. This may cause an exception if type conversion fails, e.g. `ALTER TABLE .. ADD PARTITION(p='a')` if column `p` is int type. To restore the legacy behavior, set `spark.sql.legacy.skipTypeValidationOnAlterPartition` to `true`.
 
 ## Upgrading from Spark SQL 3.2 to 3.3
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index ebff9ce546d00..0a60c6b0265af 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3004,6 +3004,16 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val SKIP_TYPE_VALIDATION_ON_ALTER_PARTITION =
+    buildConf("spark.sql.legacy.skipTypeValidationOnAlterPartition")
+      .internal()
+      .doc("When true, skip validation for partition spec in ALTER PARTITION. E.g., " +
+        "`ALTER TABLE .. ADD PARTITION(p='a')` would work even if the partition type is int. " +
+        s"When false, the behavior follows ${STORE_ASSIGNMENT_POLICY.key}.")
+      .version("3.4.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val SORT_BEFORE_REPARTITION =
     buildConf("spark.sql.execution.sortBeforeRepartition")
       .internal()
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala
index 1f5e225324efc..87f140cb3c4a0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala
@@ -20,14 +20,47 @@ package org.apache.spark.sql.util
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.DEFAULT_PARTITION_NAME
+import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, Literal}
 import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{CharType, StructType, VarcharType}
+import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
+import org.apache.spark.sql.types.{CharType, DataType, StringType, StructField, StructType, VarcharType}
 import org.apache.spark.unsafe.types.UTF8String
 
 private[sql] object PartitioningUtils {
+
+  def castPartitionSpec(value: String, dt: DataType, conf: SQLConf): Expression = {
+    conf.storeAssignmentPolicy match {
+      // SPARK-30844: try our best to follow StoreAssignmentPolicy for static partition
+      // values, but not completely, because we can't do static type checking: the parser
+      // has already erased the type info of static partition values and converted them
+      // to string.
+      case StoreAssignmentPolicy.ANSI | StoreAssignmentPolicy.STRICT =>
+        val cast = Cast(Literal(value), dt, Option(conf.sessionLocalTimeZone),
+          ansiEnabled = true)
+        cast.setTagValue(Cast.BY_TABLE_INSERTION, ())
+        cast
+      case _ =>
+        Cast(Literal(value), dt, Option(conf.sessionLocalTimeZone),
+          ansiEnabled = false)
+    }
+  }
+
+  private def normalizePartitionStringValue(value: String, field: StructField): String = {
+    val casted = Cast(
+      castPartitionSpec(value, field.dataType, SQLConf.get),
+      StringType,
+      Option(SQLConf.get.sessionLocalTimeZone)
+    ).eval()
+    if (casted != null) {
+      casted.asInstanceOf[UTF8String].toString
+    } else {
+      null
+    }
+  }
+
   /**
    * Normalize the column names in partition specification, w.r.t. the real partition column names
    * and case sensitivity. e.g., if the partition spec has a column named `monTh`, and there is a
@@ -61,6 +94,14 @@ private[sql] object PartitioningUtils {
               case other => other
             }
             v.asInstanceOf[T]
+          case _ if !SQLConf.get.getConf(SQLConf.SKIP_TYPE_VALIDATION_ON_ALTER_PARTITION) &&
+              value != null && value != DEFAULT_PARTITION_NAME =>
+            val v = value match {
+              case Some(str: String) => Some(normalizePartitionStringValue(str, normalizedFiled))
+              case str: String => normalizePartitionStringValue(str, normalizedFiled)
+              case other => other
+            }
+            v.asInstanceOf[T]
           case _ => value
         }
       normalizedFiled.name -> normalizedVal
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 0216503fba0f4..8b985e82963e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -48,9 +48,9 @@ import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators
 import org.apache.spark.sql.execution.streaming.StreamingRelation
-import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.{PartitioningUtils => CatalystPartitioningUtils}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -106,22 +106,8 @@ case class DataSourceAnalysis(analyzer: Analyzer) extends Rule[LogicalPlan] {
         None
       } else if (potentialSpecs.size == 1) {
         val partValue = potentialSpecs.head._2
-        conf.storeAssignmentPolicy match {
-          // SPARK-30844: try our best to follow StoreAssignmentPolicy for static partition
-          // values but not completely follow because we can't do static type checking due to
-          // the reason that the parser has erased the type info of static partition values
-          // and converted them to string.
-          case StoreAssignmentPolicy.ANSI | StoreAssignmentPolicy.STRICT =>
-            val cast = Cast(Literal(partValue), field.dataType, Option(conf.sessionLocalTimeZone),
-              ansiEnabled = true)
-            cast.setTagValue(Cast.BY_TABLE_INSERTION, ())
-            Some(Alias(cast, field.name)())
-          case _ =>
-            val castExpression =
-              Cast(Literal(partValue), field.dataType, Option(conf.sessionLocalTimeZone),
-                ansiEnabled = false)
-            Some(Alias(castExpression, field.name)())
-        }
+        Some(Alias(CatalystPartitioningUtils.castPartitionSpec(
+          partValue, field.dataType, conf), field.name)())
       } else {
         throw QueryCompilationErrors.multiplePartitionColumnValuesSpecifiedError(
           field, potentialSpecs)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala
index e113499ec685e..f414de1b87c48 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.command
 
 import java.time.{Duration, Period}
 
+import org.apache.spark.SparkNumberFormatException
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
@@ -40,6 +41,7 @@ import org.apache.spark.sql.internal.SQLConf
  */
 trait AlterTableAddPartitionSuiteBase extends QueryTest with DDLCommandTestUtils {
   override val command = "ALTER TABLE .. ADD PARTITION"
+  def defaultPartitionName: String
 
   test("one partition") {
     withNamespaceAndTable("ns", "tbl") { t =>
@@ -213,4 +215,46 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with DDLCommandTestUtils
         Row(Period.ofYears(1), Duration.ofDays(-1), "bbb")))
     }
   }
+
+  test("SPARK-40798: Alter partition should verify partition value") {
+    def shouldThrowException(policy: SQLConf.StoreAssignmentPolicy.Value): Boolean = policy match {
+      case SQLConf.StoreAssignmentPolicy.ANSI | SQLConf.StoreAssignmentPolicy.STRICT =>
+        true
+      case SQLConf.StoreAssignmentPolicy.LEGACY =>
+        false
+    }
+
+    SQLConf.StoreAssignmentPolicy.values.foreach { policy =>
+      withNamespaceAndTable("ns", "tbl") { t =>
+        sql(s"CREATE TABLE $t (c int) $defaultUsing PARTITIONED BY (p int)")
+
+        withSQLConf(SQLConf.STORE_ASSIGNMENT_POLICY.key -> policy.toString) {
+          if (shouldThrowException(policy)) {
+            checkError(
+              exception = intercept[SparkNumberFormatException] {
+                sql(s"ALTER TABLE $t ADD PARTITION (p='aaa')")
+              },
+              errorClass = "CAST_INVALID_INPUT",
+              parameters = Map(
+                "ansiConfig" -> "\"spark.sql.ansi.enabled\"",
+                "expression" -> "'aaa'",
+                "sourceType" -> "\"STRING\"",
+                "targetType" -> "\"INT\""),
+              context = ExpectedContext(
+                fragment = s"ALTER TABLE $t ADD PARTITION (p='aaa')",
+                start = 0,
+                stop = 35 + t.length))
+          } else {
+            sql(s"ALTER TABLE $t ADD PARTITION (p='aaa')")
+            checkPartitions(t, Map("p" -> defaultPartitionName))
+            sql(s"ALTER TABLE $t DROP PARTITION (p=null)")
+          }
+
+          sql(s"ALTER TABLE $t ADD PARTITION (p=null)")
+          checkPartitions(t, Map("p" -> defaultPartitionName))
+          sql(s"ALTER TABLE $t DROP PARTITION (p=null)")
+        }
+      }
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala
index 11df5ede8bbf4..d41fd6b00f8aa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.command.v1
 
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.DEFAULT_PARTITION_NAME
 import org.apache.spark.sql.execution.command
 import org.apache.spark.sql.internal.SQLConf
 
@@ -33,6 +34,8 @@ import org.apache.spark.sql.internal.SQLConf
  * `org.apache.spark.sql.hive.execution.command.AlterTableAddPartitionSuite`
  */
 trait AlterTableAddPartitionSuiteBase extends command.AlterTableAddPartitionSuiteBase {
+  override def defaultPartitionName: String = DEFAULT_PARTITION_NAME
+
   test("empty string as partition value") {
     withNamespaceAndTable("ns", "tbl") { t =>
       sql(s"CREATE TABLE $t (col1 INT, p1 STRING) $defaultUsing PARTITIONED BY (p1)")
@@ -157,6 +160,18 @@ trait AlterTableAddPartitionSuiteBase extends command.AlterTableAddPartitionSuit
       checkPartitions(t, Map("id" -> "1"), Map("id" -> "2"))
     }
   }
+
+  test("SPARK-40798: Alter partition should verify partition value - legacy") {
+    withNamespaceAndTable("ns", "tbl") { t =>
+      sql(s"CREATE TABLE $t (c int) $defaultUsing PARTITIONED BY (p int)")
+
+      withSQLConf(SQLConf.SKIP_TYPE_VALIDATION_ON_ALTER_PARTITION.key -> "true") {
+        sql(s"ALTER TABLE $t ADD PARTITION (p='aaa')")
+        checkPartitions(t, Map("p" -> "aaa"))
+        sql(s"ALTER TABLE $t DROP PARTITION (p='aaa')")
+      }
+    }
+  }
 }
 
 /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala
index 835be8573fdc4..a9ab11e483fd7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.command.v2
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException
 import org.apache.spark.sql.execution.command
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * The class contains tests for the `ALTER TABLE .. ADD PARTITION` command
@@ -28,6 +29,8 @@ import org.apache.spark.sql.execution.command
 class AlterTableAddPartitionSuite
   extends command.AlterTableAddPartitionSuiteBase
   with CommandSuiteBase {
+  override def defaultPartitionName: String = "null"
+
   test("SPARK-33650: add partition into a table which doesn't support partition management") {
     withNamespaceAndTable("ns", "tbl", s"non_part_$catalog") { t =>
       sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing")
@@ -121,4 +124,16 @@ class AlterTableAddPartitionSuite
       checkPartitions(t, Map("id" -> "1"), Map("id" -> "2"))
     }
   }
+
+  test("SPARK-40798: Alter partition should verify partition value - legacy") {
+    withNamespaceAndTable("ns", "tbl") { t =>
+      sql(s"CREATE TABLE $t (c int) $defaultUsing PARTITIONED BY (p int)")
+
+      withSQLConf(SQLConf.SKIP_TYPE_VALIDATION_ON_ALTER_PARTITION.key -> "true") {
+        sql(s"ALTER TABLE $t ADD PARTITION (p='aaa')")
+        checkPartitions(t, Map("p" -> defaultPartitionName))
+        sql(s"ALTER TABLE $t DROP PARTITION (p=null)")
+      }
+    }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 99aef0e47de9b..ef99b06a46a68 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -608,7 +608,7 @@ class HiveDDLSuite
   }
 
   test("SPARK-19129: drop partition with a empty string will drop the whole table") {
-    val df = spark.createDataFrame(Seq((0, "a"), (1, "b"))).toDF("partCol1", "name")
+    val df = spark.createDataFrame(Seq(("0", "a"), ("1", "b"))).toDF("partCol1", "name")
     df.write.mode("overwrite").partitionBy("partCol1").saveAsTable("partitionedTable")
     assertAnalysisError(
       "alter table partitionedTable drop partition(partCol1='')",
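
For reviewers, a minimal usage sketch of the behavior change in this patch (illustrative only; it assumes a running `SparkSession` named `spark` with this change applied, the default ANSI store assignment policy, and a made-up table `t`):

```scala
// Sketch of the SPARK-40798 behavior change; table and column names are illustrative.
spark.sql("CREATE TABLE t (c INT) USING parquet PARTITIONED BY (p INT)")

// With spark.sql.storeAssignmentPolicy = ANSI (or STRICT), the partition value is now
// validated against the partition column type, so this is expected to fail with
// CAST_INVALID_INPUT because 'aaa' cannot be cast to INT.
spark.sql("ALTER TABLE t ADD PARTITION (p='aaa')")

// The new legacy flag skips the validation entirely; the statement is then accepted and,
// as the v1/v2 tests above show, the value is either kept as-is or stored as the default
// (null) partition, depending on the catalog.
spark.conf.set("spark.sql.legacy.skipTypeValidationOnAlterPartition", "true")
spark.sql("ALTER TABLE t ADD PARTITION (p='aaa')")
```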