Skip to content

Commit

Permalink
Merge branch 'main' into store-error-message-for-streaming-job-rebased
Browse files Browse the repository at this point in the history
  • Loading branch information
dai-chen committed Jul 18, 2024
2 parents 99f75b8 + f7aaa41 commit 17a2353
Show file tree
Hide file tree
Showing 4 changed files with 342 additions and 44 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark

import org.apache.spark.sql.catalyst.expressions._

/**
* Query rewrite helper that provides common utilities for query rewrite rule of various Flint
* indexes.
*/
trait FlintSparkQueryRewriteHelper {

/**
* Determines if the given filter expression consists solely of AND operations and no OR
* operations, implying that it's a conjunction of conditions.
*
* @param filter
* The filter expression to check.
* @return
* True if the filter contains only AND operations, False if any OR operations are found.
*/
def isConjunction(filter: Expression): Boolean = {
filter.collectFirst { case Or(_, _) =>
true
}.isEmpty
}

/**
* Determines if the conditions in an index filter can subsume those in a query filter. This is
* essential to verify if all outputs that satisfy the index filter also satisfy the query
* filter, indicating that the index can potentially optimize the query.
*
* @param indexFilter
* The filter expression defined from the index, required to be a conjunction.
* @param queryFilter
* The filter expression present in the user query, required to be a conjunction.
* @return
* True if the index filter can subsume the query filter, otherwise False.
*/
def subsume(indexFilter: Expression, queryFilter: Expression): Boolean = {
if (!isConjunction(indexFilter) || !isConjunction(queryFilter)) {
return false
}

// Flatten a potentially nested conjunction into a sequence of individual conditions
def flattenConditions(filter: Expression): Seq[Expression] = filter match {
case And(left, right) => flattenConditions(left) ++ flattenConditions(right)
case other => Seq(other)
}
val indexConditions = flattenConditions(indexFilter)
val queryConditions = flattenConditions(queryFilter)

// Ensures that every condition in the index filter is subsumed by at least one condition
// on the same column in the query filter
indexConditions.forall { indexCond =>
queryConditions.exists { queryCond =>
(indexCond, queryCond) match {
case (
indexComp @ BinaryComparison(indexCol: Attribute, _),
queryComp @ BinaryComparison(queryCol: Attribute, _))
if indexCol.name == queryCol.name =>
Range(indexComp).subsume(Range(queryComp))
case _ => false // consider as not subsumed for unsupported expression
}
}
}
}

/**
* Represents a range with optional lower and upper bounds.
*
* @param lower
* The optional lower bound
* @param upper
* The optional upper bound
*/
private case class Range(lower: Option[Bound], upper: Option[Bound]) {

/**
* Determines if this range subsumes (completely covers) another range.
*
* @param other
* The other range to compare against.
* @return
* True if this range subsumes the other, otherwise false.
*/
def subsume(other: Range): Boolean = {
// Unknown range cannot subsume or be subsumed by any
if (this == Range.UNKNOWN || other == Range.UNKNOWN) {
return false
}

// Subsumption check helper for lower and upper bound
def subsumeHelper(
thisBound: Option[Bound],
otherBound: Option[Bound],
comp: (Bound, Bound) => Boolean): Boolean =
(thisBound, otherBound) match {
case (Some(a), Some(b)) => comp(a, b)
case (None, _) => true // this is unbounded and thus can subsume any other bound
case (_, None) => false // other is unbounded and thus cannot be subsumed by any
}
subsumeHelper(lower, other.lower, _.lessThanOrEqualTo(_)) &&
subsumeHelper(upper, other.upper, _.greaterThanOrEqualTo(_))
}
}

private object Range {

/** Unknown range for unsupported binary comparison expression */
private val UNKNOWN: Range = Range(None, None)

/**
* Constructs a Range object from a binary comparison expression, translating comparison
* operators into bounds with appropriate inclusiveness.
*
* @param condition
* The binary comparison
*/
def apply(condition: BinaryComparison): Range = condition match {
case GreaterThan(_, Literal(value: Comparable[Any], _)) =>
Range(Some(Bound(value, inclusive = false)), None)
case GreaterThanOrEqual(_, Literal(value: Comparable[Any], _)) =>
Range(Some(Bound(value, inclusive = true)), None)
case LessThan(_, Literal(value: Comparable[Any], _)) =>
Range(None, Some(Bound(value, inclusive = false)))
case LessThanOrEqual(_, Literal(value: Comparable[Any], _)) =>
Range(None, Some(Bound(value, inclusive = true)))
case EqualTo(_, Literal(value: Comparable[Any], _)) =>
Range(Some(Bound(value, inclusive = true)), Some(Bound(value, inclusive = true)))
case _ => UNKNOWN
}
}

/**
* Represents a bound (lower or upper) in a range, defined by a literal value and its
* inclusiveness.
*
* @param value
* The literal value defining the bound.
* @param inclusive
* Indicates whether the bound is inclusive.
*/
private case class Bound(value: Comparable[Any], inclusive: Boolean) {

/**
* Checks if this bound is less than or equal to another bound, considering inclusiveness.
*
* @param other
* The bound to compare against.
* @return
* True if this bound is less than or equal to the other bound, either because value is
* smaller, this bound is inclusive, or both bound are exclusive.
*/
def lessThanOrEqualTo(other: Bound): Boolean = {
val cmp = value.compareTo(other.value)
cmp < 0 || (cmp == 0 && (inclusive || !other.inclusive))
}

/**
* Checks if this bound is greater than or equal to another bound, considering inclusiveness.
*
* @param other
* The bound to compare against.
* @return
* True if this bound is greater than or equal to the other bound, either because value is
* greater, this bound is inclusive, or both bound are exclusive.
*/
def greaterThanOrEqualTo(other: Bound): Boolean = {
val cmp = value.compareTo(other.value)
cmp > 0 || (cmp == 0 && (inclusive || !other.inclusive))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ package org.opensearch.flint.spark.covering
import java.util

import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.DELETED
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.{FlintSpark, FlintSparkQueryRewriteHelper}
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName
import org.opensearch.flint.spark.source.{FlintSparkSourceRelation, FlintSparkSourceRelationProvider}

import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, V2WriteCommand}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parseExpression
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, V2WriteCommand}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.flint.{qualifyTableName, FlintDataSourceV2}
Expand All @@ -27,7 +28,9 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
* @param flint
* Flint Spark API
*/
class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan] {
class ApplyFlintSparkCoveringIndex(flint: FlintSpark)
extends Rule[LogicalPlan]
with FlintSparkQueryRewriteHelper {

/** All supported source relation providers */
private val supportedProviders = FlintSparkSourceRelationProvider.getAllProviders(flint.spark)
Expand All @@ -37,22 +40,38 @@ class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan]
plan
} else {
// Iterate each sub plan tree in the given plan
plan transform { case subPlan =>
supportedProviders
.collectFirst {
case provider if provider.isSupported(subPlan) =>
logInfo(s"Provider [${provider.name()}] can match sub plan ${subPlan.nodeName}")
val relation = provider.getRelation(subPlan)
val relationCols = collectRelationColumnsInQueryPlan(plan, relation)

// Choose the first covering index that meets all criteria above
findAllCoveringIndexesOnTable(relation.tableName)
.sortBy(_.name())
.find(index => isCoveringIndexApplicable(index, relationCols))
.map(index => replaceTableRelationWithIndexRelation(index, relation))
.getOrElse(subPlan) // If no index found, return the original node
}
.getOrElse(subPlan) // If not supported by any provider, return the original node
plan transform {
case filter @ Filter(condition, Relation(sourceRelation)) =>
doApply(plan, sourceRelation, Some(condition))
.map(newRelation => filter.copy(child = newRelation))
.getOrElse(filter)
case relation @ Relation(sourceRelation) =>
doApply(plan, sourceRelation, None)
.getOrElse(relation)
}
}
}

