From bb004c346868a7bd6bbcff8d58f280c4ef421853 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Thu, 7 Mar 2024 10:50:47 -0800
Subject: [PATCH 1/4] Add bloom filter in grammar and IT

Signed-off-by: Chen Dai
---
 .../main/antlr4/FlintSparkSqlExtensions.g4    |  2 +-
 .../src/main/antlr4/SparkSqlBase.g4           |  1 +
 .../FlintSparkSkippingIndexAstBuilder.scala   | 15 ++++++++++++-
 .../FlintSparkSkippingIndexSqlITSuite.scala   | 21 ++++++++++++++++++-
 4 files changed, 36 insertions(+), 3 deletions(-)

diff --git a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
index 4ecef6a69..3c22becf5 100644
--- a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
+++ b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
@@ -183,7 +183,7 @@ indexColTypeList
     ;
 
 indexColType
-    : identifier skipType=(PARTITION | VALUE_SET | MIN_MAX)
+    : identifier skipType=(PARTITION | VALUE_SET | MIN_MAX | BLOOM_FILTER)
     (LEFT_PAREN skipParams RIGHT_PAREN)?
     ;
 
diff --git a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
index cb58e97e7..8f9ed570f 100644
--- a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
+++ b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
@@ -139,6 +139,7 @@ nonReserved
 
 // Flint lexical tokens
 
+BLOOM_FILTER: 'BLOOM_FILTER';
 MIN_MAX: 'MIN_MAX';
 SKIPPING: 'SKIPPING';
 VALUE_SET: 'VALUE_SET';
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala
index 9b638f36f..a86f6920f 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala
@@ -8,10 +8,11 @@ package org.opensearch.flint.spark.sql.skipping
 import scala.collection.JavaConverters.collectionAsScalaIterableConverter
 
 import org.antlr.v4.runtime.tree.RuleNode
+import org.opensearch.flint.core.field.bloomfilter.BloomFilterFactory._
 import org.opensearch.flint.spark.FlintSpark
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind
-import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, PARTITION, VALUE_SET}
+import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{BLOOM_FILTER, MIN_MAX, PARTITION, VALUE_SET}
 import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.VALUE_SET_MAX_SIZE_KEY
 import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
 import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText}
@@ -52,6 +53,18 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
             val valueSetParams = (Seq(VALUE_SET_MAX_SIZE_KEY) zip skipParams).toMap
             indexBuilder.addValueSet(colName, valueSetParams)
           case MIN_MAX => indexBuilder.addMinMax(colName)
+          case BLOOM_FILTER =>
+            val bloomFilterParamKeys =
+              if (skipParams.headOption.contains("false")) {
+                Seq(
+                  BLOOM_FILTER_ADAPTIVE_KEY,
+                  CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY,
+                  CLASSIC_BLOOM_FILTER_FPP_KEY)
+              } else {
+                Seq(ADAPTIVE_NUMBER_CANDIDATE_KEY, CLASSIC_BLOOM_FILTER_FPP_KEY)
+              }
+            val bloomFilterParams = (bloomFilterParamKeys zip skipParams).toMap
+            indexBuilder.addBloomFilter(colName, bloomFilterParams)
         }
       }
 
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
index ca14a555c..e8e9bacf1 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
@@ -46,7 +46,8 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
            | (
            |   year PARTITION,
            |   name VALUE_SET,
-           |   age MIN_MAX
+           |   age MIN_MAX,
+           |   address BLOOM_FILTER
            | )
            | WITH (auto_refresh = true)
            | """.stripMargin)
@@ -80,6 +81,24 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
       ))
   }
 
+  test("create skipping index with non-adaptive bloom filter") {
+    sql(s"""
+           | CREATE SKIPPING INDEX ON $testTable
+           | (
+           |   address BLOOM_FILTER(false, 1024, 0.01)
+           | )
+           | WITH (auto_refresh = true)
+           | """.stripMargin)
+
+    val job = spark.streams.active.find(_.name == testIndex)
+    awaitStreamingComplete(job.get.id.toString)
+
+    flint.queryIndex(testIndex).count() shouldBe 2
+
+    checkAnswer(sql(s"SELECT name FROM $testTable WHERE address = 'Vancouver'"), Row("Test"))
+    sql(s"SELECT name FROM $testTable WHERE address = 'San Francisco'").count() shouldBe 0
+  }
+
   test("create skipping index with streaming job options") {
     withTempDir { checkpointDir =>
       sql(s"""

From 8aa83552ddffe18b9dcd3123a11981d304cd38f2 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Thu, 14 Mar 2024 09:52:32 -0700
Subject: [PATCH 2/4] Update user manual

Signed-off-by: Chen Dai
---
 docs/index.md | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/docs/index.md b/docs/index.md
index 31147aed4..a3045414c 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -153,7 +153,20 @@ High level API is dependent on query engine implementation. Please see Query Eng
 
 #### Skipping Index
 
-The default maximum size for the value set is 100. In cases where a file contains columns with high cardinality values, the value set will become null. This is the trade-off that prevents excessive memory consumption at the cost of not skipping the file.
+Provided below are the explanations for the parameters of the skipping algorithm. You can find the default values in the function signature below:
+
+- **VALUE_SET(limit=100):** If a file contains columns with high cardinality values, the value set will become null. This trade-off prevents excessive memory consumption at the expense of not skipping the file.
+
+- **BLOOM_FILTER**
+  - **BLOOM_FILTER(num_candidate=10, fpp=0.03):** By default, the adaptive BloomFilter algorithm is used. Users can configure:
+    1. The number of candidates (optional), starting with an expected number of distinct items at 1024 and doubling.
+    2. The false positive probability of each candidate (optional).
+    3. Examples: `BLOOM_FILTER`, `BLOOM_FILTER(20)`, `BLOOM_FILTER(20, 0.01)`
+
+  - **BLOOM_FILTER(false, num_items=10000, fpp=0.03):** Setting the first parameter to `false` will revert to the non-adaptive algorithm. Users can configure:
+    1. The expected number of distinct values (optional).
+    2. The false positive probability (optional).
+    3. Examples: `BLOOM_FILTER(false)`, `BLOOM_FILTER(false, 1000000)`, `BLOOM_FILTER(false, 1000000, 0.01)`
 
 ```sql
 CREATE SKIPPING INDEX [IF NOT EXISTS]

From 5ae966de1f89b2a28d2c0b4b78b3acf53775bb16 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Thu, 14 Mar 2024 11:16:40 -0700
Subject: [PATCH 3/4] Add more IT

Signed-off-by: Chen Dai
---
 .../FlintSparkSkippingIndexAstBuilder.scala   |  3 +-
 .../FlintSparkSkippingIndexSqlITSuite.scala   | 30 ++++++++++++++++++-
 2 files changed, 31 insertions(+), 2 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala
index a86f6920f..fe6356c8e 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala
@@ -54,8 +54,9 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
             indexBuilder.addValueSet(colName, valueSetParams)
           case MIN_MAX => indexBuilder.addMinMax(colName)
           case BLOOM_FILTER =>
+            // Determine if the given parameters are for adaptive algorithm by the first parameter
             val bloomFilterParamKeys =
-              if (skipParams.headOption.contains("false")) {
+              if (skipParams.headOption.exists(_.equalsIgnoreCase("false"))) {
                 Seq(
                   BLOOM_FILTER_ADAPTIVE_KEY,
                   CLASSIC_BLOOM_FILTER_NUM_ITEMS_KEY,
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
index e8e9bacf1..63442e6e1 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
@@ -8,8 +8,9 @@ package org.opensearch.flint.spark
 import scala.Option.empty
 import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConverter}
 
+import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson
 import org.json4s.{Formats, NoTypeHints}
-import org.json4s.native.JsonMethods.parse
+import org.json4s.native.JsonMethods.{compact, parse, render}
 import org.json4s.native.Serialization
 import org.opensearch.flint.core.FlintOptions
 import org.opensearch.flint.core.storage.FlintOpenSearchClient
@@ -99,6 +100,33 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
     sql(s"SELECT name FROM $testTable WHERE address = 'San Francisco'").count() shouldBe 0
   }
 
+  Seq(
+    (
+      s"CREATE SKIPPING INDEX ON $testTable (address BLOOM_FILTER(20, 0.01))",
+      """
+        |{
+        |  "adaptive": "true",
+        |  "num_candidates": "20",
+        |  "fpp": "0.01"
+        |}
+        |""".stripMargin),
+    (
+      s"CREATE SKIPPING INDEX ON $testTable (address BLOOM_FILTER(false, 100000, 0.001))",
+      """
+        |{
+        |  "adaptive": "false",
+        |  "num_items": "100000",
+        |  "fpp": "0.001"
+        |}
+        |""".stripMargin)).foreach { case (query, expectedParamJson) =>
+    test(s"create skipping index with bloom filter parameters $expectedParamJson") {
+      sql(query)
+      val metadata = flint.describeIndex(testIndex).get.metadata().getContent
+      val parameters = compact(render(parse(metadata) \\ "parameters"))
+      parameters should matchJson(expectedParamJson)
+    }
+  }
+
   test("create skipping index with streaming job options") {
     withTempDir { checkpointDir =>
       sql(s"""

From e5c12afea0f8d9d967b8ee8df521e016c1ae1169 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Thu, 14 Mar 2024 13:43:46 -0700
Subject: [PATCH 4/4] Update doc

Signed-off-by: Chen Dai
---
 docs/index.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/index.md b/docs/index.md
index a3045414c..dfb6c2fa1 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -155,7 +155,7 @@ High level API is dependent on query engine implementation. Please see Query Eng
 
 Provided below are the explanations for the parameters of the skipping algorithm. You can find the default values in the function signature below:
 
-- **VALUE_SET(limit=100):** If a file contains columns with high cardinality values, the value set will become null. This trade-off prevents excessive memory consumption at the expense of not skipping the file.
+- **VALUE_SET(limit=100):** If the column values of a file have higher cardinality than the limit (optional, default is 100), the value set will become null. This trade-off prevents excessive memory consumption at the expense of not skipping the file.
 
 - **BLOOM_FILTER**
   - **BLOOM_FILTER(num_candidate=10, fpp=0.03):** By default, the adaptive BloomFilter algorithm is used. Users can configure:
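
For reference, the `BLOOM_FILTER` skipping index syntax introduced by this patch series can be exercised as follows. This is an illustrative sketch only: the table and column names (`alb_logs`, `client_ip`) are placeholders, and the parameter forms follow the rules documented in PATCH 2/4 and exercised by the integration tests in PATCH 1/4 and 3/4.

```sql
-- Adaptive BloomFilter (default): optional number of candidates and false positive probability
CREATE SKIPPING INDEX ON alb_logs
(
  client_ip BLOOM_FILTER(20, 0.01)
)
WITH (auto_refresh = true)

-- Classic (non-adaptive) BloomFilter: first parameter false, then expected item count and false positive probability
CREATE SKIPPING INDEX ON alb_logs
(
  client_ip BLOOM_FILTER(false, 100000, 0.001)
)
WITH (auto_refresh = true)
```

The adaptive form sizes the filter by keeping several candidates and picking one based on the observed number of distinct items, while the classic form fixes the expected item count up front.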