Add IT and change default limit to 2 temporarily for test
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Dec 27, 2023
1 parent feb67b3 commit 657b756
Showing 2 changed files with 37 additions and 5 deletions.
ValueSetSkippingStrategy.scala
@@ -27,8 +27,6 @@ case class ValueSetSkippingStrategy(
override def getAggregators: Seq[Expression] = {
val limit = DEFAULT_VALUE_SET_SIZE_LIMIT
val collectSetLimit = collect_set(columnName)

// IF(ARRAY_SIZE(COLLECT_SET(col)) > default_limit, null, COLLECT_SET(col))
val aggregator =
when(size(collectSetLimit) > limit, lit(null))
.otherwise(collectSetLimit)
@@ -50,5 +48,5 @@ case class ValueSetSkippingStrategy(
object ValueSetSkippingStrategy {

/** Default limit for value set size collected */
val DEFAULT_VALUE_SET_SIZE_LIMIT = 100
val DEFAULT_VALUE_SET_SIZE_LIMIT = 2
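// Temporarily lowered from 100 to 2 so the integration test below can exceed the limit with a small dataset (see commit message)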
}
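
For readers skimming the diff, here is a minimal standalone sketch of the size-capped collect_set pattern used by the aggregator above; the object name, sample data, and column names are illustrative and not part of the Flint codebase:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_set, lit, size, when}

object ValueSetLimitSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("value-set-limit-sketch").getOrCreate()
    import spark.implicits._

    // Illustrative limit mirroring the temporarily lowered DEFAULT_VALUE_SET_SIZE_LIMIT
    val limit = 2

    // Two simulated source files: one with 2 distinct addresses, one with 3
    val df = Seq(
      ("file-1", "Seattle"), ("file-1", "Portland"),
      ("file-2", "Seattle"), ("file-2", "Portland"), ("file-2", "Vancouver"))
      .toDF("file", "address")

    // Same pattern as the strategy: collect distinct values per file, but store null
    // once the set exceeds the limit, so that file can no longer be skipped on this column
    val valueSet = collect_set($"address")
    val aggregator = when(size(valueSet) > limit, lit(null)).otherwise(valueSet)

    df.groupBy($"file").agg(aggregator.as("address_values")).show(truncate = false)
    spark.stop()
  }
}
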
FlintSparkSkippingIndexITSuite.scala
@@ -12,15 +12,15 @@ import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL}
import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
import org.opensearch.flint.spark.skipping.FlintSparkSkippingFileIndex
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
import org.scalatest.matchers.{Matcher, MatchResult}
import org.scalatest.matchers.{MatchResult, Matcher}
import org.scalatest.matchers.must.Matchers._
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper

import org.apache.spark.sql.{Column, Row}
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.flint.config.FlintSparkConf._
import org.apache.spark.sql.functions.col
import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy

class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {

@@ -285,6 +285,40 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
useFlintSparkSkippingFileIndex(hasIndexFilter(col("address") === "Portland"))
}

test("can build value set skipping index up to default limit") {
// Use hint to insert all rows in a single csv file
sql(s"""
| INSERT OVERWRITE $testTable
| PARTITION (year=2023, month=4)
| SELECT /*+ COALESCE(1) */ *
| FROM VALUES
| ('Hello', 30, 'Seattle'),
| ('World', 40, 'Portland')
|""".stripMargin)

sql(s"""
| INSERT OVERWRITE $testTable
| PARTITION (year=2023, month=5)
| SELECT /*+ COALESCE(1) */ *
| FROM VALUES
| ('Hello', 30, 'Seattle'),
| ('World', 40, 'Portland'),
| ('Test', 50, 'Vancouver')
|""".stripMargin)

flint
.skippingIndex()
.onTable(testTable)
.addValueSet("address")
.create()
flint.refreshIndex(testIndex, FULL)

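// The month=4 file has 2 distinct addresses (within the limit of 2), so its value set is kept;
// the month=5 file has 3 distinct addresses (over the limit), so its value set is stored as null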
checkAnswer(
flint.queryIndex(testIndex).select("address"),
Seq(Row("""["Seattle","Portland"]"""), Row(null))
)
}

test("can build min max skipping index and rewrite applicable query") {
flint
.skippingIndex()