[SPARK-40798][SQL] Alter partition should verify value follow storeAssignmentPolicy

### What changes were proposed in this pull request?

Extract the insertion field cast check into a shared method so that partition values can also be validated in `PartitioningUtils.normalizePartitionSpec`.
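
As a rough sketch of the shared logic (illustrative only: it uses Catalyst's `Cast` and `Literal` the same way the extracted helper in the diff below does, assumes `spark-catalyst` on the classpath, and is not the exact Spark code):

```scala
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.IntegerType

object PartitionValueValidationSketch {
  def main(args: Array[String]): Unit = {
    // Under the ANSI/STRICT store assignment policy the partition value is
    // ANSI-cast to the partition column type, so an unparsable value throws
    // instead of silently becoming NULL.
    val lenient = Cast(Literal("aaa"), IntegerType, timeZoneId = None, ansiEnabled = false)
    println(lenient.eval()) // null -- the old ALTER PARTITION behavior

    val strict = Cast(Literal("aaa"), IntegerType, timeZoneId = None, ansiEnabled = true)
    strict.eval() // throws a number-format cast error, like INSERT does
  }
}
```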

### Why are the changes needed?

Insertion follows the behavior of the config `spark.sql.storeAssignmentPolicy`, which by default fails if the value cannot be cast to the target data type. ALTER PARTITION should follow the same rule. For example:
```SQL
CREATE TABLE t (c int) USING PARQUET PARTITIONED BY(p int);

-- This DDL should fail, but currently succeeds:
ALTER TABLE t ADD PARTITION(p='aaa');

-- This fails, following spark.sql.storeAssignmentPolicy:
INSERT INTO t PARTITION(p='aaa') SELECT 1
```
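
With this change, the `ALTER TABLE` statement above fails in the same way as the `INSERT`. As a rough sketch of what a spark-shell session would show under the default ANSI store assignment policy (the error class is taken from the new test below; the exact message text may differ):

```scala
spark.sql("CREATE TABLE t (c int) USING PARQUET PARTITIONED BY (p int)")
spark.sql("ALTER TABLE t ADD PARTITION (p='aaa')")
// org.apache.spark.SparkNumberFormatException: [CAST_INVALID_INPUT]
// The value 'aaa' of the type "STRING" cannot be cast to "INT" ...
```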

### Does this PR introduce _any_ user-facing change?

Yes, the added partition value will follow the behavior of `spark.sql.storeAssignmentPolicy`.

To restore the previous behavior, set `spark.sql.legacy.skipTypeValidationOnAlterPartition` to `true`.
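
For example, a minimal sketch of opting back into the old behavior in a spark-shell session (using the table `t` from the SQL above; the config key matches the one added to `SQLConf` in this change):

```scala
// Pre-3.4 behavior: partition values in ALTER PARTITION are no longer
// validated against the partition column type.
spark.conf.set("spark.sql.legacy.skipTypeValidationOnAlterPartition", "true")
spark.sql("ALTER TABLE t ADD PARTITION (p='aaa')") // succeeds again, as in Spark 3.3
```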

### How was this patch tested?

Added new tests.

Closes apache#38257 from ulysses-you/verify-partition.

Authored-by: ulysses-you <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
ulysses-you authored and cloud-fan committed Oct 24, 2022
1 parent 825f219 commit 9140795
Showing 8 changed files with 131 additions and 19 deletions.
1 change: 1 addition & 0 deletions docs/sql-migration-guide.md
@@ -34,6 +34,7 @@ license: |
- Valid hexadecimal strings should include only allowed symbols (0-9A-Fa-f).
- Valid values for `fmt` are case-insensitive `hex`, `base64`, `utf-8`, `utf8`.
- Since Spark 3.4, Spark throws only `PartitionsAlreadyExistException` when it creates partitions but some of them exist already. In Spark 3.3 or earlier, Spark can throw either `PartitionsAlreadyExistException` or `PartitionAlreadyExistsException`.
- Since Spark 3.4, Spark validates the partition spec in ALTER PARTITION to follow the behavior of `spark.sql.storeAssignmentPolicy`, which may cause an exception if type conversion fails, e.g. `ALTER TABLE .. ADD PARTITION(p='a')` if column `p` is of int type. To restore the legacy behavior, set `spark.sql.legacy.skipTypeValidationOnAlterPartition` to `true`.

## Upgrading from Spark SQL 3.2 to 3.3

@@ -3004,6 +3004,16 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val SKIP_TYPE_VALIDATION_ON_ALTER_PARTITION =
buildConf("spark.sql.legacy.skipTypeValidationOnAlterPartition")
.internal()
.doc("When true, skip validation for partition spec in ALTER PARTITION. E.g., " +
"`ALTER TABLE .. ADD PARTITION(p='a')` would work even the partition type is int. " +
s"When false, the behavior follows ${STORE_ASSIGNMENT_POLICY.key}")
.version("3.4.0")
.booleanConf
.createWithDefault(false)

val SORT_BEFORE_REPARTITION =
buildConf("spark.sql.execution.sortBeforeRepartition")
.internal()
@@ -20,14 +20,47 @@ package org.apache.spark.sql.util
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.DEFAULT_PARTITION_NAME
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, Literal}
import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{CharType, StructType, VarcharType}
import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
import org.apache.spark.sql.types.{CharType, DataType, StringType, StructField, StructType, VarcharType}
import org.apache.spark.unsafe.types.UTF8String

private[sql] object PartitioningUtils {

def castPartitionSpec(value: String, dt: DataType, conf: SQLConf): Expression = {
conf.storeAssignmentPolicy match {
// SPARK-30844: try our best to follow StoreAssignmentPolicy for static partition
// values but not completely follow because we can't do static type checking due to
// the reason that the parser has erased the type info of static partition values
// and converted them to string.
case StoreAssignmentPolicy.ANSI | StoreAssignmentPolicy.STRICT =>
val cast = Cast(Literal(value), dt, Option(conf.sessionLocalTimeZone),
ansiEnabled = true)
cast.setTagValue(Cast.BY_TABLE_INSERTION, ())
cast
case _ =>
Cast(Literal(value), dt, Option(conf.sessionLocalTimeZone),
ansiEnabled = false)
}
}

private def normalizePartitionStringValue(value: String, field: StructField): String = {
val casted = Cast(
castPartitionSpec(value, field.dataType, SQLConf.get),
StringType,
Option(SQLConf.get.sessionLocalTimeZone)
).eval()
if (casted != null) {
casted.asInstanceOf[UTF8String].toString
} else {
null
}
}

/**
* Normalize the column names in partition specification, w.r.t. the real partition column names
* and case sensitivity. e.g., if the partition spec has a column named `monTh`, and there is a
@@ -61,6 +94,14 @@ private[sql] object PartitioningUtils {
case other => other
}
v.asInstanceOf[T]
case _ if !SQLConf.get.getConf(SQLConf.SKIP_TYPE_VALIDATION_ON_ALTER_PARTITION) &&
value != null && value != DEFAULT_PARTITION_NAME =>
val v = value match {
case Some(str: String) => Some(normalizePartitionStringValue(str, normalizedFiled))
case str: String => normalizePartitionStringValue(str, normalizedFiled)
case other => other
}
v.asInstanceOf[T]
case _ => value
}
normalizedFiled.name -> normalizedVal
@@ -48,9 +48,9 @@ import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators
import org.apache.spark.sql.execution.streaming.StreamingRelation
import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.{PartitioningUtils => CatalystPartitioningUtils}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.unsafe.types.UTF8String

@@ -106,22 +106,8 @@ case class DataSourceAnalysis(analyzer: Analyzer) extends Rule[LogicalPlan] {
None
} else if (potentialSpecs.size == 1) {
val partValue = potentialSpecs.head._2
conf.storeAssignmentPolicy match {
// SPARK-30844: try our best to follow StoreAssignmentPolicy for static partition
// values but not completely follow because we can't do static type checking due to
// the reason that the parser has erased the type info of static partition values
// and converted them to string.
case StoreAssignmentPolicy.ANSI | StoreAssignmentPolicy.STRICT =>
val cast = Cast(Literal(partValue), field.dataType, Option(conf.sessionLocalTimeZone),
ansiEnabled = true)
cast.setTagValue(Cast.BY_TABLE_INSERTION, ())
Some(Alias(cast, field.name)())
case _ =>
val castExpression =
Cast(Literal(partValue), field.dataType, Option(conf.sessionLocalTimeZone),
ansiEnabled = false)
Some(Alias(castExpression, field.name)())
}
Some(Alias(CatalystPartitioningUtils.castPartitionSpec(
partValue, field.dataType, conf), field.name)())
} else {
throw QueryCompilationErrors.multiplePartitionColumnValuesSpecifiedError(
field, potentialSpecs)
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.command

import java.time.{Duration, Period}

import org.apache.spark.SparkNumberFormatException
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.quoteIdentifier
@@ -40,6 +41,7 @@ import org.apache.spark.sql.internal.SQLConf
*/
trait AlterTableAddPartitionSuiteBase extends QueryTest with DDLCommandTestUtils {
override val command = "ALTER TABLE .. ADD PARTITION"
def defaultPartitionName: String

test("one partition") {
withNamespaceAndTable("ns", "tbl") { t =>
@@ -213,4 +215,46 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with DDLCommandTestUtils
Row(Period.ofYears(1), Duration.ofDays(-1), "bbb")))
}
}

test("SPARK-40798: Alter partition should verify partition value") {
def shouldThrowException(policy: SQLConf.StoreAssignmentPolicy.Value): Boolean = policy match {
case SQLConf.StoreAssignmentPolicy.ANSI | SQLConf.StoreAssignmentPolicy.STRICT =>
true
case SQLConf.StoreAssignmentPolicy.LEGACY =>
false
}

SQLConf.StoreAssignmentPolicy.values.foreach { policy =>
withNamespaceAndTable("ns", "tbl") { t =>
sql(s"CREATE TABLE $t (c int) $defaultUsing PARTITIONED BY (p int)")

withSQLConf(SQLConf.STORE_ASSIGNMENT_POLICY.key -> policy.toString) {
if (shouldThrowException(policy)) {
checkError(
exception = intercept[SparkNumberFormatException] {
sql(s"ALTER TABLE $t ADD PARTITION (p='aaa')")
},
errorClass = "CAST_INVALID_INPUT",
parameters = Map(
"ansiConfig" -> "\"spark.sql.ansi.enabled\"",
"expression" -> "'aaa'",
"sourceType" -> "\"STRING\"",
"targetType" -> "\"INT\""),
context = ExpectedContext(
fragment = s"ALTER TABLE $t ADD PARTITION (p='aaa')",
start = 0,
stop = 35 + t.length))
} else {
sql(s"ALTER TABLE $t ADD PARTITION (p='aaa')")
checkPartitions(t, Map("p" -> defaultPartitionName))
sql(s"ALTER TABLE $t DROP PARTITION (p=null)")
}

sql(s"ALTER TABLE $t ADD PARTITION (p=null)")
checkPartitions(t, Map("p" -> defaultPartitionName))
sql(s"ALTER TABLE $t DROP PARTITION (p=null)")
}
}
}
}
}
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.command.v1

import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.DEFAULT_PARTITION_NAME
import org.apache.spark.sql.execution.command
import org.apache.spark.sql.internal.SQLConf

@@ -33,6 +34,8 @@ import org.apache.spark.sql.internal.SQLConf
* `org.apache.spark.sql.hive.execution.command.AlterTableAddPartitionSuite`
*/
trait AlterTableAddPartitionSuiteBase extends command.AlterTableAddPartitionSuiteBase {
override def defaultPartitionName: String = DEFAULT_PARTITION_NAME

test("empty string as partition value") {
withNamespaceAndTable("ns", "tbl") { t =>
sql(s"CREATE TABLE $t (col1 INT, p1 STRING) $defaultUsing PARTITIONED BY (p1)")
@@ -157,6 +160,18 @@ trait AlterTableAddPartitionSuiteBase extends command.AlterTableAddPartitionSuit
checkPartitions(t, Map("id" -> "1"), Map("id" -> "2"))
}
}

test("SPARK-40798: Alter partition should verify partition value - legacy") {
withNamespaceAndTable("ns", "tbl") { t =>
sql(s"CREATE TABLE $t (c int) $defaultUsing PARTITIONED BY (p int)")

withSQLConf(SQLConf.SKIP_TYPE_VALIDATION_ON_ALTER_PARTITION.key -> "true") {
sql(s"ALTER TABLE $t ADD PARTITION (p='aaa')")
checkPartitions(t, Map("p" -> "aaa"))
sql(s"ALTER TABLE $t DROP PARTITION (p='aaa')")
}
}
}
}

/**
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.command.v2
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException
import org.apache.spark.sql.execution.command
import org.apache.spark.sql.internal.SQLConf

/**
* The class contains tests for the `ALTER TABLE .. ADD PARTITION` command
@@ -28,6 +29,8 @@ import org.apache.spark.sql.execution.command
class AlterTableAddPartitionSuite
extends command.AlterTableAddPartitionSuiteBase
with CommandSuiteBase {
override def defaultPartitionName: String = "null"

test("SPARK-33650: add partition into a table which doesn't support partition management") {
withNamespaceAndTable("ns", "tbl", s"non_part_$catalog") { t =>
sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing")
@@ -121,4 +124,16 @@ class AlterTableAddPartitionSuite
checkPartitions(t, Map("id" -> "1"), Map("id" -> "2"))
}
}

test("SPARK-40798: Alter partition should verify partition value - legacy") {
withNamespaceAndTable("ns", "tbl") { t =>
sql(s"CREATE TABLE $t (c int) $defaultUsing PARTITIONED BY (p int)")

withSQLConf(SQLConf.SKIP_TYPE_VALIDATION_ON_ALTER_PARTITION.key -> "true") {
sql(s"ALTER TABLE $t ADD PARTITION (p='aaa')")
checkPartitions(t, Map("p" -> defaultPartitionName))
sql(s"ALTER TABLE $t DROP PARTITION (p=null)")
}
}
}
}
@@ -608,7 +608,7 @@ class HiveDDLSuite
}

test("SPARK-19129: drop partition with a empty string will drop the whole table") {
val df = spark.createDataFrame(Seq((0, "a"), (1, "b"))).toDF("partCol1", "name")
val df = spark.createDataFrame(Seq(("0", "a"), ("1", "b"))).toDF("partCol1", "name")
df.write.mode("overwrite").partitionBy("partCol1").saveAsTable("partitionedTable")
assertAnalysisError(
"alter table partitionedTable drop partition(partCol1='')",