diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/valueset/ValueSetSkippingStrategy.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/valueset/ValueSetSkippingStrategy.scala
index 9bcd2ba9b..c1dcae893 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/valueset/ValueSetSkippingStrategy.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/valueset/ValueSetSkippingStrategy.scala
@@ -27,8 +27,6 @@ case class ValueSetSkippingStrategy(
   override def getAggregators: Seq[Expression] = {
     val limit = DEFAULT_VALUE_SET_SIZE_LIMIT
     val collectSetLimit = collect_set(columnName)
-
-    // IF(ARRAY_SIZE(COLLECT_SET(col)) > default_limit, null, COLLECT_SET(col)
     val aggregator =
       when(size(collectSetLimit) > limit, lit(null))
         .otherwise(collectSetLimit)
@@ -50,5 +48,5 @@ case class ValueSetSkippingStrategy(
 object ValueSetSkippingStrategy {
 
-  /** Default limit for value set size collected */
-  val DEFAULT_VALUE_SET_SIZE_LIMIT = 100
+  /** Default limit for value set size collected (var so that tests can temporarily lower it) */
+  var DEFAULT_VALUE_SET_SIZE_LIMIT = 100
 }
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
index 40cb5c201..18f2d853c 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
@@ -12,6 +12,7 @@ import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL}
 import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingFileIndex
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
+import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy
 import org.scalatest.matchers.{Matcher, MatchResult}
 import org.scalatest.matchers.must.Matchers._
 import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
@@ -285,6 +286,47 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
     useFlintSparkSkippingFileIndex(hasIndexFilter(col("address") === "Portland"))
   }
 
+  test("can build value set skipping index up to default limit") {
+    // Lower the default value set size limit for this test only and restore it afterwards
+    val defaultLimit = ValueSetSkippingStrategy.DEFAULT_VALUE_SET_SIZE_LIMIT
+    ValueSetSkippingStrategy.DEFAULT_VALUE_SET_SIZE_LIMIT = 2
+    try {
+      // Use hint to insert all rows into a single csv file per partition
+      sql(s"""
+           | INSERT OVERWRITE $testTable
+           | PARTITION (year=2023, month=4)
+           | SELECT /*+ COALESCE(1) */ *
+           | FROM VALUES
+           |   ('Hello', 30, 'Seattle'),
+           |   ('World', 40, 'Portland')
+           |""".stripMargin)
+
+      sql(s"""
+           | INSERT OVERWRITE $testTable
+           | PARTITION (year=2023, month=5)
+           | SELECT /*+ COALESCE(1) */ *
+           | FROM VALUES
+           |   ('Hello', 30, 'Seattle'),
+           |   ('World', 40, 'Portland'),
+           |   ('Test', 50, 'Vancouver')
+           |""".stripMargin)
+
+      flint
+        .skippingIndex()
+        .onTable(testTable)
+        .addValueSet("address")
+        .create()
+      flint.refreshIndex(testIndex, FULL)
+
+      // The first file stays within the limit; the second exceeds it, so its value set is null
+      checkAnswer(
+        flint.queryIndex(testIndex).select("address"),
+        Seq(Row("""["Seattle","Portland"]"""), Row(null)))
+    } finally {
+      ValueSetSkippingStrategy.DEFAULT_VALUE_SET_SIZE_LIMIT = defaultLimit
+    }
+  }
+
   test("can build min max skipping index and rewrite applicable query") {
     flint
       .skippingIndex()