Add BloomFilter skipping index SQL support #283

Merged

15 changes: 14 additions & 1 deletion docs/index.md
@@ -153,7 +153,20 @@ High level API is dependent on query engine implementation. Please see Query Eng

#### Skipping Index

The default maximum size for the value set is 100. In cases where a file contains columns with high cardinality values, the value set will become null. This is the trade-off that prevents excessive memory consumption at the cost of not skipping the file.
Explanations of the parameters for each skipping algorithm are provided below; the default values are shown in each signature. A combined example follows the syntax block:

- **VALUE_SET(limit=100):** If the column values of a file have higher cardinality than the limit (optional, default is 100), the value set becomes null. This trade-off prevents excessive memory consumption at the expense of not skipping the file.

- **BLOOM_FILTER(num_candidate=10, fpp=0.03):** By default, the adaptive BloomFilter algorithm is used. Users can configure:
  1. The number of candidate filters (optional). Candidates start at an expected number of distinct items (NDV) of 1024 and double it for each subsequent candidate, so the default of 10 candidates covers NDVs from 1024 up to 1024 × 2⁹ = 524,288.
  2. The false positive probability of each candidate (optional).
  3. Examples: `BLOOM_FILTER`, `BLOOM_FILTER(20)`, `BLOOM_FILTER(20, 0.01)`

- **BLOOM_FILTER(false, num_items=10000, fpp=0.03):** Setting the first parameter to `false` selects the classic, non-adaptive algorithm. Users can configure:
  1. The expected number of distinct values (optional).
  2. The false positive probability (optional).
  3. Examples: `BLOOM_FILTER(false)`, `BLOOM_FILTER(false, 1000000)`, `BLOOM_FILTER(false, 1000000, 0.01)`

```sql
CREATE SKIPPING INDEX [IF NOT EXISTS]
...
```
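For illustration, a statement combining these skip types might look like the following sketch; the table and column names here are hypothetical:

```sql
CREATE SKIPPING INDEX ON alb_logs
(
  elb_status_code VALUE_SET(50),
  request_processing_time MIN_MAX,
  client_ip BLOOM_FILTER(20, 0.01),
  request_url BLOOM_FILTER(false, 1000000, 0.01)
)
WITH (auto_refresh = true)
```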
@@ -183,7 +183,7 @@ indexColTypeList
;

indexColType
: identifier skipType=(PARTITION | VALUE_SET | MIN_MAX)
: identifier skipType=(PARTITION | VALUE_SET | MIN_MAX | BLOOM_FILTER)
(LEFT_PAREN skipParams RIGHT_PAREN)?
;
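
With `BLOOM_FILTER` added to `skipType` and the optional `skipParams` clause, a column clause such as the following now parses (mirroring the integration tests below):

```sql
address BLOOM_FILTER(false, 1000000, 0.01)
```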

1 change: 1 addition & 0 deletions flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
@@ -139,6 +139,7 @@ nonReserved

// Flint lexical tokens

BLOOM_FILTER: 'BLOOM_FILTER';
MIN_MAX: 'MIN_MAX';
SKIPPING: 'SKIPPING';
VALUE_SET: 'VALUE_SET';
@@ -8,10 +8,11 @@ package org.opensearch.flint.spark.sql.skipping
import scala.collection.JavaConverters.collectionAsScalaIterableConverter

import org.antlr.v4.runtime.tree.RuleNode
import org.opensearch.flint.core.field.bloomfilter.BloomFilterFactory._
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, PARTITION, VALUE_SET}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{BLOOM_FILTER, MIN_MAX, PARTITION, VALUE_SET}
import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.VALUE_SET_MAX_SIZE_KEY
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText}
@@ -52,6 +53,19 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
val valueSetParams = (Seq(VALUE_SET_MAX_SIZE_KEY) zip skipParams).toMap
indexBuilder.addValueSet(colName, valueSetParams)
case MIN_MAX => indexBuilder.addMinMax(colName)
case BLOOM_FILTER =>
// Determine whether the given parameters are for the adaptive algorithm by checking the first parameter
val bloomFilterParamKeys =
if (skipParams.headOption.exists(_.equalsIgnoreCase("false"))) {
Seq(
BLOOM_FILTER_ADAPTIVE_KEY,
CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY,
CLASSIC_BLOOM_FILTER_FPP_KEY)
} else {
Seq(ADAPTIVE_NUMBER_CANDIDATE_KEY, CLASSIC_BLOOM_FILTER_FPP_KEY)
}
val bloomFilterParams = (bloomFilterParamKeys zip skipParams).toMap
indexBuilder.addBloomFilter(colName, bloomFilterParams)
}
}
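
To make the key zipping above concrete, the statements below (table name hypothetical, parameter names taken from the integration test expectations further down) would yield these stored parameters:

```sql
-- Adaptive form: skipParams zip with (num_candidates, fpp)
CREATE SKIPPING INDEX ON sample_table (address BLOOM_FILTER(20, 0.01))
-- stored parameters: adaptive=true, num_candidates=20, fpp=0.01

-- Non-adaptive form: the leading 'false' zips with adaptive, then (num_items, fpp)
CREATE SKIPPING INDEX ON sample_table (address BLOOM_FILTER(false, 100000, 0.001))
-- stored parameters: adaptive=false, num_items=100000, fpp=0.001
```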

@@ -8,8 +8,9 @@ package org.opensearch.flint.spark
import scala.Option.empty
import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConverter}

import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson
import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.JsonMethods.parse
import org.json4s.native.JsonMethods.{compact, parse, render}
import org.json4s.native.Serialization
import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.storage.FlintOpenSearchClient
@@ -46,7 +47,8 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
| (
| year PARTITION,
| name VALUE_SET,
| age MIN_MAX
| age MIN_MAX,
| address BLOOM_FILTER
| )
| WITH (auto_refresh = true)
| """.stripMargin)
@@ -80,6 +82,51 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
))
}

test("create skipping index with non-adaptive bloom filter") {
sql(s"""
| CREATE SKIPPING INDEX ON $testTable
| (
| address BLOOM_FILTER(false, 1024, 0.01)
| )
| WITH (auto_refresh = true)
| """.stripMargin)

val job = spark.streams.active.find(_.name == testIndex)
awaitStreamingComplete(job.get.id.toString)

flint.queryIndex(testIndex).count() shouldBe 2

checkAnswer(sql(s"SELECT name FROM $testTable WHERE address = 'Vancouver'"), Row("Test"))
sql(s"SELECT name FROM $testTable WHERE address = 'San Francisco'").count() shouldBe 0
}

Seq(
(
s"CREATE SKIPPING INDEX ON $testTable (address BLOOM_FILTER(20, 0.01))",
"""
|{
| "adaptive": "true",
| "num_candidates": "20",
| "fpp": "0.01"
|}
|""".stripMargin),
(
s"CREATE SKIPPING INDEX ON $testTable (address BLOOM_FILTER(false, 100000, 0.001))",
"""
|{
| "adaptive": "false",
| "num_items": "100000",
| "fpp": "0.001"
|}
|""".stripMargin)).foreach { case (query, expectedParamJson) =>
test(s"create skipping index with bloom filter parameters $expectedParamJson") {
sql(query)
val metadata = flint.describeIndex(testIndex).get.metadata().getContent
val parameters = compact(render(parse(metadata) \\ "parameters"))
parameters should matchJson(expectedParamJson)
}
}

test("create skipping index with streaming job options") {
withTempDir { checkpointDir =>
sql(s"""