From 68b2d9711bdd09b7f239bb3dd8e98489b495a4c8 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 28 Dec 2023 11:40:04 -0800 Subject: [PATCH] Refactor IT and add user manual Signed-off-by: Chen Dai --- docs/index.md | 2 + .../FlintSparkSkippingIndexITSuite.scala | 69 +++++++++---------- .../flint/spark/FlintSparkSuite.scala | 40 +++++++++++ 3 files changed, 73 insertions(+), 38 deletions(-) diff --git a/docs/index.md b/docs/index.md index ea6778f39..cc9110fab 100644 --- a/docs/index.md +++ b/docs/index.md @@ -118,6 +118,8 @@ High level API is dependent on query engine implementation. Please see Query Eng #### Skipping Index +The default maximum size for the value set is 100. If a file contains more distinct values for an indexed column than this limit (i.e. the column has high cardinality), the value set for that file becomes null. This trade-off prevents excessive memory consumption, at the cost of that file no longer being skipped by the index. + ```sql CREATE SKIPPING INDEX [IF NOT EXISTS] ON diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index b46b368c9..9cb4affec 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -32,7 +32,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { override def beforeAll(): Unit = { super.beforeAll() - createPartitionedTable(testTable) + createPartitionedMultiRowTable(testTable) } override def afterEach(): Unit = { @@ -256,69 +256,56 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { .create() flint.refreshIndex(testIndex, FULL) + // Assert index data + checkAnswer( + flint.queryIndex(testIndex).select("year", "month"), + Seq(Row(2023, 4), Row(2023, 5))) + + // Assert query rewrite val query = sql(s""" | SELECT name | FROM $testTable | WHERE year = 2023 AND month = 4
|""".stripMargin) - checkAnswer(query, Row("Hello")) + checkAnswer(query, Seq(Row("Hello"), Row("World"))) query.queryExecution.executedPlan should useFlintSparkSkippingFileIndex(hasIndexFilter(col("year") === 2023 && col("month") === 4)) } - test("can build value set skipping index up to limit and rewrite applicable query") { - val testTable2 = "spark_catalog.default.value_set_test" - val testIndex2 = getSkippingIndexName(testTable2) + test("can build value set skipping index and rewrite applicable query") { val defaultLimit = ValueSetSkippingStrategy.DEFAULT_VALUE_SET_SIZE_LIMIT try { ValueSetSkippingStrategy.DEFAULT_VALUE_SET_SIZE_LIMIT = 2 - createPartitionedTable(testTable2) - - // Use hint to insert all rows in a single csv file - sql(s""" - | INSERT OVERWRITE $testTable2 - | PARTITION (year=2023, month=4) - | SELECT /*+ COALESCE(1) */ * - | FROM VALUES - | ('Hello', 20, 'Seattle'), - | ('World', 30, 'Portland') - |""".stripMargin) - sql(s""" - | INSERT OVERWRITE $testTable2 - | PARTITION (year=2023, month=5) - | SELECT /*+ COALESCE(1) */ * - | FROM VALUES - | ('Hello', 30, 'Seattle'), - | ('World', 50, 'Portland'), - | ('Test', 60, 'Vancouver') - |""".stripMargin) - - // Build value set with maximum size 2 flint .skippingIndex() - .onTable(testTable2) + .onTable(testTable) .addValueSet("address") .create() - flint.refreshIndex(testIndex2, FULL) + flint.refreshIndex(testIndex, FULL) + + // Assert index data checkAnswer( - flint.queryIndex(testIndex2).select("address"), - Seq(Row("""["Seattle","Portland"]"""), Row(null))) + flint.queryIndex(testIndex).select("address"), + Seq( + Row("""["Seattle","Portland"]"""), + Row(null) // Value set exceeded limit size is expected to be null + )) - // Rewrite query and work with value set (maybe null) + // Assert query rewrite that works with value set maybe null val query = sql(s""" | SELECT age - | FROM $testTable2 + | FROM $testTable | WHERE address = 'Portland' |""".stripMargin) + query.queryExecution.executedPlan 
should useFlintSparkSkippingFileIndex( hasIndexFilter(isnull(col("address")) || col("address") === "Portland")) checkAnswer(query, Seq(Row(30), Row(50))) + } finally { ValueSetSkippingStrategy.DEFAULT_VALUE_SET_SIZE_LIMIT = defaultLimit - flint.deleteIndex(testIndex2) - sql(s"DROP TABLE $testTable2") } } @@ -330,16 +317,22 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { .create() flint.refreshIndex(testIndex, FULL) + // Assert index data + checkAnswer( + flint.queryIndex(testIndex).select("MinMax_age_0", "MinMax_age_1"), + Seq(Row(20, 30), Row(40, 60))) + + // Assert query rewrite val query = sql(s""" | SELECT name | FROM $testTable - | WHERE age = 25 + | WHERE age = 30 |""".stripMargin) checkAnswer(query, Row("World")) query.queryExecution.executedPlan should useFlintSparkSkippingFileIndex( - hasIndexFilter(col("MinMax_age_0") <= 25 && col("MinMax_age_1") >= 25)) + hasIndexFilter(col("MinMax_age_0") <= 30 && col("MinMax_age_1") >= 30)) } test("should rewrite applicable query with table name without database specified") { @@ -411,7 +404,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { | WHERE month = 4 |""".stripMargin) - checkAnswer(query, Seq(Row("Seattle"), Row("Vancouver"))) + checkAnswer(query, Seq(Row("Seattle"), Row("Portland"), Row("Vancouver"))) } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala index 29b8b95a6..211ddb57b 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala @@ -85,6 +85,46 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit | """.stripMargin) } + protected def createPartitionedMultiRowTable(testTable: String): Unit = { + sql(s""" + | CREATE TABLE $testTable + | ( + | name STRING, + | age INT, + | address STRING + | ) + | USING CSV + | OPTIONS 
( + | header 'false', + | delimiter '\t' + | ) + | PARTITIONED BY ( + | year INT, + | month INT + | ) + |""".stripMargin) + + // Use hint to insert all rows in a single csv file + sql(s""" + | INSERT INTO $testTable + | PARTITION (year=2023, month=4) + | SELECT /*+ COALESCE(1) */ * + | FROM VALUES + | ('Hello', 20, 'Seattle'), + | ('World', 30, 'Portland') + |""".stripMargin) + + sql(s""" + | INSERT INTO $testTable + | PARTITION (year=2023, month=5) + | SELECT /*+ COALESCE(1) */ * + | FROM VALUES + | ('Scala', 40, 'Seattle'), + | ('Java', 50, 'Portland'), + | ('Test', 60, 'Vancouver') + |""".stripMargin) + } + protected def createTimeSeriesTable(testTable: String): Unit = { sql(s""" | CREATE TABLE $testTable