diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerSuite.scala index 48cdbbe7be539..2499ac2722931 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerSuite.scala @@ -313,4 +313,41 @@ class OptimizerSuite extends PlanTest { assert(message1.contains("not a valid aggregate expression")) } } + + test("validate schema output respects spark.sql.caseSensitive flag") { + /** + * A dummy optimizer rule for capitalizing alias. + */ + object CapitalizeAttribute extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan transformExpressions { + case a: Alias => + // scalastyle:off caselocale + Alias(a.child, a.name.toUpperCase)(a.exprId, a.qualifier, a.explicitMetadata) + // scalastyle:on caselocale + } + } + + val optimizer = new SimpleTestOptimizer() { + override def defaultBatches: Seq[Batch] = + Batch("test", FixedPoint(1), CapitalizeAttribute) :: Nil + } + + val query = Project(Alias(Literal(1), "attr")() :: Nil, OneRowRelation()).analyze + + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + try { + optimizer.execute(query) + } catch { + case sx: SparkException => + fail("Query should run successfully with case insensitive setting.") + } + } + + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + val message = intercept[SparkException] { + optimizer.execute(query) + }.getMessage + assert(message.contains("The plan output schema has changed")) + } + } }