diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndex.scala
index 8e88dd630..c8cb62b2c 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndex.scala
@@ -7,6 +7,7 @@ package org.opensearch.flint.spark.covering
 
 import java.util
 
+import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.DELETED
 import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex}
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName
 
@@ -40,20 +41,18 @@ class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan]
       case relation @ LogicalRelation(_, _, Some(table), false)
           if !plan.isInstanceOf[V2WriteCommand] =>
         val tableName = table.qualifiedName
-        val requiredCols = requiredColumnsInQueryPlan(plan)
+        val requiredCols = collectAllColumnsInQueryPlan(plan)
 
         // Choose the first covering index that meets all criteria above
-        coveringIndexesOnTable(tableName)
+        findAllCoveringIndexesOnTable(tableName)
           .collectFirst {
-            case index: FlintSparkCoveringIndex
-                if index.filterCondition.isEmpty &&
-                  requiredCols.subsetOf(index.indexedColumns.keySet) =>
+            case index: FlintSparkCoveringIndex if isCoveringIndexApplicable(index, requiredCols) =>
               replaceTableRelationWithIndexRelation(relation, index)
           }
           .getOrElse(relation) // If no index found, return the original relation
     }
 
-  private def requiredColumnsInQueryPlan(plan: LogicalPlan): Set[String] = {
+  private def collectAllColumnsInQueryPlan(plan: LogicalPlan): Set[String] = {
     // Collect all columns needed by the query, except those in relation. This is because this rule
     // executes before push down optimization and thus relation includes all columns in the table.
     plan
@@ -65,12 +64,20 @@ class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan]
       .toSet
   }
 
-  private def coveringIndexesOnTable(tableName: String): Seq[FlintSparkIndex] = {
+  private def findAllCoveringIndexesOnTable(tableName: String): Seq[FlintSparkIndex] = {
     val qualifiedTableName = qualifyTableName(flint.spark, tableName)
     val indexPattern = getFlintIndexName("*", qualifiedTableName)
     flint.describeIndexes(indexPattern)
   }
 
+  private def isCoveringIndexApplicable(
+      index: FlintSparkCoveringIndex,
+      requiredCols: Set[String]): Boolean = {
+    index.latestLogEntry.exists(_.state != DELETED) &&
+    index.filterCondition.isEmpty &&
+    requiredCols.subsetOf(index.indexedColumns.keySet)
+  }
+
   private def replaceTableRelationWithIndexRelation(
       relation: LogicalRelation,
       index: FlintSparkCoveringIndex): LogicalPlan = {
diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala
index 5cda20536..6a64be925 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala
@@ -8,6 +8,8 @@ package org.opensearch.flint.spark.covering
 import org.mockito.ArgumentMatchers.{any, anyString}
 import org.mockito.Mockito.{mockStatic, when, RETURNS_DEEP_STUBS}
 import org.opensearch.flint.core.{FlintClient, FlintClientBuilder, FlintOptions}
+import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry
+import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.{ACTIVE, DELETED, IndexState}
 import org.opensearch.flint.spark.FlintSpark
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName
 import org.scalatest.matchers.{Matcher, MatchResult}
@@ -51,14 +53,27 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers {
     super.afterAll()
   }
 
-  test("should not apply if no index present") {
+  test("should not apply if no covering index present") {
     assertFlintQueryRewriter
       .withQuery(s"SELECT name, age FROM $testTable")
       .assertIndexNotUsed()
   }
 
+  test("should not apply if covering index is logically deleted") {
+    assertFlintQueryRewriter
+      .withQuery(s"SELECT name FROM $testTable")
+      .withIndex(
+        new FlintSparkCoveringIndex(
+          indexName = "name",
+          tableName = testTable,
+          indexedColumns = Map("name" -> "string")),
+        DELETED)
+      .assertIndexNotUsed()
+  }
+
   // Covering index doesn't column age
   Seq(
+    s"SELECT * FROM $testTable",
     s"SELECT name, age FROM $testTable",
     s"SELECT name FROM $testTable WHERE age = 30",
     s"SELECT COUNT(*) FROM $testTable GROUP BY age").foreach { query =>
@@ -78,6 +93,7 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers {
   Seq(
     s"SELECT * FROM $testTable",
     s"SELECT name, age FROM $testTable",
+    s"SELECT age, name FROM $testTable",
     s"SELECT name FROM $testTable WHERE age = 30",
     s"SELECT COUNT(*) FROM $testTable GROUP BY age",
     s"SELECT name, COUNT(*) FROM $testTable WHERE age > 30 GROUP BY name").foreach { query =>
@@ -120,8 +136,10 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers {
       this
     }
 
-    def withIndex(index: FlintSparkCoveringIndex): AssertionHelper = {
-      this.indexes = indexes :+ index
+    def withIndex(index: FlintSparkCoveringIndex, state: IndexState = ACTIVE): AssertionHelper = {
+      this.indexes = indexes :+
+        index.copy(latestLogEntry =
+          Some(new FlintMetadataLogEntry("id", 0, 0, 0, state, "spark_catalog", "")))
       this
     }
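
For illustration only, below is a minimal, self-contained sketch of the applicability rule this patch introduces: an index is chosen only if it is not logically DELETED, has no filter condition, and indexes every column the query needs. IndexState, LogEntry, and CoveringIndex here are simplified stand-ins, not the actual Flint classes in the diff above.

object CoveringIndexApplicabilitySketch {
  sealed trait IndexState
  case object ACTIVE extends IndexState
  case object DELETED extends IndexState

  // Simplified stand-ins for FlintMetadataLogEntry and FlintSparkCoveringIndex
  final case class LogEntry(state: IndexState)
  final case class CoveringIndex(
      indexedColumns: Map[String, String],
      filterCondition: Option[String],
      latestLogEntry: Option[LogEntry])

  // Same predicate shape as isCoveringIndexApplicable in the patch:
  // skip logically deleted indexes, partial (filtered) indexes, and
  // indexes that do not cover all required columns.
  def isApplicable(index: CoveringIndex, requiredCols: Set[String]): Boolean =
    index.latestLogEntry.exists(_.state != DELETED) &&
      index.filterCondition.isEmpty &&
      requiredCols.subsetOf(index.indexedColumns.keySet)

  def main(args: Array[String]): Unit = {
    val deleted = CoveringIndex(Map("name" -> "string"), None, Some(LogEntry(DELETED)))
    val active  = CoveringIndex(Map("name" -> "string"), None, Some(LogEntry(ACTIVE)))
    println(isApplicable(deleted, Set("name"))) // false: logically deleted, no rewrite
    println(isApplicable(active, Set("name")))  // true: eligible for query rewrite
  }
}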