diff --git a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
index a5746b3d4..655cb4d75 100644
--- a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
+++ b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
@@ -154,7 +154,7 @@ indexColTypeList
 
 indexColType
     : identifier skipType=(PARTITION | VALUE_SET | MIN_MAX)
-        (WITH LEFT_PAREN propertyValues RIGHT_PAREN)?
+        (LEFT_PAREN propertyValues RIGHT_PAREN)?
     ;
 
 propertyValues
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala
index 5cdf443b0..47b7ef6c6 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala
@@ -13,6 +13,7 @@ import org.opensearch.flint.spark.FlintSpark.RefreshMode
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, PARTITION, VALUE_SET}
+import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.VALUE_SET_MAX_SIZE_KEY
 import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
 import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText}
 import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._
@@ -49,7 +50,7 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
         skipType match {
           case PARTITION => indexBuilder.addPartitions(colName)
           case VALUE_SET =>
-            indexBuilder.addValueSet(colName, (Seq("limit") zip paramValues).toMap)
+            indexBuilder.addValueSet(colName, (Seq(VALUE_SET_MAX_SIZE_KEY) zip paramValues).toMap)
           case MIN_MAX => indexBuilder.addMinMax(colName)
         }
       }
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
index 21de15de7..d072524cb 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
@@ -14,7 +14,7 @@ import org.json4s.native.Serialization
 import org.opensearch.flint.core.FlintOptions
 import org.opensearch.flint.core.storage.FlintOpenSearchClient
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
-import org.scalatest.matchers.must.Matchers.{defined, have}
+import org.scalatest.matchers.must.Matchers.defined
 import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the}
 
 import org.apache.spark.sql.Row
@@ -30,7 +30,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    createPartitionedTable(testTable)
+    createPartitionedMultiRowTable(testTable)
   }
 
   protected override def afterEach(): Unit = {
@@ -52,16 +52,33 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
 
     // Wait for streaming job complete current micro batch
     val job = spark.streams.active.find(_.name == testIndex)
-    job shouldBe defined
-    failAfter(streamingTimeout) {
-      job.get.processAllAvailable()
-    }
+    awaitStreamingComplete(job.get.id.toString)
 
     val indexData = spark.read.format(FLINT_DATASOURCE).load(testIndex)
     flint.describeIndex(testIndex) shouldBe defined
     indexData.count() shouldBe 2
   }
 
+  test("create skipping index with max size value set") {
+    sql(s"""
+           | CREATE SKIPPING INDEX ON $testTable
+           | (
+           |   address VALUE_SET(2)
+           | )
+           | WITH (auto_refresh = true)
+           | """.stripMargin)
+
+    val job = spark.streams.active.find(_.name == testIndex)
+    awaitStreamingComplete(job.get.id.toString)
+
+    checkAnswer(
+      flint.queryIndex(testIndex).select("address"),
+      Seq(
+        Row("""["Seattle","Portland"]"""),
+        Row(null) // A value set that exceeds the size limit is expected to be null
+      ))
+  }
+
   test("create skipping index with streaming job options") {
     withTempDir { checkpointDir =>
       sql(s"""
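
Taken together, the changes above drop the WITH keyword that previously introduced a skipping strategy's parameter list (presumably to avoid clashing with the WITH clause used for index options, visible in the new test), so `address VALUE_SET WITH (2)` becomes `address VALUE_SET(2)`, and the AST builder now keys the single positional parameter by the shared VALUE_SET_MAX_SIZE_KEY constant instead of a hard-coded "limit" string. A minimal sketch of both entry points follows; the session config, app name, and table name are illustrative assumptions, while the builder chain mirrors the `addValueSet` call shown in this diff:

```scala
import org.apache.spark.sql.SparkSession

import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.VALUE_SET_MAX_SIZE_KEY

// Sketch only: app name, extension config, and table name are illustrative.
val spark = SparkSession
  .builder()
  .appName("flint-value-set-sketch")
  .config("spark.sql.extensions", "org.opensearch.flint.spark.FlintSparkExtensions")
  .getOrCreate()

// New SQL syntax accepted by the updated grammar: the parameter list
// follows the skipping strategy directly, with no WITH keyword.
spark.sql("""
  CREATE SKIPPING INDEX ON spark_catalog.default.test
  ( address VALUE_SET(2) )
""")

// Equivalent Flint API call that the updated AST builder emits: the
// positional "2" is zipped against VALUE_SET_MAX_SIZE_KEY rather than
// the old hard-coded "limit" key.
val flint = new FlintSpark(spark)
flint
  .skippingIndex()
  .onTable("spark_catalog.default.test")
  .addValueSet("address", Map(VALUE_SET_MAX_SIZE_KEY -> "2"))
  .create()
```

Keying the option by the strategy's own constant keeps the SQL path and the FlintSparkSkippingIndex builder in agreement on the option name, so the index is parameterized consistently regardless of which entry point creates it.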