Skip to content

Commit

Permalink
Fix skipping index IT for char column and re-enable Iceberg IT (#349)
Browse files Browse the repository at this point in the history
* enable Iceberg IT

Signed-off-by: Peng Huo <[email protected]>

* push down read-padding on char type

Signed-off-by: Peng Huo <[email protected]>

---------

Signed-off-by: Peng Huo <[email protected]>
  • Loading branch information
penghuo authored May 21, 2024
1 parent 624b488 commit b5eb552
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKi

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, GetStructField}
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StringType

/**
* Skipping index strategy that defines skipping data structure building and reading logic.
Expand Down Expand Up @@ -115,6 +118,17 @@ object FlintSparkSkippingStrategy {
Seq(attr.name)
case GetStructField(child, _, Some(name)) =>
extractColumnName(child) :+ name
/**
* Since Spark 3.4 added read-side padding, char_col = "sample char" becomes
* (staticinvoke(class org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils,
* StringType, readSidePadding, char_col#47, 20, true, false, true) = sample char )
*
* When the skipping index is created, Spark applies write-side padding, so the read-side
* padding can be safely ignored during push-down. For more background, see
* https://issues.apache.org/jira/browse/SPARK-40697
*/
case StaticInvoke(staticObject, StringType, "readSidePadding", arguments, _, _, _, _)
if classOf[CharVarcharCodegenUtils].isAssignableFrom(staticObject) =>
extractColumnName(arguments.head)
case _ => Seq.empty
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -797,13 +797,10 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
// CharType column is padded to a fixed length with whitespace
val paddedChar = "sample char".padTo(20, ' ')
checkAnswer(query, Row("sample varchar", paddedChar))
/*
* todo Spark 3.4 add read-side padding, SkippingIndex rule can not push down char_col plan now.
* https://issues.apache.org/jira/browse/SPARK-40697
*/
query.queryExecution.executedPlan should
useFlintSparkSkippingFileIndex(
hasIndexFilter(isnull(col("varchar_col")) || col("varchar_col") === "sample varchar"))
hasIndexFilter((isnull(col("varchar_col")) || col("varchar_col") === "sample varchar") &&
(isnull(col("char_col")) || col("char_col") === paddedChar)))

deleteTestIndex(testIndex)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ package org.opensearch.flint.spark.iceberg

import org.opensearch.flint.spark.FlintSparkMaterializedViewSqlITSuite

// FIXME: https://github.com/opensearch-project/opensearch-spark/issues/331#issuecomment-2110948494
/*
// Re-runs the full materialized view SQL integration test suite with the Iceberg
// suite mixin, which configures the Spark session to use an Iceberg catalog.
// NOTE(review): previously disabled pending issue #331 — re-enabled by this change.
class FlintSparkIcebergMaterializedViewITSuite
extends FlintSparkMaterializedViewSqlITSuite
with FlintSparkIcebergSuite {}
*/
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,8 @@

package org.opensearch.flint.spark.iceberg

import org.junit.Ignore
import org.opensearch.flint.spark.FlintSparkSkippingIndexSqlITSuite

// FIXME: https://github.com/opensearch-project/opensearch-spark/issues/331#issuecomment-2110948494
/*
// Re-runs the full skipping index SQL integration test suite with the Iceberg
// suite mixin, which configures the Spark session to use an Iceberg catalog.
// NOTE(review): previously disabled pending issue #331 — re-enabled by this change.
class FlintSparkIcebergSkippingIndexITSuite
extends FlintSparkSkippingIndexSqlITSuite
with FlintSparkIcebergSuite {}
*/

0 comments on commit b5eb552

Please sign in to comment.