Configure value set max size in SQL DDL
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Dec 28, 2023
1 parent 68b2d97 commit 26b105e
Showing 8 changed files with 107 additions and 38 deletions.
@@ -140,6 +140,11 @@ indexColTypeList

indexColType
: identifier skipType=(PARTITION | VALUE_SET | MIN_MAX)
(WITH LEFT_PAREN propertyValues RIGHT_PAREN)?
;

propertyValues
: propertyValue (COMMA propertyValue)*
;

indexName
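
A minimal usage sketch of the DDL this rule now admits (not part of this commit; the CREATE SKIPPING INDEX statement wrapper, the SparkSession named spark, and the table name are assumed for illustration):

// Hypothetical DDL: VALUE_SET now takes an optional WITH (<property values>) clause,
// here capping the collected value set at 2 entries; the other columns are unchanged.
spark.sql("""
  CREATE SKIPPING INDEX ON sample.http_logs
  (
    year PARTITION,
    address VALUE_SET WITH (2),
    age MIN_MAX
  )
""")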
@@ -5,6 +5,8 @@

package org.opensearch.flint.spark

import java.util.Collections

import scala.collection.JavaConverters.mapAsScalaMapConverter

import org.opensearch.flint.core.metadata.FlintMetadata
@@ -45,12 +47,16 @@ object FlintSparkIndexFactory {
val skippingKind = SkippingKind.withName(getString(colInfo, "kind"))
val columnName = getString(colInfo, "columnName")
val columnType = getString(colInfo, "columnType")
val properties = getSkippingParameters(colInfo)

skippingKind match {
case PARTITION =>
PartitionSkippingStrategy(columnName = columnName, columnType = columnType)
case VALUE_SET =>
ValueSetSkippingStrategy(columnName = columnName, columnType = columnType)
ValueSetSkippingStrategy(
columnName = columnName,
columnType = columnType,
params = properties)
case MIN_MAX =>
MinMaxSkippingStrategy(columnName = columnName, columnType = columnType)
case other =>
@@ -78,6 +84,15 @@ object FlintSparkIndexFactory {
}
}

private def getSkippingParameters(
colInfo: java.util.Map[String, AnyRef]): Map[String, String] = {
colInfo
.getOrDefault("parameters", Collections.emptyMap())
.asInstanceOf[java.util.Map[String, String]]
.asScala
.toMap
}

private def getString(map: java.util.Map[String, AnyRef], key: String): String = {
map.get(key).asInstanceOf[String]
}
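
As a rough sketch of what the factory now consumes (a hand-built java.util.Map for illustration only, not real Flint metadata handling):

import java.util.{HashMap => JHashMap}

// Hypothetical column-info entry as serialized in Flint metadata after this change;
// the "parameters" entry is optional and falls back to an empty map when absent.
val colInfo = new JHashMap[String, AnyRef]()
colInfo.put("kind", "VALUE_SET")
colInfo.put("columnName", "address")
colInfo.put("columnType", "string")

val params = new JHashMap[String, String]()
params.put("max_size", "2")
colInfo.put("parameters", params)

// getSkippingParameters(colInfo) would return Map("max_size" -> "2"), which the
// factory passes to ValueSetSkippingStrategy as its params argument.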
@@ -49,6 +49,7 @@ case class FlintSparkSkippingIndex(
.map(col =>
Map[String, AnyRef](
"kind" -> col.kind.toString,
"parameters" -> col.parameters.asJava,
"columnName" -> col.columnName,
"columnType" -> col.columnType).asJava)
.toArray
@@ -158,11 +159,15 @@ object FlintSparkSkippingIndex {
* @return
* index builder
*/
def addValueSet(colName: String): Builder = {
def addValueSet(colName: String, params: Map[String, String] = Map.empty): Builder = {
require(tableName.nonEmpty, "table name cannot be empty")

val col = findColumn(colName)
addIndexedColumn(ValueSetSkippingStrategy(columnName = col.name, columnType = col.dataType))
addIndexedColumn(
ValueSetSkippingStrategy(
columnName = col.name,
columnType = col.dataType,
params = params))
this
}
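
A minimal sketch of the extended builder call (assuming a FlintSpark instance named flint and an existing source table; the table and column names are illustrative):

// Hypothetical: cap the value set for address at 50 entries; name keeps the default of 100.
flint
  .skippingIndex()
  .onTable("spark_catalog.default.test_table")
  .addValueSet("address", Map("max_size" -> "50"))
  .addValueSet("name")
  .create()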

@@ -31,6 +31,11 @@ trait FlintSparkSkippingStrategy {
*/
val columnType: String

/**
* Skipping algorithm parameters.
*/
val parameters: Map[String, String] = Map.empty

/**
* @return
* output schema mapping from Flint field name to Flint field type
@@ -7,7 +7,7 @@ package org.opensearch.flint.spark.skipping.valueset

import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{SkippingKind, VALUE_SET}
import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.DEFAULT_VALUE_SET_SIZE_LIMIT
import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.{DEFAULT_VALUE_SET_MAX_SIZE, VALUE_SET_MAX_SIZE_KEY}

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, Literal}
import org.apache.spark.sql.functions._
@@ -18,14 +18,25 @@ import org.apache.spark.sql.functions._
case class ValueSetSkippingStrategy(
override val kind: SkippingKind = VALUE_SET,
override val columnName: String,
override val columnType: String)
override val columnType: String,
params: Map[String, String] = Map.empty)
extends FlintSparkSkippingStrategy {

override val parameters: Map[String, String] = {
val map = Map.newBuilder[String, String]
map ++= params

if (!params.contains(VALUE_SET_MAX_SIZE_KEY)) {
map += (VALUE_SET_MAX_SIZE_KEY -> DEFAULT_VALUE_SET_MAX_SIZE.toString)
}
map.result()
}

override def outputSchema(): Map[String, String] =
Map(columnName -> columnType)

override def getAggregators: Seq[Expression] = {
val limit = DEFAULT_VALUE_SET_SIZE_LIMIT
val limit = parameters(VALUE_SET_MAX_SIZE_KEY).toInt
val collectSet = collect_set(columnName)
val aggregator =
when(size(collectSet) > limit, lit(null))
@@ -48,8 +59,7 @@ case class ValueSetSkippingStrategy(

object ValueSetSkippingStrategy {

/**
* Default limit for value set size collected. TODO: make this val once it's configurable
*/
var DEFAULT_VALUE_SET_SIZE_LIMIT = 100
/** Value set max size param key and default value */
var VALUE_SET_MAX_SIZE_KEY = "max_size"
var DEFAULT_VALUE_SET_MAX_SIZE = 100
}
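
A small sketch (outside this commit) of how the parameter resolution above behaves:

// No params supplied: max_size falls back to the default of 100.
val withDefault = ValueSetSkippingStrategy(columnName = "address", columnType = "string")
assert(withDefault.parameters == Map("max_size" -> "100"))

// Explicit params: the supplied value wins and later drives the collect_set limit
// in getAggregators, so sets larger than 2 collapse to null.
val withOverride = ValueSetSkippingStrategy(
  columnName = "address",
  columnType = "string",
  params = Map("max_size" -> "2"))
assert(withOverride.parameters == Map("max_size" -> "2"))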
@@ -5,6 +5,8 @@

package org.opensearch.flint.spark.sql.skipping

import scala.collection.JavaConverters.collectionAsScalaIterableConverter

import org.antlr.v4.runtime.tree.RuleNode
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.FlintSpark.RefreshMode
@@ -43,9 +45,11 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
ctx.indexColTypeList().indexColType().forEach { colTypeCtx =>
val colName = colTypeCtx.identifier().getText
val skipType = SkippingKind.withName(colTypeCtx.skipType.getText)
val paramValues = visitPropertyValues(colTypeCtx.propertyValues())
skipType match {
case PARTITION => indexBuilder.addPartitions(colName)
case VALUE_SET => indexBuilder.addValueSet(colName)
case VALUE_SET =>
indexBuilder.addValueSet(colName, (Seq("limit") zip paramValues).toMap)
case MIN_MAX => indexBuilder.addMinMax(colName)
}
}
@@ -98,6 +102,16 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
Seq.empty
}

override def visitPropertyValues(ctx: PropertyValuesContext): Seq[String] = {
if (ctx == null) {
Seq.empty
} else {
ctx.propertyValue.asScala
.map(p => visitPropertyValue(p))
.toSeq
}
}

private def getSkippingIndexName(flint: FlintSpark, tableNameCtx: RuleNode): String =
FlintSparkSkippingIndex.getSkippingIndexName(getFullTableName(flint, tableNameCtx))
}
@@ -6,6 +6,7 @@
package org.opensearch.flint.spark.skipping.valueset

import org.opensearch.flint.spark.skipping.{FlintSparkSkippingStrategy, FlintSparkSkippingStrategySuite}
import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.{DEFAULT_VALUE_SET_MAX_SIZE, VALUE_SET_MAX_SIZE_KEY}
import org.scalatest.matchers.should.Matchers

import org.apache.spark.SparkFunSuite
@@ -23,6 +24,11 @@ class ValueSetSkippingStrategySuite

private val name = AttributeReference("name", StringType, nullable = false)()

test("should return properties with default value") {
strategy.parameters shouldBe Map(
VALUE_SET_MAX_SIZE_KEY -> DEFAULT_VALUE_SET_MAX_SIZE.toString)
}

test("should rewrite EqualTo(<indexCol>, <value>)") {
EqualTo(name, Literal("hello")) shouldRewriteTo
(isnull(col("name")) || col("name") === "hello")
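
A companion test one might add to this suite for a non-default size (hypothetical, mirroring the default-value test above):

test("should return properties with overridden max size") {
  val custom = ValueSetSkippingStrategy(
    columnName = "name",
    columnType = "string",
    params = Map(VALUE_SET_MAX_SIZE_KEY -> "5"))
  custom.parameters shouldBe Map(VALUE_SET_MAX_SIZE_KEY -> "5")
}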
@@ -12,7 +12,6 @@ import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL}
import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
import org.opensearch.flint.spark.skipping.FlintSparkSkippingFileIndex
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy
import org.scalatest.matchers.{Matcher, MatchResult}
import org.scalatest.matchers.must.Matchers._
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
@@ -61,21 +60,25 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
| "indexedColumns": [
| {
| "kind": "PARTITION",
| "parameters": {},
| "columnName": "year",
| "columnType": "int"
| },
| {
| "kind": "PARTITION",
| "parameters": {},
| "columnName": "month",
| "columnType": "int"
| },
| {
| "kind": "VALUE_SET",
| "parameters": { "max_size": "100" },
| "columnName": "address",
| "columnType": "string"
| },
| {
| "kind": "MIN_MAX",
| "parameters": {},
| "columnName": "age",
| "columnType": "int"
| }],
@@ -274,39 +277,32 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
}

test("can build value set skipping index and rewrite applicable query") {
val defaultLimit = ValueSetSkippingStrategy.DEFAULT_VALUE_SET_SIZE_LIMIT
try {
ValueSetSkippingStrategy.DEFAULT_VALUE_SET_SIZE_LIMIT = 2
flint
.skippingIndex()
.onTable(testTable)
.addValueSet("address")
.create()
flint.refreshIndex(testIndex, FULL)
flint
.skippingIndex()
.onTable(testTable)
.addValueSet("address", Map("max_size" -> "2"))
.create()
flint.refreshIndex(testIndex, FULL)

// Assert index data
checkAnswer(
flint.queryIndex(testIndex).select("address"),
Seq(
Row("""["Seattle","Portland"]"""),
Row(null) // Value set exceeded limit size is expected to be null
))
// Assert index data
checkAnswer(
flint.queryIndex(testIndex).select("address"),
Seq(
Row("""["Seattle","Portland"]"""),
Row(null) // Value set exceeded limit size is expected to be null
))

// Assert query rewrite that works with value set maybe null
val query = sql(s"""
// Assert query rewrite that works with value set maybe null
val query = sql(s"""
| SELECT age
| FROM $testTable
| WHERE address = 'Portland'
|""".stripMargin)

query.queryExecution.executedPlan should
useFlintSparkSkippingFileIndex(
hasIndexFilter(isnull(col("address")) || col("address") === "Portland"))
checkAnswer(query, Seq(Row(30), Row(50)))

} finally {
ValueSetSkippingStrategy.DEFAULT_VALUE_SET_SIZE_LIMIT = defaultLimit
}
query.queryExecution.executedPlan should
useFlintSparkSkippingFileIndex(
hasIndexFilter(isnull(col("address")) || col("address") === "Portland"))
checkAnswer(query, Seq(Row(30), Row(50)))
}

test("can build min max skipping index and rewrite applicable query") {
@@ -483,66 +479,79 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
| "indexedColumns": [
| {
| "kind": "VALUE_SET",
| "parameters": { "max_size": "100" },
| "columnName": "boolean_col",
| "columnType": "boolean"
| },
| {
| "kind": "VALUE_SET",
| "parameters": { "max_size": "100" },
| "columnName": "string_col",
| "columnType": "string"
| },
| {
| "kind": "VALUE_SET",
| "parameters": { "max_size": "100" },
| "columnName": "varchar_col",
| "columnType": "varchar(20)"
| },
| {
| "kind": "VALUE_SET",
| "parameters": { "max_size": "100" },
| "columnName": "char_col",
| "columnType": "char(20)"
| },
| {
| "kind": "VALUE_SET",
| "parameters": { "max_size": "100" },
| "columnName": "long_col",
| "columnType": "bigint"
| },
| {
| "kind": "VALUE_SET",
| "parameters": { "max_size": "100" },
| "columnName": "int_col",
| "columnType": "int"
| },
| {
| "kind": "VALUE_SET",
| "parameters": { "max_size": "100" },
| "columnName": "short_col",
| "columnType": "smallint"
| },
| {
| "kind": "VALUE_SET",
| "parameters": { "max_size": "100" },
| "columnName": "byte_col",
| "columnType": "tinyint"
| },
| {
| "kind": "VALUE_SET",
| "parameters": { "max_size": "100" },
| "columnName": "double_col",
| "columnType": "double"
| },
| {
| "kind": "VALUE_SET",
| "parameters": { "max_size": "100" },
| "columnName": "float_col",
| "columnType": "float"
| },
| {
| "kind": "VALUE_SET",
| "parameters": { "max_size": "100" },
| "columnName": "timestamp_col",
| "columnType": "timestamp"
| },
| {
| "kind": "VALUE_SET",
| "parameters": { "max_size": "100" },
| "columnName": "date_col",
| "columnType": "date"
| },
| {
| "kind": "VALUE_SET",
| "parameters": { "max_size": "100" },
| "columnName": "struct_col",
| "columnType": "struct<subfield1:string,subfield2:int>"
| }],
