Skip to content

Commit

Permalink
Add IT for partial index query rewrite
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Nov 9, 2023
1 parent 87f53b2 commit 194f1ae
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.dsl.expressions.DslExpression
import org.apache.spark.sql.functions.{col, expr, input_file_name, sha1}
import org.apache.spark.sql.functions.{col, input_file_name, sha1}

/**
* Flint skipping index in Spark.
Expand Down Expand Up @@ -87,11 +87,7 @@ case class FlintSparkSkippingIndex(

// Add optional filtering condition
if (filterCondition.isDefined) {
if (isConjunction(expr(filterCondition.get).expr)) { // TODO: do the same for covering and add UT/IT
job = job.where(filterCondition.get)
} else {
throw new IllegalStateException("Filtering condition is not conjunction")
}
job = job.where(filterCondition.get)
}

job
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,9 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {

checkAnswer(query, Row("Hello"))
query.queryExecution.executedPlan should
useFlintSparkSkippingFileIndex(hasIndexFilter(col("year") === 2023 && col("month") === 4))
useFlintSparkSkippingFileIndex(
hasIndexFilter(col("year") === 2023 && col("month") === 4)
and hasScanMode(isHybridScanExpected = false))
}

test("can build value set skipping index and rewrite applicable query") {
Expand All @@ -286,7 +288,9 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {

checkAnswer(query, Row("World"))
query.queryExecution.executedPlan should
useFlintSparkSkippingFileIndex(hasIndexFilter(col("address") === "Portland"))
useFlintSparkSkippingFileIndex(
hasIndexFilter(col("address") === "Portland")
and hasScanMode(isHybridScanExpected = false))
}

test("can build min max skipping index and rewrite applicable query") {
Expand All @@ -306,7 +310,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
checkAnswer(query, Row("World"))
query.queryExecution.executedPlan should
useFlintSparkSkippingFileIndex(
hasIndexFilter(col("MinMax_age_0") <= 25 && col("MinMax_age_1") >= 25))
hasIndexFilter(col("MinMax_age_0") <= 25 && col("MinMax_age_1") >= 25)
and hasScanMode(isHybridScanExpected = false))
}

test("should rewrite applicable query with table name without database specified") {
Expand All @@ -325,7 +330,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {

query.queryExecution.executedPlan should
useFlintSparkSkippingFileIndex(
hasIndexFilter(col("year") === 2023))
hasIndexFilter(col("year") === 2023)
and hasScanMode(isHybridScanExpected = false))
}

test("should not rewrite original query if filtering condition has disjunction") {
Expand Down Expand Up @@ -379,10 +385,36 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
| WHERE month = 4
|""".stripMargin)

query.queryExecution.executedPlan should
useFlintSparkSkippingFileIndex(
hasIndexFilter(col("month") === 4)
and hasScanMode(isHybridScanExpected = true))

checkAnswer(query, Seq(Row("Seattle"), Row("Vancouver")))
}
}

test("should rewrite applicable query to scan latest source files if partial index") {
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("month")
.filterBy("month > 4")
.create()
flint.refreshIndex(testIndex, FULL)

val query = sql(s"""
| SELECT address
| FROM $testTable
| WHERE month = 4
|""".stripMargin)

query.queryExecution.executedPlan should
useFlintSparkSkippingFileIndex(
hasIndexFilter(col("month") === 4)
and hasScanMode(isHybridScanExpected = true))
}

test("should return empty if describe index not exist") {
flint.describeIndex("non-exist") shouldBe empty
}
Expand Down Expand Up @@ -670,6 +702,17 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
}
}

def hasScanMode(isHybridScanExpected: Boolean): Matcher[FlintSparkSkippingFileIndex] = {
Matcher { (fileIndex: FlintSparkSkippingFileIndex) =>
val hasExpectedScanMode = fileIndex.isHybridScanMode == isHybridScanExpected

MatchResult(
hasExpectedScanMode,
"FlintSparkSkippingFileIndex does not have expected scan mode",
"FlintSparkSkippingFileIndex has expected scan mode")
}
}

private def withFlintOptimizerDisabled(block: => Unit): Unit = {
spark.conf.set(OPTIMIZER_RULE_ENABLED.key, "false")
try {
Expand Down

0 comments on commit 194f1ae

Please sign in to comment.