[SQL] Make LogicalPlanIntegrity.validateSchemaOutput respect spark.sql.caseSensitive.

When case-insensitive analysis is enabled, a rule that only changes attribute
name casing no longer fails plan-integrity validation; instead the mismatch is
logged (message + stack trace) and schemas are compared ignoring case.

Review fixes applied to the added code (line-count-neutral, hunk headers unchanged):
- restored the missing `s` string-interpolator prefix on the `msg` literal
  (without it the message would contain the literal text "${previousPlan.schema.sql}"
  instead of the rendered schema);
- renamed the local `CaseNotPreservingProblem` to lowerCamelCase
  `caseNotPreservingProblem`, per Scala naming conventions for vals;
- replaced the unused catch binding `sx` with `_` in the new test.

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 5563199205448..425e0d55455f0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -19,7 +19,9 @@ package org.apache.spark.sql.catalyst.plans.logical
 
 import scala.collection.mutable
 
-import org.apache.spark.SparkUnsupportedOperationException
+import com.google.common.base.Throwables
+
+import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis._
@@ -31,6 +33,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{LOGICAL_QUERY_STAGE, Tre
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util.MetadataColumnHelper
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
+import org.apache.spark.sql.internal.SqlApiConf
 import org.apache.spark.sql.types.{DataType, StructType}
 
 
@@ -297,7 +300,7 @@ trait OrderPreservingUnaryNode extends UnaryNode
   override protected def orderingExpressions: Seq[SortOrder] = child.outputOrdering
 }
 
-object LogicalPlanIntegrity {
+object LogicalPlanIntegrity extends Logging {
 
   def canGetOutputAttrs(p: LogicalPlan): Boolean = {
     p.resolved && !p.expressions.exists { e =>
@@ -416,12 +419,26 @@
   }
 
   def validateSchemaOutput(previousPlan: LogicalPlan, currentPlan: LogicalPlan): Option[String] = {
-    if (!DataTypeUtils.equalsIgnoreNullability(previousPlan.schema, currentPlan.schema)) {
-      Some(s"The plan output schema has changed from ${previousPlan.schema.sql} to " +
-        currentPlan.schema.sql + s". The previous plan: ${previousPlan.treeString}\nThe new " +
-        "plan:\n" + currentPlan.treeString)
+    val msg = s"The plan output schema has changed from ${previousPlan.schema.sql} to " +
+      currentPlan.schema.sql + s". The previous plan: ${previousPlan.treeString}\nThe new " +
+      "plan:\n" + currentPlan.treeString
+    val isValid = if (SqlApiConf.get.caseSensitiveAnalysis) {
+      DataTypeUtils.equalsIgnoreNullability(previousPlan.schema, currentPlan.schema)
     } else {
+      if (!DataTypeUtils.equalsIgnoreNullability(previousPlan.schema, currentPlan.schema)) {
+        logInfo(msg)
+        val caseNotPreservingProblem = new SparkException("CaseNotPreservingRule")
+        val stackTraceString = Option(caseNotPreservingProblem)
+          .map(Throwables.getStackTraceAsString).getOrElse("")
+        logInfo(stackTraceString)
+      }
+      DataTypeUtils.equalsIgnoreCaseAndNullability(previousPlan.schema, currentPlan.schema)
+    }
+
+    if (isValid) {
       None
+    } else {
+      Some(msg)
     }
   }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerSuite.scala
index 48cdbbe7be539..2499ac2722931 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerSuite.scala
@@ -313,4 +313,41 @@ class OptimizerSuite extends PlanTest {
       assert(message1.contains("not a valid aggregate expression"))
     }
   }
+
+  test("validate schema output respects spark.sql.caseSensitive flag") {
+    /**
+     * A dummy optimizer rule for capitalizing alias.
+     */
+    object CapitalizeAttribute extends Rule[LogicalPlan] {
+      def apply(plan: LogicalPlan): LogicalPlan = plan transformExpressions {
+        case a: Alias =>
+          // scalastyle:off caselocale
+          Alias(a.child, a.name.toUpperCase)(a.exprId, a.qualifier, a.explicitMetadata)
+          // scalastyle:on caselocale
+      }
+    }
+
+    val optimizer = new SimpleTestOptimizer() {
+      override def defaultBatches: Seq[Batch] =
+        Batch("test", FixedPoint(1), CapitalizeAttribute) :: Nil
+    }
+
+    val query = Project(Alias(Literal(1), "attr")() :: Nil, OneRowRelation()).analyze
+
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+      try {
+        optimizer.execute(query)
+      } catch {
+        case _: SparkException =>
+          fail("Query should run successfully with case insensitive setting.")
+      }
+    }
+
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+      val message = intercept[SparkException] {
+        optimizer.execute(query)
+      }.getMessage
+      assert(message.contains("The plan output schema has changed"))
+    }
+  }
 }