Rule out logically deleted skipping index in query rewrite (#289)
* Ignore logically deleted skipping index

Signed-off-by: Chen Dai <[email protected]>

* Add IT

Signed-off-by: Chen Dai <[email protected]>

* Rename skipping index check method

Signed-off-by: Chen Dai <[email protected]>

---------

Signed-off-by: Chen Dai <[email protected]>
dai-chen authored Mar 22, 2024
1 parent eb03705 commit 94fc2f5
Showing 3 changed files with 57 additions and 11 deletions.
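
In short: the ApplyFlintSparkSkippingIndex optimizer rule used to rewrite a query whenever an index of kind SKIPPING_INDEX_TYPE existed; it now also requires the index's latest metadata log entry to be in a non-DELETED state. A hedged sketch of the scenario this fixes, mirroring the integration test in the diff below (it assumes a FlintSpark instance `flint` and a SparkSession `spark` as in the test suites; the index name follows Flint's flint_<catalog>_<database>_<table>_skipping_index naming convention and is illustrative):

// Sketch: after a logical delete, queries must fall back to a plain table scan.
flint
  .skippingIndex()
  .onTable("spark_catalog.default.test")
  .addPartitions("year", "month")
  .create()

// Logical delete: the index data stays in OpenSearch, but its metadata
// log entry transitions to state DELETED.
flint.deleteIndex("flint_spark_catalog_default_test_skipping_index")

// Before this commit the rule still consulted the deleted index here;
// now the optimized plan is identical to a query with the rule disabled.
spark.sql("SELECT name FROM spark_catalog.default.test WHERE year = 2023 AND month = 4")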
ApplyFlintSparkSkippingIndex.scala
@@ -6,7 +6,8 @@
 package org.opensearch.flint.spark.skipping
 
 import com.amazon.awslogsdataaccesslayer.connectors.spark.LogsTable
-import org.opensearch.flint.spark.FlintSpark
+import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.DELETED
+import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex}
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE}
 
 import org.apache.spark.sql.Column
@@ -17,7 +18,6 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.flint.config.FlintSparkConf
 import org.apache.spark.sql.flint.qualifyTableName
 
 /**
@@ -40,7 +40,7 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan]
               false))
           if hasNoDisjunction(condition) && !location.isInstanceOf[FlintSparkSkippingFileIndex] =>
         val index = flint.describeIndex(getIndexName(table))
-        if (index.exists(_.kind == SKIPPING_INDEX_TYPE)) {
+        if (isActiveSkippingIndex(index)) {
           val skippingIndex = index.get.asInstanceOf[FlintSparkSkippingIndex]
           val indexFilter = rewriteToIndexFilter(skippingIndex, condition)
 
@@ -69,7 +69,7 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan]
             // Check if query plan already rewritten
             table.isInstanceOf[LogsTable] && !table.asInstanceOf[LogsTable].hasFileIndexScan() =>
         val index = flint.describeIndex(getIndexName(catalog, identifier))
-        if (index.exists(_.kind == SKIPPING_INDEX_TYPE)) {
+        if (isActiveSkippingIndex(index)) {
           val skippingIndex = index.get.asInstanceOf[FlintSparkSkippingIndex]
           val indexFilter = rewriteToIndexFilter(skippingIndex, condition)
           /*
@@ -123,6 +123,12 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan]
     }.isEmpty
   }
 
+  private def isActiveSkippingIndex(index: Option[FlintSparkIndex]): Boolean = {
+    index.isDefined &&
+    index.get.kind == SKIPPING_INDEX_TYPE &&
+    index.get.latestLogEntry.exists(_.state != DELETED)
+  }
+
   private def rewriteToIndexFilter(
       index: FlintSparkSkippingIndex,
       condition: Expression): Option[Expression] = {
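A note on the Option semantics of the new guard: `exists` returns false for None, so isActiveSkippingIndex rejects not only an index whose latest log entry is DELETED but also one with no log entry at all. A minimal self-contained sketch of this truth table, using a simplified stand-in for IndexState (the names below are illustrative, not the actual Flint types):

object GuardSemanticsSketch extends App {
  // Simplified stand-in for FlintMetadataLogEntry.IndexState
  object IndexState extends Enumeration {
    val DELETED, REFRESHING = Value
  }
  import IndexState._

  // Mirrors the latestLogEntry check inside isActiveSkippingIndex
  def hasLiveLogEntry(latest: Option[IndexState.Value]): Boolean =
    latest.exists(_ != DELETED)

  assert(hasLiveLogEntry(Some(REFRESHING))) // live entry: rewrite may proceed
  assert(!hasLiveLogEntry(Some(DELETED)))   // logically deleted: no rewrite
  assert(!hasLiveLogEntry(None))            // no log entry at all: no rewrite
  println("guard semantics hold")
}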
ApplyFlintSparkSkippingIndexSuite.scala
@@ -8,6 +8,8 @@ package org.opensearch.flint.spark.skipping
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito._
 import org.mockito.invocation.InvocationOnMock
+import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry
+import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.{DELETED, IndexState, REFRESHING}
 import org.opensearch.flint.spark.FlintSpark
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, SKIPPING_INDEX_TYPE}
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.SkippingKind
@@ -58,7 +60,7 @@ class ApplyFlintSparkSkippingIndexSuite extends SparkFunSuite with Matchers {
     assertFlintQueryRewriter()
       .withSourceTable(testTable, testSchema)
       .withFilter(Or(EqualTo(nameCol, Literal("hello")), EqualTo(ageCol, Literal(30))))
-      .withSkippingIndex(testIndex, "name", "age")
+      .withSkippingIndex(testIndex, REFRESHING, "name", "age")
       .shouldNotRewrite()
   }
 
@@ -69,31 +71,39 @@ class ApplyFlintSparkSkippingIndexSuite extends SparkFunSuite with Matchers {
         And(
           Or(EqualTo(nameCol, Literal("hello")), EqualTo(ageCol, Literal(30))),
           EqualTo(ageCol, Literal(30))))
-      .withSkippingIndex(testIndex, "name", "age")
+      .withSkippingIndex(testIndex, REFRESHING, "name", "age")
       .shouldNotRewrite()
   }
 
   test("should rewrite query with skipping index") {
     assertFlintQueryRewriter()
       .withSourceTable(testTable, testSchema)
       .withFilter(EqualTo(nameCol, Literal("hello")))
-      .withSkippingIndex(testIndex, "name")
+      .withSkippingIndex(testIndex, REFRESHING, "name")
       .shouldPushDownAfterRewrite(col("name") === "hello")
   }
 
+  test("should not rewrite query with deleted skipping index") {
+    assertFlintQueryRewriter()
+      .withSourceTable(testTable, testSchema)
+      .withFilter(EqualTo(nameCol, Literal("hello")))
+      .withSkippingIndex(testIndex, DELETED, "name")
+      .shouldNotRewrite()
+  }
+
   test("should only push down filter condition with indexed column") {
     assertFlintQueryRewriter()
       .withSourceTable(testTable, testSchema)
       .withFilter(And(EqualTo(nameCol, Literal("hello")), EqualTo(ageCol, Literal(30))))
-      .withSkippingIndex(testIndex, "name")
+      .withSkippingIndex(testIndex, REFRESHING, "name")
       .shouldPushDownAfterRewrite(col("name") === "hello")
   }
 
   test("should push down all filter conditions with indexed column") {
     assertFlintQueryRewriter()
       .withSourceTable(testTable, testSchema)
       .withFilter(And(EqualTo(nameCol, Literal("hello")), EqualTo(ageCol, Literal(30))))
-      .withSkippingIndex(testIndex, "name", "age")
+      .withSkippingIndex(testIndex, REFRESHING, "name", "age")
       .shouldPushDownAfterRewrite(col("name") === "hello" && col("age") === 30)
 
     assertFlintQueryRewriter()
Expand All @@ -102,7 +112,7 @@ class ApplyFlintSparkSkippingIndexSuite extends SparkFunSuite with Matchers {
         And(
           EqualTo(nameCol, Literal("hello")),
           And(EqualTo(ageCol, Literal(30)), EqualTo(addressCol, Literal("Seattle")))))
-      .withSkippingIndex(testIndex, "name", "age", "address")
+      .withSkippingIndex(testIndex, REFRESHING, "name", "age", "address")
       .shouldPushDownAfterRewrite(
         col("name") === "hello" && col("age") === 30 && col("address") === "Seattle")
   }
@@ -139,12 +149,20 @@ class ApplyFlintSparkSkippingIndexSuite extends SparkFunSuite with Matchers {
       this
     }
 
-    def withSkippingIndex(indexName: String, indexCols: String*): AssertionHelper = {
+    def withSkippingIndex(
+        indexName: String,
+        indexState: IndexState,
+        indexCols: String*): AssertionHelper = {
       val skippingIndex = mock[FlintSparkSkippingIndex]
       when(skippingIndex.kind).thenReturn(SKIPPING_INDEX_TYPE)
       when(skippingIndex.name()).thenReturn(indexName)
       when(skippingIndex.indexedColumns).thenReturn(indexCols.map(FakeSkippingStrategy))
+
+      // Mock index log entry with the given state
+      val logEntry = mock[FlintMetadataLogEntry]
+      when(logEntry.state).thenReturn(indexState)
+      when(skippingIndex.latestLogEntry).thenReturn(Some(logEntry))
 
       when(flint.describeIndex(any())).thenReturn(Some(skippingIndex))
       this
     }
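One path the updated suite does not cover is an index whose latestLogEntry is None, which the new guard also treats as inactive. A hypothetical test in the same style (the withSkippingIndexNoLogEntry helper below is illustrative, not part of this diff; it mirrors withSkippingIndex but stubs no log entry):

test("should not rewrite query when skipping index has no log entry") {
  assertFlintQueryRewriter()
    .withSourceTable(testTable, testSchema)
    .withFilter(EqualTo(nameCol, Literal("hello")))
    .withSkippingIndexNoLogEntry(testIndex, "name") // hypothetical helper
    .shouldNotRewrite()
}

// Hypothetical AssertionHelper variant: same stubs as withSkippingIndex,
// except latestLogEntry returns None instead of Some(logEntry).
def withSkippingIndexNoLogEntry(indexName: String, indexCols: String*): AssertionHelper = {
  val skippingIndex = mock[FlintSparkSkippingIndex]
  when(skippingIndex.kind).thenReturn(SKIPPING_INDEX_TYPE)
  when(skippingIndex.name()).thenReturn(indexName)
  when(skippingIndex.indexedColumns).thenReturn(indexCols.map(FakeSkippingStrategy))
  when(skippingIndex.latestLogEntry).thenReturn(None) // no metadata log entry
  when(flint.describeIndex(any())).thenReturn(Some(skippingIndex))
  this
}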
FlintSparkSkippingIndexITSuite.scala
@@ -308,6 +308,28 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
     }
   }
 
+  test("should not rewrite original query if skipping index is logically deleted") {
+    flint
+      .skippingIndex()
+      .onTable(testTable)
+      .addPartitions("year", "month")
+      .create()
+    flint.deleteIndex(testIndex)
+
+    val query =
+      s"""
+         | SELECT name
+         | FROM $testTable
+         | WHERE year = 2023 AND month = 4
+         |""".stripMargin
+
+    val actual = sql(query).queryExecution.optimizedPlan
+    withFlintOptimizerDisabled {
+      val expect = sql(query).queryExecution.optimizedPlan
+      actual shouldBe expect
+    }
+  }
+
   test("can build partition skipping index and rewrite applicable query") {
     flint
       .skippingIndex()
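The integration test proves the negative by plan comparison: it optimizes the same query twice, once normally and once inside withFlintOptimizerDisabled, and asserts the two plans match. That helper comes from the Flint test harness; a plausible sketch of how it might work, assuming the rule is gated by the spark.flint.optimizer.enabled conf (an assumption based on Flint's documented configuration, not code from this diff):

// Sketch only: toggle the Flint optimizer rule off around a block,
// restoring it afterwards even if the block throws.
def withFlintOptimizerDisabled[T](block: => T): T = {
  spark.conf.set("spark.flint.optimizer.enabled", "false")
  try {
    block
  } finally {
    spark.conf.set("spark.flint.optimizer.enabled", "true")
  }
}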
