Configure value set max size in SQL statement #210

Merged
docs/index.md (13 changes: 9 additions & 4 deletions)
@@ -129,7 +129,7 @@ The default maximum size for the value set is 100. In cases where a file contain
```sql
CREATE SKIPPING INDEX [IF NOT EXISTS]
ON <object>
( column <index_type> [, ...] )
( column <skip_type> <skip_params> [, ...] )
WHERE <filter_predicate>
WITH ( options )

@@ -142,18 +142,23 @@ DROP SKIPPING INDEX ON <object>
<object> ::= [db_name].[schema_name].table_name
```

Skipping index type:
A skipping index type consists of a skip type name and optional parameters. For example, `VALUE_SET(20)` limits the collected value set for that column to 20 values.

```sql
<index_type> ::= { PARTITION, VALUE_SET, MIN_MAX }
<skip_type> ::= { PARTITION, VALUE_SET, MIN_MAX }

<skip_params> ::= ( param1, param2, ... )
```

Example:

```sql
CREATE SKIPPING INDEX ON alb_logs
(
elb_status_code VALUE_SET
time PARTITION,
elb_status_code VALUE_SET,
client_ip VALUE_SET(20),
request_processing_time MIN_MAX
)
WHERE time > '2023-04-01 00:00:00'

@@ -154,6 +154,11 @@ indexColTypeList

indexColType
: identifier skipType=(PARTITION | VALUE_SET | MIN_MAX)
(LEFT_PAREN skipParams RIGHT_PAREN)?
;

skipParams
: propertyValue (COMMA propertyValue)*
;

indexName
@@ -5,6 +5,8 @@

package org.opensearch.flint.spark

import java.util.Collections

import scala.collection.JavaConverters.mapAsScalaMapConverter

import org.opensearch.flint.core.metadata.FlintMetadata
@@ -45,12 +47,16 @@ object FlintSparkIndexFactory {
val skippingKind = SkippingKind.withName(getString(colInfo, "kind"))
val columnName = getString(colInfo, "columnName")
val columnType = getString(colInfo, "columnType")
val parameters = getSkipParams(colInfo)

skippingKind match {
case PARTITION =>
PartitionSkippingStrategy(columnName = columnName, columnType = columnType)
case VALUE_SET =>
ValueSetSkippingStrategy(columnName = columnName, columnType = columnType)
ValueSetSkippingStrategy(
columnName = columnName,
columnType = columnType,
params = parameters)
case MIN_MAX =>
MinMaxSkippingStrategy(columnName = columnName, columnType = columnType)
case other =>
@@ -78,6 +84,14 @@ object FlintSparkIndexFactory {
}
}

private def getSkipParams(colInfo: java.util.Map[String, AnyRef]): Map[String, String] = {
colInfo
.getOrDefault("parameters", Collections.emptyMap())
.asInstanceOf[java.util.Map[String, String]]
.asScala
.toMap
}

private def getString(map: java.util.Map[String, AnyRef], key: String): String = {
map.get(key).asInstanceOf[String]
}
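
For context, the new `getSkipParams` helper reads the optional `parameters` entry of an indexed-column descriptor and falls back to an empty map when the key is absent, so metadata written before this change still deserializes. A minimal sketch of the expected shape (the concrete values are assumptions for illustration):

```scala
import scala.collection.JavaConverters.mapAsJavaMapConverter

// Illustrative indexed-column entry as stored in Flint metadata; the values
// are assumed. getSkipParams returns Map("max_size" -> "20") for this entry,
// and Map.empty when no "parameters" key is present (older indexes).
val colInfo: java.util.Map[String, AnyRef] =
  Map[String, AnyRef](
    "kind" -> "VALUE_SET",
    "parameters" -> Map("max_size" -> "20").asJava,
    "columnName" -> "client_ip",
    "columnType" -> "string").asJava
```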
@@ -49,6 +49,7 @@ case class FlintSparkSkippingIndex(
.map(col =>
Map[String, AnyRef](
"kind" -> col.kind.toString,
"parameters" -> col.parameters.asJava,
"columnName" -> col.columnName,
"columnType" -> col.columnType).asJava)
.toArray
@@ -155,14 +156,20 @@ object FlintSparkSkippingIndex {
*
* @param colName
* indexed column name
* @param params
* value set parameters
* @return
* index builder
*/
def addValueSet(colName: String): Builder = {
def addValueSet(colName: String, params: Map[String, String] = Map.empty): Builder = {
require(tableName.nonEmpty, "table name cannot be empty")

val col = findColumn(colName)
addIndexedColumn(ValueSetSkippingStrategy(columnName = col.name, columnType = col.dataType))
addIndexedColumn(
ValueSetSkippingStrategy(
columnName = col.name,
columnType = col.dataType,
params = params))
this
}
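
For illustration, the widened `addValueSet` signature lets callers pass value set parameters through the builder API as well as through SQL. A minimal sketch, assuming a `FlintSpark` instance named `flint` and the `alb_logs` table from the docs example:

```scala
// Hypothetical builder usage; `flint`, the table, and the column names are
// assumptions for illustration. "max_size" is VALUE_SET_MAX_SIZE_KEY.
flint
  .skippingIndex()
  .onTable("alb_logs")
  .addPartitions("time")
  .addValueSet("elb_status_code") // default max_size (100)
  .addValueSet("client_ip", Map("max_size" -> "20")) // explicit max_size
  .addMinMax("request_processing_time")
  .create()
```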

@@ -31,6 +31,11 @@ trait FlintSparkSkippingStrategy {
*/
val columnType: String

/**
* Skipping algorithm named parameters.
*/
val parameters: Map[String, String] = Map.empty

/**
* @return
* output schema mapping from Flint field name to Flint field type
@@ -7,7 +7,7 @@ package org.opensearch.flint.spark.skipping.valueset

import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{SkippingKind, VALUE_SET}
import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.DEFAULT_VALUE_SET_SIZE_LIMIT
import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.{DEFAULT_VALUE_SET_MAX_SIZE, VALUE_SET_MAX_SIZE_KEY}

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, Literal}
import org.apache.spark.sql.functions._
@@ -18,14 +18,25 @@ import org.apache.spark.sql.functions._
case class ValueSetSkippingStrategy(
override val kind: SkippingKind = VALUE_SET,
override val columnName: String,
override val columnType: String)
override val columnType: String,
params: Map[String, String] = Map.empty)
extends FlintSparkSkippingStrategy {

override val parameters: Map[String, String] = {
val map = Map.newBuilder[String, String]
map ++= params

if (!params.contains(VALUE_SET_MAX_SIZE_KEY)) {
map += (VALUE_SET_MAX_SIZE_KEY -> DEFAULT_VALUE_SET_MAX_SIZE.toString)
}
map.result()
}

override def outputSchema(): Map[String, String] =
Map(columnName -> columnType)

override def getAggregators: Seq[Expression] = {
val limit = DEFAULT_VALUE_SET_SIZE_LIMIT
val limit = parameters(VALUE_SET_MAX_SIZE_KEY).toInt
val collectSet = collect_set(columnName)
val aggregator =
when(size(collectSet) > limit, lit(null))
@@ -48,8 +59,7 @@ case class ValueSetSkippingStrategy(

object ValueSetSkippingStrategy {

/**
* Default limit for value set size collected. TODO: make this val once it's configurable
*/
var DEFAULT_VALUE_SET_SIZE_LIMIT = 100
/** Value set max size param key and default value */
var VALUE_SET_MAX_SIZE_KEY = "max_size"
var DEFAULT_VALUE_SET_MAX_SIZE = 100
}
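
Putting the pieces together: with `max_size` resolved from `parameters`, the strategy collects the distinct values seen in each source file and replaces the whole set with null once it grows past the limit; since the rewritten predicate is `isnull(col) || col === value` (see the test suite below), a null set simply means the file cannot be skipped. A sketch of the emitted aggregator under an assumed limit of 20:

```scala
import org.apache.spark.sql.functions._

// Sketch of the aggregation built by getAggregators for a VALUE_SET column;
// the column name and limit are assumed. Distinct values are collected, but
// the set becomes null once its size exceeds the limit, so files with too
// many distinct values are never skipped on this column.
val limit = 20
val collectSet = collect_set("client_ip")
val aggregator = when(size(collectSet) > limit, lit(null)).otherwise(collectSet)
```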
@@ -5,12 +5,15 @@

package org.opensearch.flint.spark.sql.skipping

import scala.collection.JavaConverters.collectionAsScalaIterableConverter

import org.antlr.v4.runtime.tree.RuleNode
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.FlintSpark.RefreshMode
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, PARTITION, VALUE_SET}
import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.VALUE_SET_MAX_SIZE_KEY
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText}
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._
@@ -43,9 +46,12 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
ctx.indexColTypeList().indexColType().forEach { colTypeCtx =>
val colName = colTypeCtx.identifier().getText
val skipType = SkippingKind.withName(colTypeCtx.skipType.getText)
val skipParams = visitSkipParams(colTypeCtx.skipParams())
skipType match {
case PARTITION => indexBuilder.addPartitions(colName)
case VALUE_SET => indexBuilder.addValueSet(colName)
case VALUE_SET =>
val valueSetParams = (Seq(VALUE_SET_MAX_SIZE_KEY) zip skipParams).toMap
indexBuilder.addValueSet(colName, valueSetParams)
case MIN_MAX => indexBuilder.addMinMax(colName)
}
}
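
As a small worked example of the mapping above: positional SQL parameters are zipped onto the known parameter keys, so `VALUE_SET(20)` arrives as a named `max_size` entry (values here are illustrative):

```scala
// For `client_ip VALUE_SET(20)` the parser yields Seq("20"); zipping it
// against the known keys gives Map("max_size" -> "20"). With no parameters
// the zip is empty and ValueSetSkippingStrategy later fills in the default
// max_size of 100.
val skipParams = Seq("20")
val valueSetParams = (Seq("max_size") zip skipParams).toMap
```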
@@ -107,6 +113,16 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
}
}

override def visitSkipParams(ctx: SkipParamsContext): Seq[String] = {
if (ctx == null) {
Seq.empty
} else {
ctx.propertyValue.asScala
.map(p => visitPropertyValue(p))
.toSeq
}
}

private def getSkippingIndexName(flint: FlintSpark, tableNameCtx: RuleNode): String =
FlintSparkSkippingIndex.getSkippingIndexName(getFullTableName(flint, tableNameCtx))
}
@@ -5,6 +5,8 @@

package org.opensearch.flint.spark.skipping

import java.util.Collections

import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.json4s.native.JsonMethods.parse
@@ -39,6 +41,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
test("get index metadata") {
val indexCol = mock[FlintSparkSkippingStrategy]
when(indexCol.kind).thenReturn(SkippingKind.PARTITION)
when(indexCol.parameters).thenReturn(Map.empty[String, String])
when(indexCol.columnName).thenReturn("test_field")
when(indexCol.columnType).thenReturn("integer")
when(indexCol.outputSchema()).thenReturn(Map("test_field" -> "integer"))
@@ -51,6 +54,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
metadata.indexedColumns shouldBe Array(
Map(
"kind" -> SkippingKind.PARTITION.toString,
"parameters" -> Collections.emptyMap(),
"columnName" -> "test_field",
"columnType" -> "integer").asJava)
}
@@ -6,23 +6,47 @@
package org.opensearch.flint.spark.skipping.valueset

import org.opensearch.flint.spark.skipping.{FlintSparkSkippingStrategy, FlintSparkSkippingStrategySuite}
import org.scalatest.matchers.should.Matchers
import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.{DEFAULT_VALUE_SET_MAX_SIZE, VALUE_SET_MAX_SIZE_KEY}
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.expressions.{Abs, AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.functions.{col, isnull}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType

class ValueSetSkippingStrategySuite
extends SparkFunSuite
with FlintSparkSkippingStrategySuite
with Matchers {
class ValueSetSkippingStrategySuite extends SparkFunSuite with FlintSparkSkippingStrategySuite {

override val strategy: FlintSparkSkippingStrategy =
ValueSetSkippingStrategy(columnName = "name", columnType = "string")

private val name = AttributeReference("name", StringType, nullable = false)()

test("should return parameters with default value") {
strategy.parameters shouldBe Map(
VALUE_SET_MAX_SIZE_KEY -> DEFAULT_VALUE_SET_MAX_SIZE.toString)
}

test("should build aggregator with default parameter") {
strategy.getAggregators.head.semanticEquals(
when(size(collect_set("name")) > DEFAULT_VALUE_SET_MAX_SIZE, lit(null))
.otherwise(collect_set("name"))
.expr) shouldBe true
}

test("should use given parameter value") {
val strategy =
ValueSetSkippingStrategy(
columnName = "name",
columnType = "string",
params = Map(VALUE_SET_MAX_SIZE_KEY -> "5"))

strategy.parameters shouldBe Map(VALUE_SET_MAX_SIZE_KEY -> "5")
strategy.getAggregators.head.semanticEquals(
when(size(collect_set("name")) > 5, lit(null))
.otherwise(collect_set("name"))
.expr) shouldBe true
}

test("should rewrite EqualTo(<indexCol>, <value>)") {
EqualTo(name, Literal("hello")) shouldRewriteTo
(isnull(col("name")) || col("name") === "hello")
Expand Down