From 878ef4edaa022fb83c2ed9b009f2510d4055c366 Mon Sep 17 00:00:00 2001
From: Peng Huo
Date: Mon, 20 May 2024 15:24:26 -0700
Subject: [PATCH] push down read-padding on char type

Signed-off-by: Peng Huo
---
 .../skipping/FlintSparkSkippingStrategy.scala      | 14 ++++++++++++++
 .../spark/FlintSparkSkippingIndexITSuite.scala     |  7 ++-----
 2 files changed, 16 insertions(+), 5 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingStrategy.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingStrategy.scala
index de2ea772d..fa9b23951 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingStrategy.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingStrategy.scala
@@ -11,7 +11,10 @@ import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKi
 
 import org.apache.spark.sql.Column
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, GetStructField}
+import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
+import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils
 import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.StringType
 
 /**
  * Skipping index strategy that defines skipping data structure building and reading logic.
@@ -115,6 +118,17 @@ object FlintSparkSkippingStrategy {
       Seq(attr.name)
     case GetStructField(child, _, Some(name)) =>
       extractColumnName(child) :+ name
+    /**
+     * Since Spark 3.4 add read-side padding, char_col = "sample char" became
+     * (staticinvoke(class org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils,
+     * StringType, readSidePadding, char_col#47, 20, true, false, true) = sample char )
+     *
+     * When create skipping index, Spark did write-side padding. So read-side push down can be
+     * ignored. More reading, https://issues.apache.org/jira/browse/SPARK-40697
+     */
+    case StaticInvoke(staticObject, StringType, "readSidePadding", arguments, _, _, _, _)
+        if classOf[CharVarcharCodegenUtils].isAssignableFrom(staticObject) =>
+      extractColumnName(arguments.head)
     case _ => Seq.empty
   }
 }
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
index 5750d85ff..999fb3008 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
@@ -797,13 +797,10 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
     // CharType column is padded to a fixed length with whitespace
     val paddedChar = "sample char".padTo(20, ' ')
     checkAnswer(query, Row("sample varchar", paddedChar))
 
-    /*
-     * todo Spark 3.4 add read-side padding, SkippingIndex rule can not push down char_col plan now.
-     * https://issues.apache.org/jira/browse/SPARK-40697
-     */
     query.queryExecution.executedPlan should useFlintSparkSkippingFileIndex(
-      hasIndexFilter(isnull(col("varchar_col")) || col("varchar_col") === "sample varchar"))
+      hasIndexFilter((isnull(col("varchar_col")) || col("varchar_col") === "sample varchar") &&
+        (isnull(col("char_col")) || col("char_col") === paddedChar)))
 
     deleteTestIndex(testIndex)
   }