Refactor rewrite rule to support partial indexing
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Jul 2, 2024
1 parent 9fad78e commit 06fc426
Showing 3 changed files with 173 additions and 35 deletions.
@@ -0,0 +1,91 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark

import org.apache.spark.sql.catalyst.expressions.{And, Attribute, BinaryComparison, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal}

/**
* Query rewrite helper that provides common utilities for the query rewrite rules of various
* Flint indexes.
*/
trait FlintSparkQueryRewriteHelper {

/**
* Determines if the conditions in an index filter can subsume those in a query filter. This is
* essential to verify that every row satisfying the query filter also satisfies the index
* filter, indicating that the index contains all data the query needs and can potentially
* optimize the query.
*
* @param indexFilter
* The filter expression defined from the index
* @param queryFilter
* The filter expression present in the user query
* @return
* True if the index filter can subsume the query filter, otherwise False
*/
def subsume(indexFilter: Expression, queryFilter: Expression): Boolean = {
def flattenConditions(filter: Expression): Seq[Expression] = filter match {
case And(left, right) => flattenConditions(left) ++ flattenConditions(right)
case other => Seq(other)
}

val indexConditions = flattenConditions(indexFilter)
val queryConditions = flattenConditions(queryFilter)

// Each index condition must subsume some query condition on the same column;
// otherwise the index data cannot "cover" all rows required by the query
indexConditions.forall { indexCondition =>
queryConditions.exists { queryCondition =>
(indexCondition, queryCondition) match {
case (
i @ BinaryComparison(indexCol: Attribute, _),
q @ BinaryComparison(queryCol: Attribute, _)) if indexCol.name == queryCol.name =>
Range(i).subsume(Range(q))
case _ => false
}
}
}
}

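/**
* A single endpoint (lower or upper) of a value range.
*
* @param value
* The literal value at the endpoint
* @param inclusive
* Whether the endpoint itself is included in the range (e.g. >= / <=)
*/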
case class Bound(value: Literal, inclusive: Boolean) {

def lessThanOrEqualTo(other: Bound): Boolean = {
val cmp = value.value.asInstanceOf[Comparable[Any]].compareTo(other.value.value)
cmp < 0 || (cmp == 0 && inclusive && other.inclusive)
}
}

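/**
* A value range on a single column, where `None` for lower or upper means the range is
* unbounded on that side.
*/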
case class Range(lower: Option[Bound], upper: Option[Bound]) {

def subsume(other: Range): Boolean = {
val isLowerSubsumed = (lower, other.lower) match {
case (Some(a), Some(b)) => a.lessThanOrEqualTo(b)
case (None, _) => true // this lower bound is unbounded and thus can subsume anything
case (_, None) => false // the other lower bound is unbounded and thus cannot be subsumed
case (None, None) => true
}
val isUpperSubsumed = (upper, other.upper) match {
case (Some(a), Some(b)) => b.lessThanOrEqualTo(a)
case (None, _) => true // this upper bound is unbounded and thus can subsume anything
case (_, None) => false // the other upper bound is unbounded and thus cannot be subsumed
case (None, None) => true
}
isLowerSubsumed && isUpperSubsumed
}
}

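/**
* Builds a [[Range]] from a binary comparison, assuming the column attribute is on the left
* and the literal on the right. Unsupported or more complex conditions map to an unbounded
* range.
*/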
object Range {
def apply(condition: BinaryComparison): Range = condition match {
case GreaterThan(_, value: Literal) => Range(Some(Bound(value, inclusive = false)), None)
case GreaterThanOrEqual(_, value: Literal) =>
Range(Some(Bound(value, inclusive = true)), None)
case LessThan(_, value: Literal) => Range(None, Some(Bound(value, inclusive = false)))
case LessThanOrEqual(_, value: Literal) => Range(None, Some(Bound(value, inclusive = true)))
case EqualTo(_, value: Literal) =>
Range(Some(Bound(value, inclusive = true)), Some(Bound(value, inclusive = true)))
case _ => Range(None, None) // For unsupported or complex conditions
}
}
}
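As a rough illustration of the new trait (not part of this commit), the hypothetical snippet below checks whether a partial index filter subsumes a query filter. `SubsumeDemo`, its `main` method, and the filter strings are made up for this sketch; subsumption holds exactly when the index's value range contains the query's range on the same column.

import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.opensearch.flint.spark.FlintSparkQueryRewriteHelper

object SubsumeDemo extends FlintSparkQueryRewriteHelper {
  def main(args: Array[String]): Unit = {
    // Partial index keeps rows with 20 < age < 50; the query asks for 30 < age < 40
    val indexFilter = CatalystSqlParser.parseExpression("age > 20 AND age < 50")
    val queryFilter = CatalystSqlParser.parseExpression("age > 30 AND age < 40")

    // (20, 50) contains (30, 40), so every row the query needs is in the index
    println(subsume(indexFilter, queryFilter)) // true

    // The reverse does not hold: (30, 40) cannot contain (20, 50)
    println(subsume(queryFilter, indexFilter)) // false
  }
}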
@@ -8,13 +8,14 @@ package org.opensearch.flint.spark.covering
import java.util

import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.DELETED
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.{FlintSpark, FlintSparkQueryRewriteHelper}
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName
import org.opensearch.flint.spark.source.{FlintSparkSourceRelation, FlintSparkSourceRelationProvider}

import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, V2WriteCommand}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, V2WriteCommand}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.flint.{qualifyTableName, FlintDataSourceV2}
@@ -27,7 +28,9 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
* @param flint
* Flint Spark API
*/
class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan] {
class ApplyFlintSparkCoveringIndex(flint: FlintSpark)
extends Rule[LogicalPlan]
with FlintSparkQueryRewriteHelper {

/** All supported source relation providers */
private val supportedProviders = FlintSparkSourceRelationProvider.getAllProviders(flint.spark)
@@ -37,22 +40,37 @@ class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan]
plan
} else {
// Iterate each sub plan tree in the given plan
plan transform { case subPlan =>
supportedProviders
.collectFirst {
case provider if provider.isSupported(subPlan) =>
logInfo(s"Provider [${provider.name()}] can match sub plan ${subPlan.nodeName}")
val relation = provider.getRelation(subPlan)
val relationCols = collectRelationColumnsInQueryPlan(plan, relation)

// Choose the first covering index that meets all criteria above
findAllCoveringIndexesOnTable(relation.tableName)
.sortBy(_.name())
.find(index => isCoveringIndexApplicable(index, relationCols))
.map(index => replaceTableRelationWithIndexRelation(index, relation))
.getOrElse(subPlan) // If no index found, return the original node
}
.getOrElse(subPlan) // If not supported by any provider, return the original node
plan transform {
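// Match a filter directly on top of a supported relation first so its condition can be
// checked against a partial index filter; otherwise match a bare relation with no filter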
case subPlan @ Filter(condition, ExtractRelation(relation)) =>
doApply(plan, relation, Some(condition))
.map(newRelation => subPlan.copy(child = newRelation))
.getOrElse(subPlan)
case subPlan @ ExtractRelation(relation) =>
doApply(plan, relation, None)
.getOrElse(subPlan)
}
}
}

private def doApply(
plan: LogicalPlan,
relation: FlintSparkSourceRelation,
queryFilter: Option[Expression]): Option[LogicalPlan] = {
val relationCols = collectRelationColumnsInQueryPlan(plan, relation)

// Choose the first covering index that meets all criteria above
findAllCoveringIndexesOnTable(relation.tableName)
.sortBy(_.name())
.find(index => isCoveringIndexApplicable(index, queryFilter, relationCols))
.map(index => replaceTableRelationWithIndexRelation(index, relation))
}

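/** Extractor that matches a sub plan backed by any supported Flint source relation provider. */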
private object ExtractRelation {
def unapply(subPlan: LogicalPlan): Option[FlintSparkSourceRelation] = {
supportedProviders.collectFirst {
case provider if provider.isSupported(subPlan) =>
logInfo(s"Provider [${provider.name()}] can match plan ${subPlan.nodeName}")
provider.getRelation(subPlan)
}
}
}
@@ -98,23 +116,34 @@

private def isCoveringIndexApplicable(
index: FlintSparkCoveringIndex,
queryFilter: Option[Expression],
relationCols: Set[String]): Boolean = {
val indexedCols = index.indexedColumns.keySet
val isSubsumed = subsume(queryFilter, index.filterCondition)
val isApplicable =
index.latestLogEntry.exists(_.state != DELETED) &&
index.filterCondition.isEmpty && // TODO: support partial covering index later
isSubsumed &&
relationCols.subsetOf(indexedCols)

logInfo(s"""
| Is covering index ${index.name()} applicable: $isApplicable
| Index state: ${index.latestLogEntry.map(_.state)}
| Index filter condition: ${index.filterCondition}
| Index filter subsumption: $isSubsumed
| Columns required: $relationCols
| Columns indexed: $indexedCols
|""".stripMargin)
isApplicable
}

private def subsume(queryFilter: Option[Expression], indexFilter: Option[String]): Boolean = {
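// No index filter means a full covering index that can serve any query; a partial index
// cannot serve a query without a filter; otherwise parse the index filter and compare the
// two filters at the expression level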
(queryFilter, indexFilter) match {
case (_, None) => true // full indexing
case (None, Some(_)) => false
case (Some(_), Some(_)) =>
subsume(CatalystSqlParser.parseExpression(indexFilter.get), queryFilter.get)
}
}

private def replaceTableRelationWithIndexRelation(
index: FlintSparkCoveringIndex,
relation: FlintSparkSourceRelation): LogicalPlan = {
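To make the new flow concrete (illustrative only, not part of the commit): for a hypothetical partial covering index defined with filterCondition "age > 20" over columns name and age, a query such as SELECT name FROM t WHERE age > 30 now matches the Filter pattern, its condition is passed to doApply as the query filter, and because "age > 20" subsumes it the relation child is swapped for the Flint index relation while the original Filter stays on top of the rewritten scan:

Filter(age > 30)                    Filter(age > 30)
+- Relation t             ==>       +- DataSourceV2Relation (Flint covering index)

Since the query filter is still applied over the index data, correctness only requires that the index contain every row matching the query filter, which is exactly what the subsumption check verifies.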
@@ -63,16 +63,34 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers {
.assertIndexNotUsed(testTable)
}

test("should not apply if covering index is partial") {
assertFlintQueryRewriter
.withQuery(s"SELECT name FROM $testTable")
.withIndex(
new FlintSparkCoveringIndex(
indexName = "name",
tableName = testTable,
indexedColumns = Map("name" -> "string"),
filterCondition = Some("age > 30")))
.assertIndexNotUsed(testTable)
Seq(
/*("age = 30", "age = 20", false),
("age = 30", "age < 20", false),
("age = 30", "age > 50", false),
("age > 30 AND age < 60", "age > 20 AND age < 50", false),
("age = 30", "age = 30", true),
("age = 30", "age <= 30", true),
("age = 30", "age >= 30", true),
("age = 30", "age > 20 AND age < 50", true),*/
("age > 30 AND age < 40", "age > 20 AND age < 50", true)).foreach {
case (queryFilter, indexFilter, expectedResult) =>
test(
s"apply partial covering index with [$indexFilter] to query filter [$queryFilter]: $expectedResult") {
val assertion = assertFlintQueryRewriter
.withQuery(s"SELECT name FROM $testTable WHERE $queryFilter")
.withIndex(
new FlintSparkCoveringIndex(
indexName = "partial",
tableName = testTable,
indexedColumns = Map("name" -> "string", "age" -> "int"),
filterCondition = Some(indexFilter)))

if (expectedResult) {
assertion.assertIndexUsed(getFlintIndexName("partial", testTable))
} else {
assertion.assertIndexNotUsed(testTable)
}
}
}

test("should not apply if covering index is logically deleted") {
Expand All @@ -89,8 +107,8 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers {

// Covering index doesn't cover column age
Seq(
s"SELECT * FROM $testTable",
s"SELECT name, age FROM $testTable",
// s"SELECT * FROM $testTable", // FIXME: only relation operator
// s"SELECT name, age FROM $testTable", // FIXME: only relation operator
s"SELECT name FROM $testTable WHERE age = 30",
s"SELECT COUNT(*) FROM $testTable GROUP BY age").foreach { query =>
test(s"should not apply if column is not covered in $query") {
@@ -167,7 +185,7 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers {
private var indexes: Seq[FlintSparkCoveringIndex] = Seq()

def withQuery(query: String): AssertionHelper = {
this.plan = sql(query).queryExecution.analyzed
this.plan = sql(query).queryExecution.optimizedPlan
this
}

