From c8a908e77d0c6304a9847634317ff59ff3422e7f Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Tue, 23 Apr 2024 11:56:15 -0700
Subject: [PATCH] Add more IT

Signed-off-by: Chen Dai
---
 .../sql/flint/config/FlintSparkConf.scala     |  2 +-
 .../flint/spark/FlintSparkOptimizer.scala     | 23 ++++++++----
 .../ApplyFlintSparkCoveringIndex.scala        | 33 +++++++++--------
 .../FlintSparkCoveringIndexITSuite.scala      | 36 +++++++++++++++++--
 4 files changed, 68 insertions(+), 26 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala
index 4ff789e88..9a8623e35 100644
--- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala
+++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala
@@ -205,7 +205,7 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
 
   def isOptimizerEnabled: Boolean = OPTIMIZER_RULE_ENABLED.readFrom(reader).toBoolean
 
-  def isCoveringIndexRewriteEnabled: Boolean =
+  def isCoveringIndexOptimizerEnabled: Boolean =
     OPTIMIZER_RULE_COVERING_INDEX_ENABLED.readFrom(reader).toBoolean
 
   def isHybridScanEnabled: Boolean = HYBRID_SCAN_ENABLED.readFrom(reader).toBoolean
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkOptimizer.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkOptimizer.scala
index 4382c0d0f..8f6d32986 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkOptimizer.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkOptimizer.scala
@@ -23,19 +23,30 @@ class FlintSparkOptimizer(spark: SparkSession) extends Rule[LogicalPlan] {
 
   /** Flint Spark API */
   private val flint: FlintSpark = new FlintSpark(spark)
 
-  /** Only one Flint optimizer rule for now. Need to estimate cost if more than one in future. */
-  private val rules =
-    Seq(new ApplyFlintSparkCoveringIndex(flint), new ApplyFlintSparkSkippingIndex(flint))
+  /** Skipping index rewrite rule */
+  private val skippingIndexRule = new ApplyFlintSparkSkippingIndex(flint)
+
+  /** Covering index rewrite rule */
+  private val coveringIndexRule = new ApplyFlintSparkCoveringIndex(flint)
 
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    if (FlintSparkConf().isCoveringIndexRewriteEnabled) {
-      rules.head.apply(plan) // TODO: apply one by one
+    if (isFlintOptimizerEnabled) {
+      if (isCoveringIndexOptimizerEnabled) {
+        // Apply covering index rule first
+        skippingIndexRule.apply(coveringIndexRule.apply(plan))
+      } else {
+        skippingIndexRule.apply(plan)
+      }
     } else {
       plan
     }
   }
 
-  private def isOptimizerEnabled: Boolean = {
+  private def isFlintOptimizerEnabled: Boolean = {
     FlintSparkConf().isOptimizerEnabled
   }
+
+  private def isCoveringIndexOptimizerEnabled: Boolean = {
+    FlintSparkConf().isCoveringIndexOptimizerEnabled
+  }
 }
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndex.scala
index 9aab3c547..8e88dd630 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndex.scala
@@ -26,25 +26,24 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
  */
 class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan] {
 
+  /**
+   * Prerequisites:
+   * ```
+   * 1) Not an insert statement
+   * 2) Relation is supported, e.g. Iceberg, Delta, File. (is this check required?)
+   * 3) Any covering index on the table:
+   *   3.1) doesn't have a filtering condition
+   *   3.2) covers all columns present in the query
+   * ```
+   */
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-
-    /**
-     * Prerequisite:
-     * ```
-     * 1) Not an insert statement
-     * 2) Relation is supported, ex. Iceberg, Delta, File. (is this check required?)
-     * 3) Any covering index on the table:
-     *   3.1) doesn't have filtering condition
-     *   3.2) cover all columns present in the query
-     * ```
-     */
     case relation @ LogicalRelation(_, _, Some(table), false)
         if !plan.isInstanceOf[V2WriteCommand] =>
       val tableName = table.qualifiedName
-      val requiredCols = allRequiredColumnsInQueryPlan(plan)
+      val requiredCols = requiredColumnsInQueryPlan(plan)
 
       // Choose the first covering index that meets all criteria above
-      allCoveringIndexesOnTable(tableName)
+      coveringIndexesOnTable(tableName)
         .collectFirst {
           case index: FlintSparkCoveringIndex
               if index.filterCondition.isEmpty &&
@@ -54,7 +53,7 @@ class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan]
         .getOrElse(relation) // If no index found, return the original relation
   }
 
-  private def allRequiredColumnsInQueryPlan(plan: LogicalPlan): Set[String] = {
+  private def requiredColumnsInQueryPlan(plan: LogicalPlan): Set[String] = {
     // Collect all columns needed by the query, except those in relation. This is because this rule
     // executes before push down optimization and thus relation includes all columns in the table.
     plan
@@ -66,7 +65,7 @@ class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan]
       .toSet
   }
 
-  private def allCoveringIndexesOnTable(tableName: String): Seq[FlintSparkIndex] = {
+  private def coveringIndexesOnTable(tableName: String): Seq[FlintSparkIndex] = {
     val qualifiedTableName = qualifyTableName(flint.spark, tableName)
     val indexPattern = getFlintIndexName("*", qualifiedTableName)
     flint.describeIndexes(indexPattern)
@@ -75,13 +74,13 @@ class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan]
   private def replaceTableRelationWithIndexRelation(
       relation: LogicalRelation,
       index: FlintSparkCoveringIndex): LogicalPlan = {
+    // Replace with a Flint data source relation so the rewritten plan doesn't require the OpenSearch index in the catalog
     val ds = new FlintDataSourceV2
     val options = new CaseInsensitiveStringMap(util.Map.of("path", index.name()))
     val inferredSchema = ds.inferSchema(options)
     val flintTable = ds.getTable(inferredSchema, Array.empty, options)
 
-    // Adjust attributes to match the original plan's output
-    // TODO: replace original source column type with filed type in index metadata?
+    // Keep only the original output attributes that are covered by the index
     val outputAttributes = index.indexedColumns.keys
       .map(colName => relation.output.find(_.name == colName).get)
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
index d9abc0b57..bb8d408a0 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
@@ -14,6 +14,7 @@ import org.scalatest.matchers.must.Matchers.defined
 import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
 
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.flint.config.FlintSparkConf.OPTIMIZER_RULE_COVERING_INDEX_ENABLED
 
 class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
 
@@ -172,8 +173,39 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
       .addIndexColumns("name", "age")
       .create()
 
-    flint.refreshIndex(testFlintIndex)
+    checkKeywordsExist(sql(s"EXPLAIN SELECT name, age FROM $testTable"), "FlintScan")
+  }
+
+  test("should not rewrite with covering index if disabled") {
+    flint
+      .coveringIndex()
+      .name(testIndex)
+      .onTable(testTable)
+      .addIndexColumns("name", "age")
+      .create()
+
+    spark.conf.set(OPTIMIZER_RULE_COVERING_INDEX_ENABLED.key, "false")
+    try {
+      checkKeywordsNotExist(sql(s"EXPLAIN SELECT name, age FROM $testTable"), "FlintScan")
+    } finally {
+      spark.conf.set(OPTIMIZER_RULE_COVERING_INDEX_ENABLED.key, "true")
+    }
+  }
+
+  test("rewrite applicable query with covering index before skipping index") {
+    flint
+      .skippingIndex()
+      .onTable(testTable)
+      .addValueSet("name")
+      .addMinMax("age")
+      .create()
+    flint
+      .coveringIndex()
+      .name(testIndex)
+      .onTable(testTable)
+      .addIndexColumns("name", "age")
+      .create()
 
-    sql(s"SELECT name, age FROM $testTable").show
+    checkKeywordsExist(sql(s"EXPLAIN SELECT name, age FROM $testTable"), "FlintScan")
   }
 }
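
Reviewer note on rule ordering: in the new FlintSparkOptimizer.apply, the covering index rule runs before the skipping index rule, so a relation that has already been replaced by a covering index scan is never touched by the skipping index rewrite. A minimal, self-contained sketch of that interaction follows; Plan, RewriteRule, and the case objects are illustrative stand-ins, not the real Catalyst or Flint types.

```scala
// Sketch of the rule ordering introduced in FlintSparkOptimizer: covering index
// first, then skipping index. All types here are made up for illustration.
object RuleOrderingSketch extends App {
  sealed trait Plan
  case object TableScan extends Plan          // original file source relation
  case object CoveringIndexScan extends Plan  // relation replaced by covering index
  case object SkippingIndexScan extends Plan  // file source pruned by skipping index

  type RewriteRule = Plan => Plan

  // Each rule fires only on an untouched table scan, mirroring the real rules' pattern match
  val coveringIndexRule: RewriteRule = {
    case TableScan => CoveringIndexScan
    case other => other
  }
  val skippingIndexRule: RewriteRule = {
    case TableScan => SkippingIndexScan
    case other => other
  }

  // Equivalent to skippingIndexRule.apply(coveringIndexRule.apply(plan)) in the patch
  val optimize: RewriteRule = coveringIndexRule.andThen(skippingIndexRule)

  // If the covering index fires, the skipping index rule becomes a no-op
  assert(optimize(TableScan) == CoveringIndexScan)
}
```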
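
Prerequisite 3.2 ("covers all columns present in the query") reduces to a subset test between the columns the query requires and the columns the index was created on, which is what the collectFirst guard checks. A toy illustration with made-up column names:

```scala
object CoveringIndexApplicabilitySketch extends App {
  // Columns a hypothetical covering index was created on
  val indexedColumns = Set("name", "age")

  // The index applies only if it covers every column the query needs
  def indexApplies(requiredCols: Set[String]): Boolean =
    requiredCols.subsetOf(indexedColumns)

  assert(indexApplies(Set("name", "age")))     // SELECT name, age ... -> rewrite
  assert(!indexApplies(Set("name", "salary"))) // salary not indexed   -> keep original relation
}
```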
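
The new "should not rewrite with covering index if disabled" test restores the flag in a finally block. If more toggle-driven tests are added, that pattern could be factored into a small helper along these lines; this is a sketch, not part of the patch (SparkConfTestHelpers and withConfValue are hypothetical names), using only standard RuntimeConfig calls.

```scala
import org.apache.spark.sql.SparkSession

object SparkConfTestHelpers {

  /** Runs `body` with `key` temporarily set to `value`, restoring the prior state afterwards. */
  def withConfValue[T](spark: SparkSession, key: String, value: String)(body: => T): T = {
    val original = spark.conf.getOption(key) // remember the current setting, if any
    spark.conf.set(key, value)
    try body
    finally original match {
      case Some(v) => spark.conf.set(key, v) // restore the previous value
      case None => spark.conf.unset(key)     // or clear it if it was never set
    }
  }
}
```

The disabled-rule test body would then shrink to withConfValue(spark, OPTIMIZER_RULE_COVERING_INDEX_ENABLED.key, "false") { checkKeywordsNotExist(sql(s"EXPLAIN SELECT name, age FROM $testTable"), "FlintScan") }, with restoration guaranteed even if the assertion throws.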