From 967217f24fcec2316684a1a7c3a72cd7f5d61cd0 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Fri, 26 Apr 2024 09:10:57 -0700
Subject: [PATCH] Move IT from Flint API suite to SQL suite

Signed-off-by: Chen Dai
---
 .../ApplyFlintSparkCoveringIndex.scala        | 24 +++---
 .../FlintSparkCoveringIndexITSuite.scala      | 45 ----------
 .../FlintSparkCoveringIndexSqlITSuite.scala   | 85 ++++++++++++++-----
 3 files changed, 75 insertions(+), 79 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndex.scala
index b3ced7b53..a840755be 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndex.scala
@@ -19,8 +19,8 @@ import org.apache.spark.sql.flint.{qualifyTableName, FlintDataSourceV2}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 /**
- * Flint Spark covering index apply rule that rewrites applicable query's table scan operator to
- * accelerate query by reducing data scanned significantly.
+ * Flint Spark covering index apply rule that replaces applicable query's table scan operator to
+ * accelerate query by scanning covering index data.
  *
  * @param flint
  *   Flint Spark API
@@ -30,11 +30,10 @@ class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan]
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case relation @ LogicalRelation(_, _, Some(table), false)
         if !plan.isInstanceOf[V2WriteCommand] => // Not an insert statement
-      val tableName = table.qualifiedName
       val relationCols = collectRelationColumnsInQueryPlan(relation, plan)
 
       // Choose the first covering index that meets all criteria above
-      findAllCoveringIndexesOnTable(tableName)
+      findAllCoveringIndexesOnTable(table.qualifiedName)
         .collectFirst {
           case index: FlintSparkCoveringIndex if isCoveringIndexApplicable(index, relationCols) =>
             replaceTableRelationWithIndexRelation(index, relation)
@@ -45,9 +44,11 @@ class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan]
   private def collectRelationColumnsInQueryPlan(
       relation: LogicalRelation,
       plan: LogicalPlan): Set[String] = {
-    // Collect all columns of the relation present in the query plan, except those in relation itself.
-    // Because this rule executes before push down optimization and thus relation includes all columns in the table.
-    val relationColById = relation.output.map(attr => (attr.exprId, attr)).toMap
+    /*
+     * Collect all columns of the relation present in query plan, except those in relation itself.
+     * Because this rule executes before push down optimization, relation includes all columns.
+     */
+    val relationColsById = relation.output.map(attr => (attr.exprId, attr)).toMap
     plan
       .collect {
         case _: LogicalRelation => Set.empty
@@ -55,7 +56,7 @@ class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan]
           other.expressions
             .flatMap(_.references)
             .flatMap(ref =>
-              relationColById.get(ref.exprId)) // Ignore attribute not belong to relation
+              relationColsById.get(ref.exprId)) // Ignore attributes that don't belong to the target relation
             .map(attr => attr.name)
       }
       .flatten
@@ -79,15 +80,14 @@ class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan]
   private def replaceTableRelationWithIndexRelation(
       index: FlintSparkCoveringIndex,
       relation: LogicalRelation): LogicalPlan = {
-    // Replace with data source relation so as to avoid OpenSearch index required in catalog
+    // Make use of data source relation to avoid Spark looking for the OpenSearch index in the catalog
     val ds = new FlintDataSourceV2
     val options = new CaseInsensitiveStringMap(util.Map.of("path", index.name()))
     val inferredSchema = ds.inferSchema(options)
     val flintTable = ds.getTable(inferredSchema, Array.empty, options)
 
-    // Keep original output attributes only if available in covering index.
-    // We have to reuse original attribute object because it's already analyzed
-    // with exprId referenced by the other parts of the query plan.
+    // Reuse original attribute object because it's already analyzed with exprId referenced
+    // by the other parts of the query plan.
     val allRelationCols = relation.output.map(attr => (attr.name, attr)).toMap
     val outputAttributes =
       flintTable
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
index 2d73807b5..e5aa7b4d1 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
@@ -14,7 +14,6 @@ import org.scalatest.matchers.must.Matchers.defined
 import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
 
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.flint.config.FlintSparkConf.OPTIMIZER_RULE_COVERING_INDEX_ENABLED
 
 class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
 
@@ -164,48 +163,4 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
       .create()
     deleteTestIndex(getFlintIndexName(newIndex, testTable))
   }
