Exclude logically deleted covering index
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Apr 23, 2024
1 parent c8a908e commit 0b13fb0
Showing 2 changed files with 35 additions and 10 deletions.
org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndex.scala
@@ -7,6 +7,7 @@ package org.opensearch.flint.spark.covering

 import java.util

+import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.DELETED
 import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex}
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName

@@ -40,20 +41,18 @@ class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan]
     case relation @ LogicalRelation(_, _, Some(table), false)
         if !plan.isInstanceOf[V2WriteCommand] =>
       val tableName = table.qualifiedName
-      val requiredCols = requiredColumnsInQueryPlan(plan)
+      val requiredCols = collectAllColumnsInQueryPlan(plan)

       // Choose the first covering index that meets all criteria above
-      coveringIndexesOnTable(tableName)
+      findAllCoveringIndexesOnTable(tableName)
         .collectFirst {
-          case index: FlintSparkCoveringIndex
-              if index.filterCondition.isEmpty &&
-                requiredCols.subsetOf(index.indexedColumns.keySet) =>
+          case index: FlintSparkCoveringIndex if isCoveringIndexApplicable(index, requiredCols) =>
             replaceTableRelationWithIndexRelation(relation, index)
         }
         .getOrElse(relation) // If no index found, return the original relation
   }

-  private def requiredColumnsInQueryPlan(plan: LogicalPlan): Set[String] = {
+  private def collectAllColumnsInQueryPlan(plan: LogicalPlan): Set[String] = {
     // Collect all columns needed by the query, except those in relation. This is because this rule
     // executes before push down optimization and thus relation includes all columns in the table.
     plan
@@ -65,12 +64,20 @@ class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan]
       .toSet
   }

-  private def coveringIndexesOnTable(tableName: String): Seq[FlintSparkIndex] = {
+  private def findAllCoveringIndexesOnTable(tableName: String): Seq[FlintSparkIndex] = {
     val qualifiedTableName = qualifyTableName(flint.spark, tableName)
     val indexPattern = getFlintIndexName("*", qualifiedTableName)
     flint.describeIndexes(indexPattern)
   }

+  private def isCoveringIndexApplicable(
+      index: FlintSparkCoveringIndex,
+      requiredCols: Set[String]): Boolean = {
+    index.latestLogEntry.exists(_.state != DELETED) &&
+    index.filterCondition.isEmpty &&
+    requiredCols.subsetOf(index.indexedColumns.keySet)
+  }
+
   private def replaceTableRelationWithIndexRelation(
       relation: LogicalRelation,
       index: FlintSparkCoveringIndex): LogicalPlan = {
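
The crux of the change is the new isCoveringIndexApplicable guard: an index is substituted into the query plan only when its latest metadata log entry exists and is not in DELETED state, it has no filter condition (a filtered index may not contain every source row), and its indexed columns cover every column the query requires. Because Option.exists returns false for None, an index with no log entry at all is also skipped. Below is a minimal, self-contained sketch of the same predicate; CoveringIndex and IndexState here are hypothetical stand-ins for the real Flint classes, not the project's API:

object CoveringIndexApplicability {
  // Hypothetical stand-ins for the real Flint types, for illustration only.
  sealed trait IndexState
  case object Active extends IndexState
  case object Deleted extends IndexState

  case class CoveringIndex(
      indexedColumns: Set[String],
      filterCondition: Option[String],
      latestState: Option[IndexState])

  // Mirrors the three checks in isCoveringIndexApplicable: a known,
  // non-deleted state; no filter condition; and full column coverage.
  def isApplicable(index: CoveringIndex, requiredCols: Set[String]): Boolean =
    index.latestState.exists(_ != Deleted) &&
      index.filterCondition.isEmpty &&
      requiredCols.subsetOf(index.indexedColumns)

  def main(args: Array[String]): Unit = {
    val deleted = CoveringIndex(Set("name", "age"), None, Some(Deleted))
    val active  = CoveringIndex(Set("name", "age"), None, Some(Active))
    println(isApplicable(deleted, Set("name"))) // false: logically deleted
    println(isApplicable(active, Set("name")))  // true: active and covers "name"
  }
}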
org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala
@@ -8,6 +8,8 @@ package org.opensearch.flint.spark.covering
 import org.mockito.ArgumentMatchers.{any, anyString}
 import org.mockito.Mockito.{mockStatic, when, RETURNS_DEEP_STUBS}
 import org.opensearch.flint.core.{FlintClient, FlintClientBuilder, FlintOptions}
+import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry
+import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.{ACTIVE, DELETED, IndexState}
 import org.opensearch.flint.spark.FlintSpark
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName
 import org.scalatest.matchers.{Matcher, MatchResult}
@@ -51,14 +53,27 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers {
     super.afterAll()
   }

-  test("should not apply if no index present") {
+  test("should not apply if no covering index present") {
     assertFlintQueryRewriter
       .withQuery(s"SELECT name, age FROM $testTable")
       .assertIndexNotUsed()
   }

+  test("should not apply if covering index is logically deleted") {
+    assertFlintQueryRewriter
+      .withQuery(s"SELECT name FROM $testTable")
+      .withIndex(
+        new FlintSparkCoveringIndex(
+          indexName = "name",
+          tableName = testTable,
+          indexedColumns = Map("name" -> "string")),
+        DELETED)
+      .assertIndexNotUsed()
+  }
+
   // Covering index doesn't cover column age
   Seq(
     s"SELECT * FROM $testTable",
     s"SELECT name, age FROM $testTable",
     s"SELECT name FROM $testTable WHERE age = 30",
     s"SELECT COUNT(*) FROM $testTable GROUP BY age").foreach { query =>
@@ -78,6 +93,7 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers {
   Seq(
     s"SELECT * FROM $testTable",
     s"SELECT name, age FROM $testTable",
+    s"SELECT age, name FROM $testTable",
     s"SELECT name FROM $testTable WHERE age = 30",
     s"SELECT COUNT(*) FROM $testTable GROUP BY age",
     s"SELECT name, COUNT(*) FROM $testTable WHERE age > 30 GROUP BY name").foreach { query =>
@@ -120,8 +136,10 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers {
       this
     }

-    def withIndex(index: FlintSparkCoveringIndex): AssertionHelper = {
-      this.indexes = indexes :+ index
+    def withIndex(index: FlintSparkCoveringIndex, state: IndexState = ACTIVE): AssertionHelper = {
+      this.indexes = indexes :+
+        index.copy(latestLogEntry =
+          Some(new FlintMetadataLogEntry("id", 0, 0, 0, state, "spark_catalog", "")))
       this
     }
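
On the test side, withIndex gains a state parameter that defaults to ACTIVE, so every existing call site compiles unchanged while the new test opts into DELETED explicitly. A small sketch of that default-parameter builder pattern, again with simplified hypothetical types rather than the real suite's classes:

object WithIndexSketch {
  sealed trait IndexState
  case object Active extends IndexState
  case object Deleted extends IndexState

  // Hypothetical simplified index carrying the state a real log entry would hold.
  case class Index(name: String, state: IndexState = Active)

  class AssertionHelper {
    private var indexes: Seq[Index] = Seq.empty

    // The default keeps old withIndex(index) callers source-compatible;
    // copy() threads the requested state into the index under test.
    def withIndex(index: Index, state: IndexState = Active): AssertionHelper = {
      indexes = indexes :+ index.copy(state = state)
      this
    }

    def states: Seq[String] = indexes.map(i => s"${i.name}=${i.state}")
  }

  def main(args: Array[String]): Unit = {
    val helper = new AssertionHelper()
      .withIndex(Index("name"))         // defaults to Active
      .withIndex(Index("age"), Deleted) // opted in for the deletion test
    println(helper.states.mkString(", ")) // prints: name=Active, age=Deleted
  }
}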
