diff --git a/docs/index.md b/docs/index.md
index 74f3648d6..8fb202e0e 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -153,7 +153,20 @@ High level API is dependent on query engine implementation. Please see Query Engine Integration section for details.
 
 #### Skipping Index
 
-The default maximum size for the value set is 100. In cases where a file contains columns with high cardinality values, the value set will become null. This is the trade-off that prevents excessive memory consumption at the cost of not skipping the file.
+The parameters of each skipping algorithm are explained below; default values appear in the parameter lists:
+
+- **VALUE_SET(limit=100):** If a file's column has higher cardinality than the limit (optional, default is 100), the value set becomes null. This trade-off prevents excessive memory consumption at the expense of not skipping the file.
+
+- **BLOOM_FILTER**
+  - **BLOOM_FILTER(num_candidate=10, fpp=0.03):** By default, the adaptive BloomFilter algorithm is used. Users can configure:
+    1. The number of candidate filters (optional); the first candidate starts with an expected 1024 distinct items, and each subsequent candidate doubles that number.
+    2. The false positive probability of each candidate (optional).
+    3. Examples: `BLOOM_FILTER`, `BLOOM_FILTER(20)`, `BLOOM_FILTER(20, 0.01)`
+
+  - **BLOOM_FILTER(false, num_items=10000, fpp=0.03):** Setting the first parameter to `false` reverts to the non-adaptive algorithm. Users can configure:
+    1. The expected number of distinct values (optional).
+    2. The false positive probability (optional).
+    3. Examples: `BLOOM_FILTER(false)`, `BLOOM_FILTER(false, 1000000)`, `BLOOM_FILTER(false, 1000000, 0.01)`
 
 ```sql
 CREATE SKIPPING INDEX [IF NOT EXISTS]
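As a usage illustration of the three documented forms, here is a minimal sketch (not part of this change) that issues the new syntax through Spark SQL. The table `alb_logs` and column `client_ip` are hypothetical, the OpenSearch connection settings Flint needs at runtime are omitted, and `org.opensearch.flint.spark.FlintSparkExtensions` is assumed to be the SQL extensions entry point:

```scala
// Hedged sketch: exercise the BLOOM_FILTER skip type documented above.
// Table/column names are hypothetical; OpenSearch connection settings omitted.
import org.apache.spark.sql.SparkSession

object BloomFilterSkippingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("bloom-filter-skipping-sketch")
      .config("spark.sql.extensions", "org.opensearch.flint.spark.FlintSparkExtensions")
      .getOrCreate()

    // Adaptive bloom filter with all defaults (num_candidate=10, fpp=0.03).
    spark.sql("CREATE SKIPPING INDEX ON alb_logs (client_ip BLOOM_FILTER)")

    // A table has a single skipping index, so the other documented forms are
    // shown as comments rather than executed back to back:
    //   CREATE SKIPPING INDEX ON alb_logs (client_ip BLOOM_FILTER(20, 0.01))
    //   CREATE SKIPPING INDEX ON alb_logs (client_ip BLOOM_FILTER(false, 1000000, 0.01))

    spark.stop()
  }
}
```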
diff --git a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
index 4ecef6a69..3c22becf5 100644
--- a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
+++ b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
@@ -183,7 +183,7 @@ indexColTypeList
     ;
 
 indexColType
-    : identifier skipType=(PARTITION | VALUE_SET | MIN_MAX)
+    : identifier skipType=(PARTITION | VALUE_SET | MIN_MAX | BLOOM_FILTER)
     (LEFT_PAREN skipParams RIGHT_PAREN)?
     ;
 
diff --git a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
index cb58e97e7..8f9ed570f 100644
--- a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
+++ b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
@@ -139,6 +139,7 @@ nonReserved
 
 // Flint lexical tokens
 
+BLOOM_FILTER: 'BLOOM_FILTER';
 MIN_MAX: 'MIN_MAX';
 SKIPPING: 'SKIPPING';
 VALUE_SET: 'VALUE_SET';
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala
index 9b638f36f..fe6356c8e 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala
@@ -8,10 +8,11 @@ package org.opensearch.flint.spark.sql.skipping
 import scala.collection.JavaConverters.collectionAsScalaIterableConverter
 
 import org.antlr.v4.runtime.tree.RuleNode
+import org.opensearch.flint.core.field.bloomfilter.BloomFilterFactory._
 import org.opensearch.flint.spark.FlintSpark
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind
-import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, PARTITION, VALUE_SET}
+import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{BLOOM_FILTER, MIN_MAX, PARTITION, VALUE_SET}
 import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.VALUE_SET_MAX_SIZE_KEY
 import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
 import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText}
@@ -52,6 +53,19 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
           val valueSetParams = (Seq(VALUE_SET_MAX_SIZE_KEY) zip skipParams).toMap
           indexBuilder.addValueSet(colName, valueSetParams)
         case MIN_MAX => indexBuilder.addMinMax(colName)
+        case BLOOM_FILTER =>
+          // Determine whether the parameters are for the adaptive algorithm by checking the first one
+          val bloomFilterParamKeys =
+            if (skipParams.headOption.exists(_.equalsIgnoreCase("false"))) {
+              Seq(
+                BLOOM_FILTER_ADAPTIVE_KEY,
+                CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY,
+                CLASSIC_BLOOM_FILTER_FPP_KEY)
+            } else {
+              Seq(ADAPTIVE_NUMBER_CANDIDATE_KEY, CLASSIC_BLOOM_FILTER_FPP_KEY)
+            }
+          val bloomFilterParams = (bloomFilterParamKeys zip skipParams).toMap
+          indexBuilder.addBloomFilter(colName, bloomFilterParams)
       }
     }
 
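To make the positional-to-named parameter mapping above concrete, here is a self-contained sketch of the same zip-based logic. The literal key strings (`adaptive`, `num_candidates`, `num_items`, `fpp`) are taken from the expected JSON in the tests below; the real code references them through `BloomFilterFactory` constants, so treat the names here as illustrative:

```scala
// Self-contained sketch (not part of this diff) of the positional-parameter
// mapping used in FlintSparkSkippingIndexAstBuilder above.
object BloomFilterParamMappingSketch extends App {

  def bloomFilterParams(skipParams: Seq[String]): Map[String, String] = {
    // A leading "false" selects the classic (non-adaptive) key layout.
    val keys =
      if (skipParams.headOption.exists(_.equalsIgnoreCase("false")))
        Seq("adaptive", "num_items", "fpp") // BLOOM_FILTER(false, n, fpp)
      else
        Seq("num_candidates", "fpp") // BLOOM_FILTER(n, fpp)

    // zip pairs keys with parameters positionally and drops whatever is
    // unmatched, so omitted trailing parameters fall back to their defaults.
    (keys zip skipParams).toMap
  }

  // BLOOM_FILTER(20, 0.01): adaptive form; note "adaptive": "true" in the
  // test JSON below is filled in downstream, not by this mapping.
  assert(bloomFilterParams(Seq("20", "0.01")) ==
    Map("num_candidates" -> "20", "fpp" -> "0.01"))

  // BLOOM_FILTER(false, 1000000): fpp omitted, so it keeps its default.
  assert(bloomFilterParams(Seq("false", "1000000")) ==
    Map("adaptive" -> "false", "num_items" -> "1000000"))

  // BLOOM_FILTER: no parameters at all, everything defaults.
  assert(bloomFilterParams(Nil) == Map.empty[String, String])
}
```

Zipping positional values against a key sequence keeps every trailing parameter optional without explicit arity checks, which matches how the `VALUE_SET` branch above handles its single optional limit.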
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
index ca14a555c..63442e6e1 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
@@ -8,8 +8,9 @@ package org.opensearch.flint.spark
 import scala.Option.empty
 import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConverter}
 
+import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson
 import org.json4s.{Formats, NoTypeHints}
-import org.json4s.native.JsonMethods.parse
+import org.json4s.native.JsonMethods.{compact, parse, render}
 import org.json4s.native.Serialization
 import org.opensearch.flint.core.FlintOptions
 import org.opensearch.flint.core.storage.FlintOpenSearchClient
@@ -46,7 +47,8 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
            | (
            |   year PARTITION,
            |   name VALUE_SET,
-           |   age MIN_MAX
+           |   age MIN_MAX,
+           |   address BLOOM_FILTER
            | )
            | WITH (auto_refresh = true)
            | """.stripMargin)
@@ -80,6 +82,51 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
       ))
   }
 
+  test("create skipping index with non-adaptive bloom filter") {
+    sql(s"""
+           | CREATE SKIPPING INDEX ON $testTable
+           | (
+           |   address BLOOM_FILTER(false, 1024, 0.01)
+           | )
+           | WITH (auto_refresh = true)
+           | """.stripMargin)
+
+    val job = spark.streams.active.find(_.name == testIndex)
+    awaitStreamingComplete(job.get.id.toString)
+
+    flint.queryIndex(testIndex).count() shouldBe 2
+
+    checkAnswer(sql(s"SELECT name FROM $testTable WHERE address = 'Vancouver'"), Row("Test"))
+    sql(s"SELECT name FROM $testTable WHERE address = 'San Francisco'").count() shouldBe 0
+  }
+
+  Seq(
+    (
+      s"CREATE SKIPPING INDEX ON $testTable (address BLOOM_FILTER(20, 0.01))",
+      """
+        |{
+        |  "adaptive": "true",
+        |  "num_candidates": "20",
+        |  "fpp": "0.01"
+        |}
+        |""".stripMargin),
+    (
+      s"CREATE SKIPPING INDEX ON $testTable (address BLOOM_FILTER(false, 100000, 0.001))",
+      """
+        |{
+        |  "adaptive": "false",
+        |  "num_items": "100000",
+        |  "fpp": "0.001"
+        |}
+        |""".stripMargin)).foreach { case (query, expectedParamJson) =>
+    test(s"create skipping index with bloom filter parameters $expectedParamJson") {
+      sql(query)
+      val metadata = flint.describeIndex(testIndex).get.metadata().getContent
+      val parameters = compact(render(parse(metadata) \\ "parameters"))
+      parameters should matchJson(expectedParamJson)
+    }
+  }
+
   test("create skipping index with streaming job options") {
     withTempDir { checkpointDir =>
       sql(s"""