diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
index 0956f9273..0a8e98fc5 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
@@ -19,7 +19,7 @@ import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.dsl.expressions.DslExpression
-import org.apache.spark.sql.functions.{col, expr, input_file_name, sha1}
+import org.apache.spark.sql.functions.{col, input_file_name, sha1}
 
 /**
  * Flint skipping index in Spark.
@@ -87,11 +87,7 @@ case class FlintSparkSkippingIndex(
 
     // Add optional filtering condition
     if (filterCondition.isDefined) {
-      if (isConjunction(expr(filterCondition.get).expr)) { // TODO: do the same for covering and add UT/IT
-        job = job.where(filterCondition.get)
-      } else {
-        throw new IllegalStateException("Filtering condition is not conjunction")
-      }
+      job = job.where(filterCondition.get)
     }
 
     job
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
index 136a76503..90e7caecb 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
@@ -267,7 +267,9 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
 
     checkAnswer(query, Row("Hello"))
     query.queryExecution.executedPlan should
-      useFlintSparkSkippingFileIndex(hasIndexFilter(col("year") === 2023 && col("month") === 4))
+      useFlintSparkSkippingFileIndex(
+        hasIndexFilter(col("year") === 2023 && col("month") === 4)
+          and hasScanMode(isHybridScanExpected = false))
   }
 
   test("can build value set skipping index and rewrite applicable query") {
@@ -286,7 +288,9 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
 
     checkAnswer(query, Row("World"))
     query.queryExecution.executedPlan should
-      useFlintSparkSkippingFileIndex(hasIndexFilter(col("address") === "Portland"))
+      useFlintSparkSkippingFileIndex(
+        hasIndexFilter(col("address") === "Portland")
+          and hasScanMode(isHybridScanExpected = false))
   }
 
   test("can build min max skipping index and rewrite applicable query") {
@@ -306,7 +310,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
     checkAnswer(query, Row("World"))
     query.queryExecution.executedPlan should
       useFlintSparkSkippingFileIndex(
-        hasIndexFilter(col("MinMax_age_0") <= 25 && col("MinMax_age_1") >= 25))
+        hasIndexFilter(col("MinMax_age_0") <= 25 && col("MinMax_age_1") >= 25)
+          and hasScanMode(isHybridScanExpected = false))
   }
 
   test("should rewrite applicable query with table name without database specified") {
@@ -325,7 +330,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
 
     query.queryExecution.executedPlan should
       useFlintSparkSkippingFileIndex(
-        hasIndexFilter(col("year") === 2023))
+        hasIndexFilter(col("year") === 2023)
+          and hasScanMode(isHybridScanExpected = false))
   }
 
   test("should not rewrite original query if filtering condition has disjunction") {
@@ -379,10 +385,36 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
         | WHERE month = 4
         |""".stripMargin)
 
+      query.queryExecution.executedPlan should
+        useFlintSparkSkippingFileIndex(
+          hasIndexFilter(col("month") === 4)
+            and hasScanMode(isHybridScanExpected = true))
+
       checkAnswer(query, Seq(Row("Seattle"), Row("Vancouver")))
     }
   }
 
+  test("should rewrite applicable query to scan latest source files if partial index") {
+    flint
+      .skippingIndex()
+      .onTable(testTable)
+      .addPartitions("month")
+      .filterBy("month > 4")
+      .create()
+    flint.refreshIndex(testIndex, FULL)
+
+    val query = sql(s"""
+       | SELECT address
+       | FROM $testTable
+       | WHERE month = 4
+       |""".stripMargin)
+
+    query.queryExecution.executedPlan should
+      useFlintSparkSkippingFileIndex(
+        hasIndexFilter(col("month") === 4)
+          and hasScanMode(isHybridScanExpected = true))
+  }
+
   test("should return empty if describe index not exist") {
     flint.describeIndex("non-exist") shouldBe empty
   }
@@ -670,6 +702,17 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
     }
   }
 
+  def hasScanMode(isHybridScanExpected: Boolean): Matcher[FlintSparkSkippingFileIndex] = {
+    Matcher { (fileIndex: FlintSparkSkippingFileIndex) =>
+      val hasExpectedScanMode = fileIndex.isHybridScanMode == isHybridScanExpected
+
+      MatchResult(
+        hasExpectedScanMode,
+        "FlintSparkSkippingFileIndex does not have expected scan mode",
+        "FlintSparkSkippingFileIndex has expected scan mode")
+    }
+  }
+
   private def withFlintOptimizerDisabled(block: => Unit): Unit = {
     spark.conf.set(OPTIMIZER_RULE_ENABLED.key, "false")
     try {
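
The production change above drops the conjunction-only guard on the skipping index filter condition, so whatever predicate string filterBy(...) carries is now passed straight to Dataset.where. A minimal sketch of the surviving code path, using made-up columns and a made-up disjunctive predicate purely for illustration:

import org.apache.spark.sql.{DataFrame, SparkSession}

object FilterConditionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("filter-sketch").getOrCreate()
    import spark.implicits._

    // Stand-in for the indexed source table (hypothetical columns and values).
    var job: DataFrame =
      Seq((2023, 4, "Seattle"), (2022, 5, "Vancouver")).toDF("year", "month", "address")

    // After the change, any predicate accepted by Dataset.where can serve as the
    // filter condition: conjunctions, disjunctions, or anything else.
    val filterCondition: Option[String] = Some("month > 4 OR year < 2023")
    if (filterCondition.isDefined) {
      job = job.where(filterCondition.get)
    }

    job.show()
    spark.stop()
  }
}

The new hasScanMode matcher follows the standard ScalaTest custom Matcher pattern and is composed with hasIndexFilter via "and" in the assertions above. A self-contained sketch of the same pattern, with a placeholder FileIndexLike trait standing in for FlintSparkSkippingFileIndex:

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.{MatchResult, Matcher}
import org.scalatest.matchers.should.Matchers

// Placeholder for FlintSparkSkippingFileIndex, used only in this sketch.
trait FileIndexLike {
  def isHybridScanMode: Boolean
}

class ScanModeMatcherSketch extends AnyFlatSpec with Matchers {

  // Same shape as the hasScanMode matcher added in the test suite:
  // the match succeeds when the file index reports the expected scan mode.
  def hasScanMode(isHybridScanExpected: Boolean): Matcher[FileIndexLike] =
    Matcher { (fileIndex: FileIndexLike) =>
      MatchResult(
        fileIndex.isHybridScanMode == isHybridScanExpected,
        "file index does not have expected scan mode",
        "file index has expected scan mode")
    }

  "a hybrid-scan file index" should "satisfy hasScanMode(true)" in {
    val hybridIndex = new FileIndexLike { val isHybridScanMode = true }
    hybridIndex should hasScanMode(isHybridScanExpected = true)
  }
}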