-
-  test("rewrite applicable query with covering index") {
-    flint
-      .coveringIndex()
-      .name(testIndex)
-      .onTable(testTable)
-      .addIndexColumns("name", "age")
-      .create()
-
-    checkKeywordsExist(sql(s"EXPLAIN SELECT name, age FROM $testTable"), "FlintScan")
-  }
-
-  test("should not rewrite with covering index if disabled") {
-    flint
-      .coveringIndex()
-      .name(testIndex)
-      .onTable(testTable)
-      .addIndexColumns("name", "age")
-      .create()
-
-    spark.conf.set(OPTIMIZER_RULE_COVERING_INDEX_ENABLED.key, "false")
-    try {
-      checkKeywordsNotExist(sql(s"EXPLAIN SELECT name, age FROM $testTable"), "FlintScan")
-    } finally {
-      spark.conf.set(OPTIMIZER_RULE_COVERING_INDEX_ENABLED.key, "true")
-    }
-  }
-
-  test("rewrite applicable query with covering index before skipping index") {
-    flint
-      .skippingIndex()
-      .onTable(testTable)
-      .addValueSet("name")
-      .addMinMax("age")
-      .create()
-    flint
-      .coveringIndex()
-      .name(testIndex)
-      .onTable(testTable)
-      .addIndexColumns("name", "age")
-      .create()
-
-    checkKeywordsExist(sql(s"EXPLAIN SELECT name, age FROM $testTable"), "FlintScan")
$testTable"), "FlintScan") - } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala index dd15624cf..432de1b12 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala @@ -19,7 +19,7 @@ import org.scalatest.matchers.must.Matchers.defined import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the} import org.apache.spark.sql.Row -import org.apache.spark.sql.flint.config.FlintSparkConf.CHECKPOINT_MANDATORY +import org.apache.spark.sql.flint.config.FlintSparkConf.{CHECKPOINT_MANDATORY, OPTIMIZER_RULE_COVERING_INDEX_ENABLED} class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { @@ -43,35 +43,24 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { } test("create covering index with auto refresh") { - sql(s""" - | CREATE INDEX $testIndex ON $testTable - | (name, age) - | WITH (auto_refresh = true) - |""".stripMargin) - - // Wait for streaming job complete current micro batch - val job = spark.streams.active.find(_.name == testFlintIndex) - job shouldBe defined - failAfter(streamingTimeout) { - job.get.processAllAvailable() - } + awaitRefreshComplete(s""" + | CREATE INDEX $testIndex ON $testTable + | (name, age) + | WITH (auto_refresh = true) + | """.stripMargin) val indexData = flint.queryIndex(testFlintIndex) indexData.count() shouldBe 2 } test("create covering index with filtering condition") { - sql(s""" + awaitRefreshComplete(s""" | CREATE INDEX $testIndex ON $testTable | (name, age) | WHERE address = 'Portland' | WITH (auto_refresh = true) |""".stripMargin) - // Wait for streaming job complete current micro batch - val job = spark.streams.active.find(_.name == testFlintIndex) - awaitStreamingComplete(job.get.id.toString) - val indexData = flint.queryIndex(testFlintIndex) indexData.count() shouldBe 1 } @@ -256,6 +245,53 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { metadata.indexedColumns.map(_.asScala("columnName")) shouldBe Seq("name", "age") } + test("rewrite applicable query with covering index") { + awaitRefreshComplete(s""" + | CREATE INDEX $testIndex ON $testTable + | (name, age) + | WITH (auto_refresh = true) + | """.stripMargin) + + val query = s"SELECT name, age FROM $testTable" + checkKeywordsExist(sql(s"EXPLAIN $query"), "FlintScan") + checkAnswer(sql(query), Seq(Row("Hello", 30), Row("World", 25))) + } + + test("should not rewrite with covering index if disabled") { + awaitRefreshComplete(s""" + | CREATE INDEX $testIndex ON $testTable + | (name, age) + | WITH (auto_refresh = true) + |""".stripMargin) + + spark.conf.set(OPTIMIZER_RULE_COVERING_INDEX_ENABLED.key, "false") + try { + checkKeywordsNotExist(sql(s"EXPLAIN SELECT name, age FROM $testTable"), "FlintScan") + } finally { + spark.conf.set(OPTIMIZER_RULE_COVERING_INDEX_ENABLED.key, "true") + } + } + + test("rewrite applicable query with covering index before skipping index") { + try { + sql(s""" + | CREATE SKIPPING INDEX ON $testTable + | (age MIN_MAX) + | """.stripMargin) + awaitRefreshComplete(s""" + | CREATE INDEX $testIndex ON $testTable + | (name, age) + | WITH (auto_refresh = true) + | """.stripMargin) + + val query = s"SELECT name FROM $testTable WHERE age = 30" + checkKeywordsExist(sql(s"EXPLAIN $query"), "FlintScan") + checkAnswer(sql(query), 
Row("Hello")) + } finally { + deleteTestIndex(getSkippingIndexName(testTable)) + } + } + test("show all covering index on the source table") { flint .coveringIndex() @@ -308,14 +344,11 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { flint.describeIndex(testFlintIndex) shouldBe defined flint.queryIndex(testFlintIndex).count() shouldBe 0 - sql(s""" + awaitRefreshComplete(s""" | ALTER INDEX $testIndex ON $testTable | WITH (auto_refresh = true) | """.stripMargin) - // Wait for streaming job complete current micro batch - val job = spark.streams.active.find(_.name == testFlintIndex) - awaitStreamingComplete(job.get.id.toString) flint.queryIndex(testFlintIndex).count() shouldBe 2 } @@ -331,4 +364,12 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { sql(s"VACUUM INDEX $testIndex ON $testTable") flint.describeIndex(testFlintIndex) shouldBe empty } + + private def awaitRefreshComplete(query: String): Unit = { + sql(query) + + // Wait for streaming job complete current micro batch + val job = spark.streams.active.find(_.name == testFlintIndex) + awaitStreamingComplete(job.get.id.toString) + } }