Move IT from Flint API suite to SQL suite
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Apr 26, 2024
1 parent b65c0ab commit 967217f
Showing 3 changed files with 75 additions and 79 deletions.
ApplyFlintSparkCoveringIndex.scala
@@ -19,8 +19,8 @@ import org.apache.spark.sql.flint.{qualifyTableName, FlintDataSourceV2}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
* Flint Spark covering index apply rule that rewrites applicable query's table scan operator to
* accelerate query by reducing data scanned significantly.
* Flint Spark covering index apply rule that replaces an applicable query's table scan operator to
* accelerate the query by scanning covering index data instead.
*
* @param flint
* Flint Spark API
@@ -30,11 +30,10 @@ class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan]
override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case relation @ LogicalRelation(_, _, Some(table), false)
if !plan.isInstanceOf[V2WriteCommand] => // Not an insert statement
val tableName = table.qualifiedName
val relationCols = collectRelationColumnsInQueryPlan(relation, plan)

// Choose the first covering index that meets all criteria above
findAllCoveringIndexesOnTable(tableName)
findAllCoveringIndexesOnTable(table.qualifiedName)
.collectFirst {
case index: FlintSparkCoveringIndex if isCoveringIndexApplicable(index, relationCols) =>
replaceTableRelationWithIndexRelation(index, relation)
@@ -45,17 +44,19 @@ class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan]
private def collectRelationColumnsInQueryPlan(
relation: LogicalRelation,
plan: LogicalPlan): Set[String] = {
// Collect all columns of the relation present in the query plan, except those in relation itself.
// Because this rule executes before push down optimization and thus relation includes all columns in the table.
val relationColById = relation.output.map(attr => (attr.exprId, attr)).toMap
/*
* Collect all columns of the relation present in the query plan, except those in the relation itself,
* because this rule executes before pushdown optimization and the relation still includes all columns.
*/
val relationColsById = relation.output.map(attr => (attr.exprId, attr)).toMap
plan
.collect {
case _: LogicalRelation => Set.empty
case other =>
other.expressions
.flatMap(_.references)
.flatMap(ref =>
relationColById.get(ref.exprId)) // Ignore attribute not belong to relation
relationColsById.get(ref.exprId)) // Ignore attributes that do not belong to the target relation
.map(attr => attr.name)
}
.flatten
@@ -79,15 +80,14 @@ class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan]
private def replaceTableRelationWithIndexRelation(
index: FlintSparkCoveringIndex,
relation: LogicalRelation): LogicalPlan = {
// Replace with data source relation so as to avoid OpenSearch index required in catalog
// Use a data source relation to avoid Spark looking up the OpenSearch index in the catalog
val ds = new FlintDataSourceV2
val options = new CaseInsensitiveStringMap(util.Map.of("path", index.name()))
val inferredSchema = ds.inferSchema(options)
val flintTable = ds.getTable(inferredSchema, Array.empty, options)

// Keep original output attributes only if available in covering index.
// We have to reuse original attribute object because it's already analyzed
// with exprId referenced by the other parts of the query plan.
// Reuse the original attribute objects because they are already analyzed, with exprIds
// referenced by other parts of the query plan.
val allRelationCols = relation.output.map(attr => (attr.name, attr)).toMap
val outputAttributes =
flintTable
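
For context on where this rule plugs in: Catalyst rules of type Rule[LogicalPlan], such as ApplyFlintSparkCoveringIndex, are normally registered through SparkSessionExtensions. The sketch below is illustrative only and is not the Flint project's actual wiring, which goes through its own extension and optimizer classes; the class name ExampleFlintExtensions, the import paths, and the new FlintSpark(session) call are assumptions.

// Illustrative sketch only: registering a Rule[LogicalPlan] such as
// ApplyFlintSparkCoveringIndex via SparkSessionExtensions.
// Package paths and the FlintSpark constructor below are assumed for illustration.
import org.apache.spark.sql.SparkSessionExtensions
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.covering.ApplyFlintSparkCoveringIndex

class ExampleFlintExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectOptimizerRule { session =>
      // Hypothetical construction; the actual Flint client setup may differ.
      new ApplyFlintSparkCoveringIndex(new FlintSpark(session))
    }
  }
}

An extension like this would then be enabled through the standard spark.sql.extensions Spark configuration.
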
FlintSparkCoveringIndexITSuite.scala
@@ -14,7 +14,6 @@ import org.scalatest.matchers.must.Matchers.defined
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper

import org.apache.spark.sql.Row
import org.apache.spark.sql.flint.config.FlintSparkConf.OPTIMIZER_RULE_COVERING_INDEX_ENABLED

class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {

@@ -164,48 +163,4 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
.create()
deleteTestIndex(getFlintIndexName(newIndex, testTable))
}

test("rewrite applicable query with covering index") {
flint
.coveringIndex()
.name(testIndex)
.onTable(testTable)
.addIndexColumns("name", "age")
.create()

checkKeywordsExist(sql(s"EXPLAIN SELECT name, age FROM $testTable"), "FlintScan")
}

test("should not rewrite with covering index if disabled") {
flint
.coveringIndex()
.name(testIndex)
.onTable(testTable)
.addIndexColumns("name", "age")
.create()

spark.conf.set(OPTIMIZER_RULE_COVERING_INDEX_ENABLED.key, "false")
try {
checkKeywordsNotExist(sql(s"EXPLAIN SELECT name, age FROM $testTable"), "FlintScan")
} finally {
spark.conf.set(OPTIMIZER_RULE_COVERING_INDEX_ENABLED.key, "true")
}
}

test("rewrite applicable query with covering index before skipping index") {
flint
.skippingIndex()
.onTable(testTable)
.addValueSet("name")
.addMinMax("age")
.create()
flint
.coveringIndex()
.name(testIndex)
.onTable(testTable)
.addIndexColumns("name", "age")
.create()

checkKeywordsExist(sql(s"EXPLAIN SELECT name, age FROM $testTable"), "FlintScan")
}
}
FlintSparkCoveringIndexSqlITSuite.scala
@@ -19,7 +19,7 @@ import org.scalatest.matchers.must.Matchers.defined
import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the}

import org.apache.spark.sql.Row
import org.apache.spark.sql.flint.config.FlintSparkConf.CHECKPOINT_MANDATORY
import org.apache.spark.sql.flint.config.FlintSparkConf.{CHECKPOINT_MANDATORY, OPTIMIZER_RULE_COVERING_INDEX_ENABLED}

class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {

@@ -43,35 +43,24 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
}

test("create covering index with auto refresh") {
sql(s"""
| CREATE INDEX $testIndex ON $testTable
| (name, age)
| WITH (auto_refresh = true)
|""".stripMargin)

// Wait for streaming job complete current micro batch
val job = spark.streams.active.find(_.name == testFlintIndex)
job shouldBe defined
failAfter(streamingTimeout) {
job.get.processAllAvailable()
}
awaitRefreshComplete(s"""
| CREATE INDEX $testIndex ON $testTable
| (name, age)
| WITH (auto_refresh = true)
| """.stripMargin)

val indexData = flint.queryIndex(testFlintIndex)
indexData.count() shouldBe 2
}

test("create covering index with filtering condition") {
sql(s"""
awaitRefreshComplete(s"""
| CREATE INDEX $testIndex ON $testTable
| (name, age)
| WHERE address = 'Portland'
| WITH (auto_refresh = true)
|""".stripMargin)

// Wait for streaming job complete current micro batch
val job = spark.streams.active.find(_.name == testFlintIndex)
awaitStreamingComplete(job.get.id.toString)

val indexData = flint.queryIndex(testFlintIndex)
indexData.count() shouldBe 1
}
@@ -256,6 +245,53 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
metadata.indexedColumns.map(_.asScala("columnName")) shouldBe Seq("name", "age")
}

test("rewrite applicable query with covering index") {
awaitRefreshComplete(s"""
| CREATE INDEX $testIndex ON $testTable
| (name, age)
| WITH (auto_refresh = true)
| """.stripMargin)

val query = s"SELECT name, age FROM $testTable"
checkKeywordsExist(sql(s"EXPLAIN $query"), "FlintScan")
checkAnswer(sql(query), Seq(Row("Hello", 30), Row("World", 25)))
}

test("should not rewrite with covering index if disabled") {
awaitRefreshComplete(s"""
| CREATE INDEX $testIndex ON $testTable
| (name, age)
| WITH (auto_refresh = true)
|""".stripMargin)

spark.conf.set(OPTIMIZER_RULE_COVERING_INDEX_ENABLED.key, "false")
try {
checkKeywordsNotExist(sql(s"EXPLAIN SELECT name, age FROM $testTable"), "FlintScan")
} finally {
spark.conf.set(OPTIMIZER_RULE_COVERING_INDEX_ENABLED.key, "true")
}
}

test("rewrite applicable query with covering index before skipping index") {
try {
sql(s"""
| CREATE SKIPPING INDEX ON $testTable
| (age MIN_MAX)
| """.stripMargin)
awaitRefreshComplete(s"""
| CREATE INDEX $testIndex ON $testTable
| (name, age)
| WITH (auto_refresh = true)
| """.stripMargin)

val query = s"SELECT name FROM $testTable WHERE age = 30"
checkKeywordsExist(sql(s"EXPLAIN $query"), "FlintScan")
checkAnswer(sql(query), Row("Hello"))
} finally {
deleteTestIndex(getSkippingIndexName(testTable))
}
}

test("show all covering index on the source table") {
flint
.coveringIndex()
@@ -308,14 +344,11 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
flint.describeIndex(testFlintIndex) shouldBe defined
flint.queryIndex(testFlintIndex).count() shouldBe 0

sql(s"""
awaitRefreshComplete(s"""
| ALTER INDEX $testIndex ON $testTable
| WITH (auto_refresh = true)
| """.stripMargin)

// Wait for streaming job complete current micro batch
val job = spark.streams.active.find(_.name == testFlintIndex)
awaitStreamingComplete(job.get.id.toString)
flint.queryIndex(testFlintIndex).count() shouldBe 2
}

@@ -331,4 +364,12 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
sql(s"VACUUM INDEX $testIndex ON $testTable")
flint.describeIndex(testFlintIndex) shouldBe empty
}

private def awaitRefreshComplete(query: String): Unit = {
sql(query)

// Wait for the streaming job to complete its current micro-batch
val job = spark.streams.active.find(_.name == testFlintIndex)
awaitStreamingComplete(job.get.id.toString)
}
}
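
The awaitRefreshComplete helper added above centralizes the wait that the removed inline blocks used to perform after each CREATE or ALTER with auto_refresh = true. As a rough public-API equivalent for readers outside this suite (the name awaitIndexRefresh and its error handling are assumptions, not suite code), the wait amounts to:

// Minimal sketch using only public Structured Streaming APIs; `indexName` is a
// placeholder for the Flint index name that the auto-refresh streaming job is named after.
import org.apache.spark.sql.SparkSession

def awaitIndexRefresh(spark: SparkSession, indexName: String): Unit = {
  // Auto-refresh runs as a named streaming query; locate it by name...
  val job = spark.streams.active
    .find(_.name == indexName)
    .getOrElse(sys.error(s"No active streaming job found for $indexName"))

  // ...and block until all data currently available in the source has been
  // processed, i.e. the index is caught up.
  job.processAllAvailable()
}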
