Commit 68b2d97

Refactor IT and add user manual
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Dec 28, 2023
1 parent aafd0b0 commit 68b2d97
Showing 3 changed files with 73 additions and 38 deletions.
2 changes: 2 additions & 0 deletions docs/index.md
@@ -118,6 +118,8 @@ High level API is dependent on query engine implementation. Please see Query Eng

#### Skipping Index

+ The default maximum size of a value set is 100. If a file contains a high-cardinality column whose number of distinct values exceeds this limit, the value set for that file is stored as null. This trade-off prevents excessive memory consumption, at the cost of never skipping that file.

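To illustrate the trade-off, here is a hypothetical example (not part of this change; the `alb_logs` table and its columns are made up). A value set is collected per indexed column per source file, so a low-cardinality column such as `elb_status_code` stays under the limit and remains usable for skipping, while a high-cardinality column such as `client_ip` may overflow and fall back to null:

```sql
-- Hypothetical: elb_status_code has few distinct values per file, so its value
-- set stays under the 100-value limit and files can be skipped on it.
-- client_ip may exceed the limit in some files; those files get a null value
-- set and are always read rather than skipped.
CREATE SKIPPING INDEX ON alb_logs
(
  elb_status_code VALUE_SET,
  client_ip VALUE_SET
);
```
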
```sql
CREATE SKIPPING INDEX [IF NOT EXISTS]
ON <object>
@@ -32,7 +32,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
override def beforeAll(): Unit = {
super.beforeAll()

- createPartitionedTable(testTable)
+ createPartitionedMultiRowTable(testTable)
}

override def afterEach(): Unit = {
@@ -256,69 +256,56 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
.create()
flint.refreshIndex(testIndex, FULL)

+ // Assert index data
+ checkAnswer(
+ flint.queryIndex(testIndex).select("year", "month"),
+ Seq(Row(2023, 4), Row(2023, 5)))
+
+ // Assert query rewrite
val query = sql(s"""
| SELECT name
| FROM $testTable
| WHERE year = 2023 AND month = 4
|""".stripMargin)

- checkAnswer(query, Row("Hello"))
+ checkAnswer(query, Seq(Row("Hello"), Row("World")))
query.queryExecution.executedPlan should
useFlintSparkSkippingFileIndex(hasIndexFilter(col("year") === 2023 && col("month") === 4))
}

test("can build value set skipping index up to limit and rewrite applicable query") {
val testTable2 = "spark_catalog.default.value_set_test"
val testIndex2 = getSkippingIndexName(testTable2)
test("can build value set skipping index and rewrite applicable query") {
val defaultLimit = ValueSetSkippingStrategy.DEFAULT_VALUE_SET_SIZE_LIMIT
try {
ValueSetSkippingStrategy.DEFAULT_VALUE_SET_SIZE_LIMIT = 2
- createPartitionedTable(testTable2)
-
- // Use hint to insert all rows in a single csv file
- sql(s"""
- | INSERT OVERWRITE $testTable2
- | PARTITION (year=2023, month=4)
- | SELECT /*+ COALESCE(1) */ *
- | FROM VALUES
- | ('Hello', 20, 'Seattle'),
- | ('World', 30, 'Portland')
- |""".stripMargin)
- sql(s"""
- | INSERT OVERWRITE $testTable2
- | PARTITION (year=2023, month=5)
- | SELECT /*+ COALESCE(1) */ *
- | FROM VALUES
- | ('Hello', 30, 'Seattle'),
- | ('World', 50, 'Portland'),
- | ('Test', 60, 'Vancouver')
- |""".stripMargin)

+ // Build value set with maximum size 2
flint
.skippingIndex()
- .onTable(testTable2)
+ .onTable(testTable)
.addValueSet("address")
.create()
- flint.refreshIndex(testIndex2, FULL)
+ flint.refreshIndex(testIndex, FULL)

// Assert index data
checkAnswer(
- flint.queryIndex(testIndex2).select("address"),
- Seq(Row("""["Seattle","Portland"]"""), Row(null)))
+ flint.queryIndex(testIndex).select("address"),
+ Seq(
+ Row("""["Seattle","Portland"]"""),
+ Row(null) // Value set that exceeded the size limit is expected to be null
+ ))

- // Rewrite query and work with value set (maybe null)
+ // Assert query rewrite works with a possibly-null value set
val query = sql(s"""
| SELECT age
- | FROM $testTable2
+ | FROM $testTable
| WHERE address = 'Portland'
|""".stripMargin)

query.queryExecution.executedPlan should
useFlintSparkSkippingFileIndex(
hasIndexFilter(isnull(col("address")) || col("address") === "Portland"))
+ checkAnswer(query, Seq(Row(30), Row(50)))

} finally {
ValueSetSkippingStrategy.DEFAULT_VALUE_SET_SIZE_LIMIT = defaultLimit
- flint.deleteIndex(testIndex2)
- sql(s"DROP TABLE $testTable2")
}
}
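
A note on why the rewritten filter must tolerate null: a null value set means the set overflowed its size limit, so the file may contain any value and must not be skipped. Below is a minimal plain-Spark sketch of that "may contain" logic (illustrative only, with made-up file names; the actual Flint rewrite pushes the `col === value` comparison down to OpenSearch, where equality on an array field matches any element):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array_contains, col, isnull}

// Minimal sketch, not Flint's implementation: one skipping-index row per
// source file, holding the value set of "address" seen in that file.
object ValueSetFilterSketch extends App {
  val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
  import spark.implicits._

  val indexData = Seq(
    ("file-1.csv", Some(Seq("Seattle", "Portland"))),
    ("file-2.csv", None), // value set overflowed the limit: contents unknown
    ("file-3.csv", Some(Seq("Vancouver")))
  ).toDF("file_path", "address")

  // A file may contain 'Portland' if its set is null (unknown) or contains
  // the value; only files failing this predicate are safe to skip.
  val mayContain = isnull(col("address")) || array_contains(col("address"), "Portland")
  indexData.filter(mayContain).show() // file-1 and file-2 survive; file-3 is skipped
}
```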

@@ -330,16 +317,22 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
.create()
flint.refreshIndex(testIndex, FULL)

+ // Assert index data
+ checkAnswer(
+ flint.queryIndex(testIndex).select("MinMax_age_0", "MinMax_age_1"),
+ Seq(Row(20, 30), Row(40, 60)))
+
+ // Assert query rewrite
val query = sql(s"""
| SELECT name
| FROM $testTable
- | WHERE age = 25
+ | WHERE age = 30
|""".stripMargin)

checkAnswer(query, Row("World"))
query.queryExecution.executedPlan should
useFlintSparkSkippingFileIndex(
hasIndexFilter(col("MinMax_age_0") <= 25 && col("MinMax_age_1") >= 25))
hasIndexFilter(col("MinMax_age_0") <= 30 && col("MinMax_age_1") >= 30))
}

test("should rewrite applicable query with table name without database specified") {
@@ -411,7 +404,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
| WHERE month = 4
|""".stripMargin)

checkAnswer(query, Seq(Row("Seattle"), Row("Vancouver")))
checkAnswer(query, Seq(Row("Seattle"), Row("Portland"), Row("Vancouver")))
}
}

@@ -85,6 +85,46 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
| """.stripMargin)
}

+ protected def createPartitionedMultiRowTable(testTable: String): Unit = {
+ sql(s"""
+ | CREATE TABLE $testTable
+ | (
+ | name STRING,
+ | age INT,
+ | address STRING
+ | )
+ | USING CSV
+ | OPTIONS (
+ | header 'false',
+ | delimiter '\t'
+ | )
+ | PARTITIONED BY (
+ | year INT,
+ | month INT
+ | )
+ |""".stripMargin)
+
+ // Use hint to insert all rows in a single csv file
+ sql(s"""
+ | INSERT INTO $testTable
+ | PARTITION (year=2023, month=4)
+ | SELECT /*+ COALESCE(1) */ *
+ | FROM VALUES
+ | ('Hello', 20, 'Seattle'),
+ | ('World', 30, 'Portland')
+ |""".stripMargin)
+
+ sql(s"""
+ | INSERT INTO $testTable
+ | PARTITION (year=2023, month=5)
+ | SELECT /*+ COALESCE(1) */ *
+ | FROM VALUES
+ | ('Scala', 40, 'Seattle'),
+ | ('Java', 50, 'Portland'),
+ | ('Test', 60, 'Vancouver')
+ |""".stripMargin)
+ }
+
protected def createTimeSeriesTable(testTable: String): Unit = {
sql(s"""
| CREATE TABLE $testTable
Expand Down