private def doApply(
plan: LogicalPlan,
relation: FlintSparkSourceRelation,
queryFilter: Option[Expression]): Option[LogicalPlan] = {
val relationCols = collectRelationColumnsInQueryPlan(plan, relation)

// Choose the first covering index that meets all criteria above
findAllCoveringIndexesOnTable(relation.tableName)
.sortBy(_.name())
.find(index => isCoveringIndexApplicable(index, queryFilter, relationCols))
.map(index => replaceTableRelationWithIndexRelation(index, relation))
}

private object Relation {
def unapply(subPlan: LogicalPlan): Option[FlintSparkSourceRelation] = {
// Check if any source relation can support the plan node
supportedProviders.collectFirst {
case provider if provider.isSupported(subPlan) =>
logInfo(s"Provider [${provider.name()}] can match plan ${subPlan.nodeName}")
provider.getRelation(subPlan)
}
}
}
Expand All @@ -65,20 +84,28 @@ class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan]
* Because this rule executes before push down optimization, relation includes all columns.
*/
val relationColsById = relation.output.map(attr => (attr.exprId, attr)).toMap
plan
.collect {
// Relation interface matches both file and Iceberg relation
case r: MultiInstanceRelation if r.eq(relation.plan) => Set.empty
case other =>
other.expressions
.flatMap(_.references)
.flatMap(ref => {
relationColsById.get(ref.exprId)
}) // Ignore attribute not belong to current relation being rewritten
.map(attr => attr.name)
}
.flatten
.toSet
val relationCols =
plan
.collect {
// Relation interface matches both file and Iceberg relation
case r: MultiInstanceRelation if r.eq(relation.plan) => Set.empty
case other =>
other.expressions
.flatMap(_.references)
.flatMap(ref => {
relationColsById.get(ref.exprId)
}) // Ignore attribute not belong to current relation being rewritten
.map(attr => attr.name)
}
.flatten
.toSet

