Add BloomFilter skipping index SQL support (#283)
* Add bloom filter in grammar and IT

Signed-off-by: Chen Dai <[email protected]>

* Update user manual

Signed-off-by: Chen Dai <[email protected]>

* Add more IT

Signed-off-by: Chen Dai <[email protected]>

* Update doc

Signed-off-by: Chen Dai <[email protected]>

---------

Signed-off-by: Chen Dai <[email protected]>
dai-chen authored Mar 18, 2024
1 parent 7382c95 commit 5a2df20
Showing 5 changed files with 80 additions and 5 deletions.
15 changes: 14 additions & 1 deletion docs/index.md
@@ -153,7 +153,20 @@ High level API is dependent on query engine implementation. Please see Query Eng

#### Skipping Index

The default maximum size for the value set is 100. In cases where a file contains columns with high cardinality values, the value set will become null. This is the trade-off that prevents excessive memory consumption at the cost of not skipping the file.
The parameters for each skipping algorithm are explained below. Default values are shown in each parameter signature:

- **VALUE_SET(limit=100):** If a file's column values have a cardinality higher than the limit (optional, default 100), the value set becomes null. This trade-off prevents excessive memory consumption at the expense of not skipping the file.

- **BLOOM_FILTER** (complete statements are sketched after this list)
  - **BLOOM_FILTER(num_candidate=10, fpp=0.03):** By default, the adaptive BloomFilter algorithm is used. Users can configure:
    1. The number of candidate filters (optional). Candidates start at an expected 1024 distinct items and double from there.
    2. The false positive probability of each candidate (optional).
    3. Examples: `BLOOM_FILTER`, `BLOOM_FILTER(20)`, `BLOOM_FILTER(20, 0.01)`

  - **BLOOM_FILTER(false, num_items=10000, fpp=0.03):** Setting the first parameter to `false` reverts to the non-adaptive (classic) algorithm. Users can configure:
    1. The expected number of distinct values (optional).
    2. The false positive probability (optional).
    3. Examples: `BLOOM_FILTER(false)`, `BLOOM_FILTER(false, 1000000)`, `BLOOM_FILTER(false, 1000000, 0.01)`
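
The statements below sketch how these options look in full; the `alb_logs` table and `client_ip` column are hypothetical, and the parameter values are illustrative only:

```sql
-- Adaptive BloomFilter: 20 candidates, 1% false positive probability per candidate
CREATE SKIPPING INDEX ON alb_logs
(
  client_ip BLOOM_FILTER(20, 0.01)
)
WITH (auto_refresh = true);

-- Non-adaptive (classic) BloomFilter sized for 1,000,000 expected distinct values
CREATE SKIPPING INDEX ON alb_logs
(
  client_ip BLOOM_FILTER(false, 1000000, 0.01)
)
WITH (auto_refresh = true);
```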

```sql
CREATE SKIPPING INDEX [IF NOT EXISTS]
  ...
```
@@ -183,7 +183,7 @@ indexColTypeList
;

indexColType
: identifier skipType=(PARTITION | VALUE_SET | MIN_MAX)
: identifier skipType=(PARTITION | VALUE_SET | MIN_MAX | BLOOM_FILTER)
(LEFT_PAREN skipParams RIGHT_PAREN)?
;
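
For illustration, each clause below is one `indexColType` that the extended rule accepts; column names are hypothetical, and the parenthesized `skipParams` remain optional:

```sql
year    PARTITION
name    VALUE_SET(100)
age     MIN_MAX
address BLOOM_FILTER(false, 1000000, 0.01)
```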

1 change: 1 addition & 0 deletions flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
@@ -139,6 +139,7 @@ nonReserved

// Flint lexical tokens

BLOOM_FILTER: 'BLOOM_FILTER';
MIN_MAX: 'MIN_MAX';
SKIPPING: 'SKIPPING';
VALUE_SET: 'VALUE_SET';
@@ -8,10 +8,11 @@ package org.opensearch.flint.spark.sql.skipping
import scala.collection.JavaConverters.collectionAsScalaIterableConverter

import org.antlr.v4.runtime.tree.RuleNode
import org.opensearch.flint.core.field.bloomfilter.BloomFilterFactory._
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, PARTITION, VALUE_SET}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{BLOOM_FILTER, MIN_MAX, PARTITION, VALUE_SET}
import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.VALUE_SET_MAX_SIZE_KEY
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText}
@@ -52,6 +53,19 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
val valueSetParams = (Seq(VALUE_SET_MAX_SIZE_KEY) zip skipParams).toMap
indexBuilder.addValueSet(colName, valueSetParams)
case MIN_MAX => indexBuilder.addMinMax(colName)
case BLOOM_FILTER =>
// Determine from the first parameter whether the given parameters are for the adaptive algorithm
val bloomFilterParamKeys =
if (skipParams.headOption.exists(_.equalsIgnoreCase("false"))) {
Seq(
BLOOM_FILTER_ADAPTIVE_KEY,
CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY,
CLASSIC_BLOOM_FILTER_FPP_KEY)
} else {
Seq(ADAPTIVE_NUMBER_CANDIDATE_KEY, CLASSIC_BLOOM_FILTER_FPP_KEY)
}
val bloomFilterParams = (bloomFilterParamKeys zip skipParams).toMap
indexBuilder.addBloomFilter(colName, bloomFilterParams)
}
}
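
A sketch of the positional mapping this visitor produces, assuming the key constants imported from `BloomFilterFactory` resolve to the JSON field names asserted in the integration test further down; table and column names are hypothetical:

```sql
-- Adaptive branch: parameters map to num_candidates = 20, fpp = 0.01
CREATE SKIPPING INDEX ON alb_logs ( client_ip BLOOM_FILTER(20, 0.01) );

-- Non-adaptive branch: the leading 'false' maps to adaptive = false,
-- then num_items = 100000 and fpp = 0.001
CREATE SKIPPING INDEX ON alb_logs ( client_ip BLOOM_FILTER(false, 100000, 0.001) );
```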

@@ -8,8 +8,9 @@ package org.opensearch.flint.spark
import scala.Option.empty
import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConverter}

import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson
import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.JsonMethods.parse
import org.json4s.native.JsonMethods.{compact, parse, render}
import org.json4s.native.Serialization
import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.storage.FlintOpenSearchClient
@@ -46,7 +47,8 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
| (
| year PARTITION,
| name VALUE_SET,
| age MIN_MAX
| age MIN_MAX,
| address BLOOM_FILTER
| )
| WITH (auto_refresh = true)
| """.stripMargin)
@@ -80,6 +82,51 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
))
}

test("create skipping index with non-adaptive bloom filter") {
sql(s"""
| CREATE SKIPPING INDEX ON $testTable
| (
| address BLOOM_FILTER(false, 1024, 0.01)
| )
| WITH (auto_refresh = true)
| """.stripMargin)

val job = spark.streams.active.find(_.name == testIndex)
awaitStreamingComplete(job.get.id.toString)

flint.queryIndex(testIndex).count() shouldBe 2

checkAnswer(sql(s"SELECT name FROM $testTable WHERE address = 'Vancouver'"), Row("Test"))
sql(s"SELECT name FROM $testTable WHERE address = 'San Francisco'").count() shouldBe 0
}

Seq(
(
s"CREATE SKIPPING INDEX ON $testTable (address BLOOM_FILTER(20, 0.01))",
"""
|{
| "adaptive": "true",
| "num_candidates": "20",
| "fpp": "0.01"
|}
|""".stripMargin),
(
s"CREATE SKIPPING INDEX ON $testTable (address BLOOM_FILTER(false, 100000, 0.001))",
"""
|{
| "adaptive": "false",
| "num_items": "100000",
| "fpp": "0.001"
|}
|""".stripMargin)).foreach { case (query, expectedParamJson) =>
test(s"create skipping index with bloom filter parameters $expectedParamJson") {
sql(query)
val metadata = flint.describeIndex(testIndex).get.metadata().getContent
val parameters = compact(render(parse(metadata) \\ "parameters"))
parameters should matchJson(expectedParamJson)
}
}

test("create skipping index with streaming job options") {
withTempDir { checkpointDir =>
sql(s"""
