
[SPARK-49679][SQL] validateSchemaOutput should refer to case sensitivity flag #48127

Closed
averyqi-db wants to merge 6 commits

Conversation

@averyqi-db (Contributor)

What changes were proposed in this pull request?

validateSchemaOutput now checks the case-sensitivity flag and picks the schema-comparison method accordingly.
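
For illustration, here is a minimal sketch of the proposed behavior (not the actual diff): pick the DataType comparison helper that matches the spark.sql.caseSensitive setting.

    // Sketch only: validateSchemaOutput branches on the case-sensitivity flag.
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.internal.SQLConf
    import org.apache.spark.sql.types.DataType

    def validateSchemaOutput(previousPlan: LogicalPlan, currentPlan: LogicalPlan): Option[String] = {
      val schemasMatch = if (SQLConf.get.caseSensitiveAnalysis) {
        // Case-sensitive mode: field names must match exactly (nullability still ignored).
        DataType.equalsIgnoreNullability(previousPlan.schema, currentPlan.schema)
      } else {
        // Case-insensitive mode: tolerate case-only differences in field names.
        DataType.equalsIgnoreCaseAndNullability(previousPlan.schema, currentPlan.schema)
      }
      if (schemasMatch) {
        None
      } else {
        Some(s"The plan output schema has changed from ${previousPlan.schema.sql} to ${currentPlan.schema.sql}.")
      }
    }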

Why are the changes needed?

With spark.sql.caseSensitive set to false, we should accept queries like this:

SELECT * FROM (
    Select a.ppmonth,
    a.ppweek,
    case when a.retsubcategoryderived <= 1 then 'XXXXXXXXXXXXX'
    else
    'XXXXXX'
    end as mappedflag,
    b.name as subcategory_name,
    sum(a.totalvalue) as RDOLLARS
    from a, b
    where a.retsubcategoryderived = b.retsubcategoryderived
    group by a.Ppmonth,a.ppweek,a.retsubcategoryderived,b.name, mappedflag)
However, validateSchemaOutput, the optimizer's check for plan schema changes, does not consult this flag, so some queries fail the check even when the optimization is correct. Take the query above as an example: after AggregatePushdownThroughJoins runs, the output name Ppmonth no longer matches ppmonth in validateSchemaOutput, and the check fails.

We need to consult this flag in validateSchemaOutput.

Does this PR introduce any user-facing change?

Yes, the above query is accepted and runs successfully.

How was this patch tested?

WIP

Was this patch authored or co-authored using generative AI tooling?

No

The github-actions bot added the SQL label on Sep 16, 2024.
@averyqi-db (Contributor, Author)

cc @xil-db for review. Do you have any suggestions on where to put the test cases for this patch? Thanks for any help~

@HyukjinKwon changed the title from [SPARK-49679][SQL]validateSchemaOutput should refer to case sensitivity flag to [SPARK-49679][SQL] validateSchemaOutput should refer to case sensitivity flag on Sep 17, 2024.
@xil-db (Contributor) left a comment:


You can add a test to sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerSuite.scala
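
For illustration, a minimal test along these lines (a sketch, not the PR's actual test) could pin down the two DataType comparison helpers the patch switches between:

    // Schemas differing only in column-name case: rejected by the
    // case-sensitive comparison, accepted by the case-insensitive one.
    import org.apache.spark.sql.types.{DataType, LongType, StructType}

    test("schema comparison respects case sensitivity") {
      val before = new StructType().add("ppmonth", LongType)
      val after = new StructType().add("Ppmonth", LongType)
      assert(!DataType.equalsIgnoreNullability(before, after))
      assert(DataType.equalsIgnoreCaseAndNullability(before, after))
    }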

@averyqi-db requested a review from @xil-db on September 17, 2024, 18:52.
@xil-db (Contributor) left a comment:


LGTM, cc @cloud-fan

@averyqi-db (Contributor, Author)

cc: @cloud-fan for merge, thx~

@cloud-fan (Contributor)

I don't quite agree with this. The underlying catalog/data source may be case-sensitive, and it's important to keep the query schema (column names) unchanged.

@xil-db (Contributor) commented on Sep 18, 2024:

> I don't quite agree with this. The underlying catalog/data source may be case-sensitive, and it's important to keep the query schema (column names) unchanged.

Yeah, I think that's a fair point. I don't have the full context here; I assume the intention of this PR is to avoid certain false alarms raised by this validation, but if AggregatePushdownThroughJoins changes the column from ppmonth to Ppmonth with spark.sql.caseSensitive set to true, then we'd have a true alarm.

Maybe I'm missing something, but it might be better to just fix AggregatePushdownThroughJoins so it never changes column names, regardless of the spark.sql.caseSensitive setting. cc @averyqi-db

@averyqi-db (Contributor, Author)

> > I don't quite agree with this. The underlying catalog/data source may be case-sensitive, and it's important to keep the query schema (column names) unchanged.
>
> Yeah, I think that's a fair point. I don't have the full context here; I assume the intention of this PR is to avoid certain false alarms raised by this validation, but if AggregatePushdownThroughJoins changes the column from ppmonth to Ppmonth with spark.sql.caseSensitive set to true, then we'd have a true alarm.
>
> Maybe I'm missing something, but it might be better to just fix AggregatePushdownThroughJoins so it never changes column names, regardless of the spark.sql.caseSensitive setting. cc @averyqi-db

Sorry, I should have stated the context more clearly.

Here's the context: users run this query with spark.sql.caseSensitive = false.

SELECT * FROM (
    Select a.ppmonth,
    a.ppweek,
    case when a.retsubcategoryderived <= 1 then 'XXXXXXXXXXXXX'
    else
    'XXXXXX'
    end as mappedflag,
    b.name as subcategory_name,
    sum(a.totalvalue) as RDOLLARS
    from a, b
    where a.retsubcategoryderived = b.retsubcategoryderived
    group by a.Ppmonth,a.ppweek,a.retsubcategoryderived,b.name, mappedflag)

Though they spelled ppmonth in two different ways, ppmonth and Ppmonth, they are running in case-insensitive mode, so they expect this query to run successfully.

However, they got an error message: org.apache.spark.SparkException: [PLAN_VALIDATION_FAILED_RULE_IN_BATCH] Rule com.databricks.sql.optimizer.AggregatePushdownThroughJoins in batch AggregatePushdownThroughJoins generated an invalid plan: The plan output schema has changed
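
For anyone trying to reproduce, a self-contained setup along these lines should work (table schemas trimmed to the referenced columns; note that AggregatePushdownThroughJoins is not part of open-source Spark, so the validation failure only surfaces where that rule is active):

    // Hypothetical reproduction: temp views stand in for the user's tables a and b.
    import spark.implicits._

    spark.conf.set("spark.sql.caseSensitive", "false")
    Seq((1L, 1L, 1L, 10L))
      .toDF("ppmonth", "ppweek", "retsubcategoryderived", "totalvalue")
      .createOrReplaceTempView("a")
    Seq((1L, "snacks"))
      .toDF("retsubcategoryderived", "name")
      .createOrReplaceTempView("b")

    spark.sql("""
      SELECT * FROM (
        SELECT a.ppmonth, a.ppweek,
               CASE WHEN a.retsubcategoryderived <= 1 THEN 'XXXXXXXXXXXXX'
                    ELSE 'XXXXXX' END AS mappedflag,
               b.name AS subcategory_name,
               SUM(a.totalvalue) AS RDOLLARS
        FROM a, b
        WHERE a.retsubcategoryderived = b.retsubcategoryderived
        GROUP BY a.Ppmonth, a.ppweek, a.retsubcategoryderived, b.name, mappedflag)
    """).show()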

This error occurs because AggregatePushdownThroughJoins tries to push the aggregation down through the join operator, changing the plan from:

Aggregate [Ppmonth#3L, ppweek#4L, retsubcategoryderived#7L, name#13, _groupingexpression#29], [ppmonth#3L, ppweek#4L, _groupingexpression#29 AS mappedflag#0, name#13 AS subcategory_name#1, sum(totalvalue#9L) AS RDOLLARS#2L]
+- Project [ppmonth#3L, ppweek#4L, retsubcategoryderived#7L, totalvalue#9L, name#13, CASE WHEN (retsubcategoryderived#7L <= 1) THEN XXXXXXXXXXXXX ELSE XXXXXX END AS _groupingexpression#29]
   +- Join Inner, (retsubcategoryderived#7L = retsubcategoryderived#10L)
      :- Project [ppmonth#3L, ppweek#4L, retsubcategoryderived#7L, totalvalue#9L]
      :  +- Filter isnotnull(retsubcategoryderived#7L)
      :     +- Relation spark_catalog.default.a[ppmonth#3L,ppweek#4L,retcategorygroupderived#5L,rethidsubcategoryderived#6L,retsubcategoryderived#7L,retsupercategoryderived#8L,totalvalue#9L] parquet
      +- Project [retsubcategoryderived#10L, name#13]
         +- Filter isnotnull(retsubcategoryderived#10L)
            +- Relation spark_catalog.default.b[retsubcategoryderived#10L,description#11,displayorder#12L,name#13,shortname#14,startrange#15,endrange#16,retcategoryderived#17L,retcategorygroupderived#18L,retsupercategoryderived#19L,altbusiness#20L] parquet

to:

Project [Ppmonth#3L, ppweek#4L, _groupingexpression#29 AS mappedflag#0, name#13 AS subcategory_name#1, sum(totalvalue#9L)#23L AS RDOLLARS#2L]
+- AggregatePart [Ppmonth#3L, ppweek#4L, retsubcategoryderived#7L, name#13, _groupingexpression#29], [finalmerge_sum(merge sum#31L) AS sum(totalvalue#9L)#23L], true
   +- AggregatePart [Ppmonth#3L, ppweek#4L, retsubcategoryderived#7L, name#13, _groupingexpression#29], [merge_sum(merge sum#31L) AS sum#31L], false
      +- Project [Ppmonth#3L, ppweek#4L, retsubcategoryderived#7L, name#13, _groupingexpression#29, sum#31L]
         +- Join Inner, (retsubcategoryderived#7L = retsubcategoryderived#10L)
            :- AggregatePart [Ppmonth#3L, ppweek#4L, retsubcategoryderived#7L, CASE WHEN (retsubcategoryderived#7L <= 1) THEN XXXXXXXXXXXXX ELSE XXXXXX END AS _groupingexpression#29], [partial_sum(totalvalue#9L) AS sum#31L], false
            :  +- Project [ppmonth#3L, ppweek#4L, retsubcategoryderived#7L, totalvalue#9L]
            :     +- Filter isnotnull(retsubcategoryderived#7L)
            :        +- Relation spark_catalog.default.a[ppmonth#3L,ppweek#4L,retcategorygroupderived#5L,rethidsubcategoryderived#6L,retsubcategoryderived#7L,retsupercategoryderived#8L,totalvalue#9L] parquet
            +- Project [retsubcategoryderived#10L, name#13]
               +- Filter isnotnull(retsubcategoryderived#10L)
                  +- Relation spark_catalog.default.b[retsubcategoryderived#10L,description#11,displayorder#12L,name#13,shortname#14,startrange#15,endrange#16,retcategoryderived#17L,retcategorygroupderived#18L,retsupercategoryderived#19L,altbusiness#20L] parquet

There's nothing wrong with AggregatePushdownThroughJoins itself, because all optimizations rely on the assumption that a child's output schema equals the input schema of its parent operator, and inside the rule it is hard to tell where Ppmonth versus ppmonth should appear.

The error appears because we check the schema in case-sensitive mode even when the case-sensitivity flag is set to false. DataType already provides both case-sensitive and case-insensitive comparison helpers; we just need to use the one that matches the config in validateSchemaOutput.

I understand there is a risk of introducing rules that actually change the case of the schema, but such a change can only pass the check in case-insensitive mode, where it is acceptable IIUC, so it should not raise much concern.

Please correct me if my understanding is wrong, for example if this relaxed schema check introduces risks I am not seeing.

@cloud-fan (Contributor)

No matter whether Spark is case-sensitive or not, it must be case-preserving. We need to fix the optimizer rule that breaks case preservation.
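
For illustration (not from the PR), the invariant in a couple of lines: resolution may be case-insensitive, but output names keep the exact casing the user wrote.

    // Even with case-insensitive analysis, the alias below must survive verbatim.
    spark.conf.set("spark.sql.caseSensitive", "false")
    val df = spark.range(1).selectExpr("id AS PpMonth")
    assert(df.columns.sameElements(Array("PpMonth")))  // never normalized to "ppmonth"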

@averyqi-db (Contributor, Author)

> No matter whether Spark is case-sensitive or not, it must be case-preserving. We need to fix the optimizer rule that breaks case preservation.

I see. But in this case AggregatePushdownThroughJoins is case-preserving: the generated Project and AggregatePart operators are built from the existing Aggregate schema. So here validateSchemaOutput does raise a false alarm. I can try to think of another way to eliminate this false alarm. :)

@cloud-fan (Contributor)

Is it really a false alarm? It seems the rule changed ppmonth to Ppmonth.

@averyqi-db (Contributor, Author)

I see. I can rewrite AggregatePushdownThroughJoins to separate the use of the group-by expressions from the projected list, so that the generated Project uses the projected expressions while the pushed-down AggregatePart uses the group-by expressions.

@averyqi-db (Contributor, Author)

@cloud-fan it turns out the Analyzer uses a different validatePlanChanges implementation that only calls validateExprIdUniqueness, while validateSchemaOutput is used only by the Optimizer. I think this PR aligns with our discussion: we relax the constraint for the Optimizer and keep the Analyzer's check. In that case, can we merge the PR?

@averyqi-db (Contributor, Author)

Closed, as we want to keep the alarm.

@averyqi-db closed this on Oct 3, 2024.