if (relationCols.isEmpty) {
// Return all if plan only has relation operator, e.g. SELECT * or all columns
relationColsById.values.map(_.name).toSet
} else {
relationCols
}
}

private def findAllCoveringIndexesOnTable(tableName: String): Seq[FlintSparkCoveringIndex] = {
Expand All @@ -87,8 +114,8 @@ class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan]
val indexes =
flint
.describeIndexes(indexPattern)
.collect { // cast to covering index
case index: FlintSparkCoveringIndex => index
.collect { // cast to covering index and double check table name
case index: FlintSparkCoveringIndex if index.tableName == qualifiedTableName => index
}

val indexNames = indexes.map(_.name()).mkString(",")
Expand All @@ -98,17 +125,26 @@ class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan]

private def isCoveringIndexApplicable(
index: FlintSparkCoveringIndex,
queryFilter: Option[Expression],
relationCols: Set[String]): Boolean = {
val indexedCols = index.indexedColumns.keySet
val subsumption = (index.filterCondition, queryFilter) match {
case (None, _) => true // full index can cover any query filter
case (Some(_), None) => false // partial index cannot cover query without filter
case (Some(indexFilter), Some(_)) =>
subsume(parseExpression(indexFilter), queryFilter.get)
}
val isApplicable =
index.latestLogEntry.exists(_.state != DELETED) &&
index.filterCondition.isEmpty && // TODO: support partial covering index later
subsumption &&
relationCols.subsetOf(indexedCols)

logInfo(s"""
| Is covering index ${index.name()} applicable: $isApplicable
| Index state: ${index.latestLogEntry.map(_.state)}
| Index filter condition: ${index.filterCondition}
| Query filter: $queryFilter
| Index filter: ${index.filterCondition}
| Subsumption test: $subsumption
| Columns required: $relationCols
| Columns indexed: $indexedCols
|""".stripMargin)
Expand Down
Loading

0 comments on commit 17a2353

Please sign in to comment.