From e61f3dd854c6852852677d9b95e6b0fcab70233e Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 9 Nov 2023 16:08:38 -0800 Subject: [PATCH] Fix hybrid scan bug Signed-off-by: Chen Dai --- .../FlintSparkSkippingFileIndex.scala | 5 +- .../FlintSparkSkippingIndexITSuite.scala | 16 +++- .../FlintSparkSkippingIndexSqlITSuite.scala | 92 ++++++++++++++++--- 3 files changed, 94 insertions(+), 19 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala index 0fd7b9484..b53324f4c 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingFileIndex.scala @@ -12,7 +12,7 @@ import org.apache.spark.sql.{Column, DataFrame} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory} import org.apache.spark.sql.flint.config.FlintSparkConf -import org.apache.spark.sql.functions.isnull +import org.apache.spark.sql.functions.not import org.apache.spark.sql.types.StructType /** @@ -76,8 +76,7 @@ case class FlintSparkSkippingFileIndex( partitions .flatMap(_.files.map(f => f.getPath.toUri.toString)) .toDF(FILE_PATH_COLUMN) - .join(indexScan, Seq(FILE_PATH_COLUMN), "left") - .filter(isnull(indexScan(FILE_PATH_COLUMN)) || new Column(indexFilter)) + .join(indexScan.filter(not(new Column(indexFilter))), Seq(FILE_PATH_COLUMN), "left_anti") .select(FILE_PATH_COLUMN) .collect() .map(_.getString(0)) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index 90e7caecb..ac258f046 100644 --- 
a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -251,6 +251,20 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { } } + test("should not rewrite original query if no where clause") { + val query = + s""" + | SELECT name + | FROM $testTable + |""".stripMargin + + val actual = sql(query).queryExecution.optimizedPlan + withFlintOptimizerDisabled { + val expect = sql(query).queryExecution.optimizedPlan + actual shouldBe expect + } + } + test("can build partition skipping index and rewrite applicable query") { flint .skippingIndex() @@ -394,7 +408,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { } } - test("should rewrite applicable query to scan latest source files if partial index") { + test("should rewrite applicable query to scan unknown source files if partial index") { flint .skippingIndex() .onTable(testTable) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala index 8975b7072..410def4bc 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala @@ -14,10 +14,11 @@ import org.json4s.native.Serialization import org.opensearch.flint.core.FlintOptions import org.opensearch.flint.core.storage.FlintOpenSearchClient import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName -import org.scalatest.matchers.must.Matchers.defined +import org.scalatest.matchers.must.Matchers.{defined, include} import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the} import org.apache.spark.sql.Row +import org.apache.spark.sql.execution.SimpleMode import 
org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE import org.apache.spark.sql.flint.config.FlintSparkConf.CHECKPOINT_MANDATORY @@ -55,21 +56,82 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { indexData.count() shouldBe 2 } - test("create skipping index with filtering condition") { - sql(s""" - | CREATE SKIPPING INDEX ON $testTable - | ( - | year PARTITION, - | name VALUE_SET, - | age MIN_MAX - | ) - | WHERE address = 'Portland' - | WITH (auto_refresh = true) - | """.stripMargin) + test("create skipping index with auto refresh and filtering condition") { + val testTimeSeriesTable = "spark_catalog.default.partial_skipping_sql_test" + val testFlintTimeSeriesTable = getSkippingIndexName(testTimeSeriesTable) + + withTable(testTimeSeriesTable) { + createTimeSeriesTable(testTimeSeriesTable) + sql(s""" CREATE SKIPPING INDEX ON $testTimeSeriesTable + | ( address VALUE_SET ) + | WHERE time >= '2023-10-01 01:00:00' + | WITH (auto_refresh = true)""".stripMargin) + flint.describeIndex(testFlintTimeSeriesTable) shouldBe defined + + // Only 2 rows indexed + val indexData = awaitStreamingDataComplete(testFlintTimeSeriesTable) + indexData.count() shouldBe 2 + + // Query without filter condition + sql(s"SELECT * FROM $testTimeSeriesTable").count shouldBe 5 + + // File indexed should be included + var query = sql(s""" SELECT name FROM $testTimeSeriesTable + | WHERE time > '2023-10-01 00:05:00' + | AND address = 'Portland' """.stripMargin) + query.queryExecution.explainString(SimpleMode) should include("FlintSparkSkippingFileIndex") + checkAnswer(query, Seq(Row("C"), Row("D"))) + + // File not indexed should be included too + query = sql(s""" SELECT name FROM $testTimeSeriesTable + | WHERE time > '2023-10-01 00:05:00' + | AND address = 'Seattle' """.stripMargin) + query.queryExecution.explainString(SimpleMode) should include("FlintSparkSkippingFileIndex") + checkAnswer(query, Seq(Row("B"))) + + flint.deleteIndex(testFlintTimeSeriesTable) + } + } - val 
indexData = awaitStreamingDataComplete(testIndex) - flint.describeIndex(testIndex) shouldBe defined - indexData.count() shouldBe 1 + test("create skipping index with manual refresh and filtering condition") { + val testTimeSeriesTable = "spark_catalog.default.partial_skipping_sql_test" + val testFlintTimeSeriesTable = getSkippingIndexName(testTimeSeriesTable) + + withTable(testTimeSeriesTable) { + createTimeSeriesTable(testTimeSeriesTable) + sql(s""" CREATE SKIPPING INDEX ON $testTimeSeriesTable + | ( address VALUE_SET ) + | WHERE time >= '2023-10-01 01:00:00' AND age = 15 + | """.stripMargin) + sql(s"REFRESH SKIPPING INDEX ON $testTimeSeriesTable") + + // Only 1 row indexed + flint.describeIndex(testFlintTimeSeriesTable) shouldBe defined + val indexData = flint.queryIndex(testFlintTimeSeriesTable) + indexData.count() shouldBe 1 + + // File not indexed should be included too + sql(s"SELECT * FROM $testTimeSeriesTable").count shouldBe 5 + var query = sql(s""" SELECT name FROM $testTimeSeriesTable + | WHERE time > '2023-10-01 00:05:00' + | AND address = 'Portland' """.stripMargin) + query.queryExecution.explainString(SimpleMode) should include("FlintSparkSkippingFileIndex") + checkAnswer(query, Seq(Row("C"), Row("D"))) + + // Generate new data + sql(s""" INSERT INTO $testTimeSeriesTable VALUES + | (TIMESTAMP '2023-10-01 04:00:00', 'F', 30, 'Vancouver')""".stripMargin) + + // Latest file should be included too without refresh + sql(s"SELECT * FROM $testTimeSeriesTable").count shouldBe 6 + query = sql(s""" SELECT name FROM $testTimeSeriesTable + | WHERE time > '2023-10-01 00:05:00' + | AND address = 'Vancouver' """.stripMargin) + query.queryExecution.explainString(SimpleMode) should include("FlintSparkSkippingFileIndex") + checkAnswer(query, Seq(Row("E"), Row("F"))) + + flint.deleteIndex(testFlintTimeSeriesTable) + } } test("create skipping index with streaming job options") {