Skip to content

Commit

Permalink
Add IT
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Jul 3, 2024
1 parent b5c958a commit 928948e
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ trait FlintSparkQueryRewriteHelper {
val queryConditions = flattenConditions(queryFilter)

// Ensures that every condition in the index filter is subsumed by at least one condition
// in the query filter
// on the same column in the query filter
indexConditions.forall { indexCondition =>
queryConditions.exists { queryCondition =>
(indexCondition, queryCondition) match {
Expand All @@ -79,9 +79,7 @@ trait FlintSparkQueryRewriteHelper {
case class Range(lower: Option[Bound], upper: Option[Bound]) {

/**
* Determines if this range subsumes (completely covers) another range. A range is considered
* to subsume another if its lower bound is less restrictive and its upper bound is more
* restrictive than those of the other range.
* Determines if this range subsumes (completely covers) another range.
*
* @param other
* The other range to compare against.
Expand All @@ -108,7 +106,7 @@ trait FlintSparkQueryRewriteHelper {

/**
* Constructs a Range object from a binary comparison expression, translating comparison
* operators into bounds with appropriate inclusivity.
* operators into bounds with appropriate inclusiveness.
*
* @param condition
* The binary comparison
Expand All @@ -124,7 +122,7 @@ trait FlintSparkQueryRewriteHelper {
Range(None, Some(Bound(value, inclusive = true)))
case EqualTo(_, value: Literal) =>
Range(Some(Bound(value, inclusive = true)), Some(Bound(value, inclusive = true)))
case _ => Range(None, None) // For unsupported or complex conditions
case _ => Range(None, None) // Infinity for unsupported or complex conditions
}
}

Expand All @@ -145,7 +143,8 @@ trait FlintSparkQueryRewriteHelper {
* @param other
* The bound to compare against.
* @return
* True if this bound is less than or equal to the other bound.
* True if this bound is less than or equal to the other bound, either because value is
* smaller, this bound is inclusive, or both bounds are exclusive.
*/
def lessThanOrEqualTo(other: Bound): Boolean = {
val cmp = value.value.asInstanceOf[Comparable[Any]].compareTo(other.value.value)
Expand All @@ -158,7 +157,8 @@ trait FlintSparkQueryRewriteHelper {
* @param other
* The bound to compare against.
* @return
* True if this bound is greater than or equal to the other bound.
* True if this bound is greater than or equal to the other bound, either because value is
* greater, this bound is inclusive, or both bounds are exclusive.
*/
def greaterThanOrEqualTo(other: Bound): Boolean = {
val cmp = value.value.asInstanceOf[Comparable[Any]].compareTo(other.value.value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import org.opensearch.flint.spark.source.{FlintSparkSourceRelation, FlintSparkSo

import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parseExpression
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, V2WriteCommand}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
Expand Down Expand Up @@ -119,31 +119,29 @@ class ApplyFlintSparkCoveringIndex(flint: FlintSpark)
queryFilter: Option[Expression],
relationCols: Set[String]): Boolean = {
val indexedCols = index.indexedColumns.keySet
val isSubsumed = subsume(queryFilter, index.filterCondition)
val subsumption = (index.filterCondition, queryFilter) match {
case (None, _) => true // full index can cover any query filter
case (Some(_), None) => false // partial index cannot cover query without filter
case (Some(indexFilter), Some(_)) =>
subsume(parseExpression(indexFilter), queryFilter.get)
}
val isApplicable =
index.latestLogEntry.exists(_.state != DELETED) &&
isSubsumed &&
subsumption &&
relationCols.subsetOf(indexedCols)

logInfo(s"""
| Is covering index ${index.name()} applicable: $isApplicable
| Index state: ${index.latestLogEntry.map(_.state)}
| Index filter subsumption: $isSubsumed
| Query filter: $queryFilter
| Index filter: ${index.filterCondition}
| Subsumption test: $subsumption
| Columns required: $relationCols
| Columns indexed: $indexedCols
|""".stripMargin)
isApplicable
}

/**
 * Decides whether an optional index filter condition subsumes an optional query filter.
 *
 * @param queryFilter
 *   The filter expression of the query, if the query has one.
 * @param indexFilter
 *   The index filter condition as a SQL expression string, if the index has one.
 * @return
 *   True if the index has no filter; false if the index has a filter but the query does not;
 *   otherwise the result of the two-expression `subsume` overload (defined outside this block)
 *   applied to the parsed index filter and the query filter.
 */
private def subsume(queryFilter: Option[Expression], indexFilter: Option[String]): Boolean = {
  (queryFilter, indexFilter) match {
    case (_, None) => true // full indexing
    case (None, Some(_)) => false
    // Bind both values in the pattern instead of calling Option.get on the matched Options
    case (Some(queryExpr), Some(indexExpr)) =>
      subsume(CatalystSqlParser.parseExpression(indexExpr), queryExpr)
  }
}

private def replaceTableRelationWithIndexRelation(
index: FlintSparkCoveringIndex,
relation: FlintSparkSourceRelation): LogicalPlan = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,19 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
checkAnswer(sql(query), Seq(Row("Hello", 30), Row("World", 25)))
}

// Scenario: the query filter (age >= 30) lies entirely within the partial index filter
// (age > 25), so the query is expected to be rewritten to use the covering index.
test("rewrite applicable simple query with partial covering index") {
// Create a partial covering index on (name, age) limited to rows with age > 25.
// NOTE(review): awaitRefreshComplete presumably blocks until the auto-refresh has caught up;
// confirm against the test base class.
awaitRefreshComplete(s"""
| CREATE INDEX $testIndex ON $testTable
| (name, age)
| WHERE age > 25
| WITH (auto_refresh = true)
| """.stripMargin)

// The EXPLAIN output is checked for the "FlintScan" keyword and the query result is checked
// against the single expected row.
val query = s"SELECT name, age FROM $testTable WHERE age >= 30"
checkKeywordsExist(sql(s"EXPLAIN $query"), "FlintScan")
checkAnswer(sql(query), Seq(Row("Hello", 30)))
}

test("rewrite applicable aggregate query with covering index") {
awaitRefreshComplete(s"""
| CREATE INDEX $testIndex ON $testTable
Expand Down Expand Up @@ -290,6 +303,19 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
}
}

// Scenario: the query filter (age > 20) is wider than the partial index filter (age > 25),
// so the index cannot answer the query and no rewrite is expected.
test("should not rewrite with partial covering index if not applicable") {
// Create a partial covering index on (name, age) limited to rows with age > 25.
// NOTE(review): awaitRefreshComplete presumably blocks until the auto-refresh has caught up;
// confirm against the test base class.
awaitRefreshComplete(s"""
| CREATE INDEX $testIndex ON $testTable
| (name, age)
| WHERE age > 25
| WITH (auto_refresh = true)
| """.stripMargin)

// The EXPLAIN output must not mention "FlintScan", and the result must still include the
// age = 25 row, which the index filter (age > 25) excludes.
val query = s"SELECT name, age FROM $testTable WHERE age > 20"
checkKeywordsNotExist(sql(s"EXPLAIN $query"), "FlintScan")
checkAnswer(sql(query), Seq(Row("Hello", 30), Row("World", 25)))
}

test("rewrite applicable query with covering index before skipping index") {
try {
sql(s"""
Expand Down

0 comments on commit 928948e

Please sign in to comment.