Skip to content

Commit

Permalink
Add more IT
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Apr 23, 2024
1 parent 6772e30 commit c8a908e
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable

def isOptimizerEnabled: Boolean = OPTIMIZER_RULE_ENABLED.readFrom(reader).toBoolean

def isCoveringIndexRewriteEnabled: Boolean =
def isCoveringIndexOptimizerEnabled: Boolean =
OPTIMIZER_RULE_COVERING_INDEX_ENABLED.readFrom(reader).toBoolean

def isHybridScanEnabled: Boolean = HYBRID_SCAN_ENABLED.readFrom(reader).toBoolean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,30 @@ class FlintSparkOptimizer(spark: SparkSession) extends Rule[LogicalPlan] {
/** Flint Spark API */
private val flint: FlintSpark = new FlintSpark(spark)

/** Only one Flint optimizer rule for now. Need to estimate cost if more than one in future. */
private val rules =
Seq(new ApplyFlintSparkCoveringIndex(flint), new ApplyFlintSparkSkippingIndex(flint))
/** Skipping index rewrite rule */
private val skippingIndexRule = new ApplyFlintSparkSkippingIndex(flint)

/** Covering index rewrite rule */
private val coveringIndexRule = new ApplyFlintSparkCoveringIndex(flint)

override def apply(plan: LogicalPlan): LogicalPlan = {
if (FlintSparkConf().isCoveringIndexRewriteEnabled) {
rules.head.apply(plan) // TODO: apply one by one
if (isFlintOptimizerEnabled) {
if (isCoveringIndexOptimizerEnabled) {
// Apply covering index rule first
skippingIndexRule.apply(coveringIndexRule.apply(plan))
} else {
skippingIndexRule.apply(plan)
}
} else {
plan
}
}

private def isOptimizerEnabled: Boolean = {
private def isFlintOptimizerEnabled: Boolean = {
FlintSparkConf().isOptimizerEnabled
}

private def isCoveringIndexOptimizerEnabled: Boolean = {
FlintSparkConf().isCoveringIndexOptimizerEnabled
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,24 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
*/
class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan] {

/**
* Prerequisite:
* ```
* 1) Not an insert statement
* 2) Relation is supported, ex. Iceberg, Delta, File. (is this check required?)
* 3) Any covering index on the table:
* 3.1) doesn't have filtering condition
* 3.2) cover all columns present in the query
* ```
*/
override def apply(plan: LogicalPlan): LogicalPlan = plan transform {

/**
* Prerequisite:
* ```
* 1) Not an insert statement
* 2) Relation is supported, ex. Iceberg, Delta, File. (is this check required?)
* 3) Any covering index on the table:
* 3.1) doesn't have filtering condition
* 3.2) cover all columns present in the query
* ```
*/
case relation @ LogicalRelation(_, _, Some(table), false)
if !plan.isInstanceOf[V2WriteCommand] =>
val tableName = table.qualifiedName
val requiredCols = allRequiredColumnsInQueryPlan(plan)
val requiredCols = requiredColumnsInQueryPlan(plan)

// Choose the first covering index that meets all criteria above
allCoveringIndexesOnTable(tableName)
coveringIndexesOnTable(tableName)
.collectFirst {
case index: FlintSparkCoveringIndex
if index.filterCondition.isEmpty &&
Expand All @@ -54,7 +53,7 @@ class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan]
.getOrElse(relation) // If no index found, return the original relation
}

private def allRequiredColumnsInQueryPlan(plan: LogicalPlan): Set[String] = {
private def requiredColumnsInQueryPlan(plan: LogicalPlan): Set[String] = {
// Collect all columns needed by the query, except those in relation. This is because this rule
// executes before push down optimization and thus relation includes all columns in the table.
plan
Expand All @@ -66,7 +65,7 @@ class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan]
.toSet
}

private def allCoveringIndexesOnTable(tableName: String): Seq[FlintSparkIndex] = {
private def coveringIndexesOnTable(tableName: String): Seq[FlintSparkIndex] = {
val qualifiedTableName = qualifyTableName(flint.spark, tableName)
val indexPattern = getFlintIndexName("*", qualifiedTableName)
flint.describeIndexes(indexPattern)
Expand All @@ -75,13 +74,13 @@ class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan]
private def replaceTableRelationWithIndexRelation(
relation: LogicalRelation,
index: FlintSparkCoveringIndex): LogicalPlan = {
// Replace with data source relation so as to avoid OpenSearch index required in catalog
val ds = new FlintDataSourceV2
val options = new CaseInsensitiveStringMap(util.Map.of("path", index.name()))
val inferredSchema = ds.inferSchema(options)
val flintTable = ds.getTable(inferredSchema, Array.empty, options)

// Adjust attributes to match the original plan's output
// TODO: replace original source column type with filed type in index metadata?
// Keep original output attributes in index only
val outputAttributes =
index.indexedColumns.keys
.map(colName => relation.output.find(_.name == colName).get)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import org.scalatest.matchers.must.Matchers.defined
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper

import org.apache.spark.sql.Row
import org.apache.spark.sql.flint.config.FlintSparkConf.OPTIMIZER_RULE_COVERING_INDEX_ENABLED

class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {

Expand Down Expand Up @@ -172,8 +173,39 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
.addIndexColumns("name", "age")
.create()

flint.refreshIndex(testFlintIndex)
checkKeywordsExist(sql(s"EXPLAIN SELECT name, age FROM $testTable"), "FlintScan")
}

test("should not rewrite with covering index if disabled") {
flint
.coveringIndex()
.name(testIndex)
.onTable(testTable)
.addIndexColumns("name", "age")
.create()

spark.conf.set(OPTIMIZER_RULE_COVERING_INDEX_ENABLED.key, "false")
try {
checkKeywordsNotExist(sql(s"EXPLAIN SELECT name, age FROM $testTable"), "FlintScan")
} finally {
spark.conf.set(OPTIMIZER_RULE_COVERING_INDEX_ENABLED.key, "true")
}
}

test("rewrite applicable query with covering index before skipping index") {
flint
.skippingIndex()
.onTable(testTable)
.addValueSet("name")
.addMinMax("age")
.create()
flint
.coveringIndex()
.name(testIndex)
.onTable(testTable)
.addIndexColumns("name", "age")
.create()

sql(s"SELECT name, age FROM $testTable").show
checkKeywordsExist(sql(s"EXPLAIN SELECT name, age FROM $testTable"), "FlintScan")
}
}

0 comments on commit c8a908e

Please sign in to comment.