diff --git a/docs/index.md b/docs/index.md index 88c2bc5e6..a307b057f 100644 --- a/docs/index.md +++ b/docs/index.md @@ -129,7 +129,7 @@ The default maximum size for the value set is 100. In cases where a file contain ```sql CREATE SKIPPING INDEX [IF NOT EXISTS] ON -( column [, ...] ) +( column [, ...] ) WHERE WITH ( options ) @@ -142,10 +142,12 @@ DROP SKIPPING INDEX ON ::= [db_name].[schema_name].table_name ``` -Skipping index type: +Skipping index type consists of skip type name and optional parameters ```sql - ::= { PARTITION, VALUE_SET, MIN_MAX } + ::= { PARTITION, VALUE_SET, MIN_MAX } + + := ( param1, param2, ... ) ``` Example: @@ -153,7 +155,10 @@ Example: ```sql CREATE SKIPPING INDEX ON alb_logs ( - elb_status_code VALUE_SET + time PARTITION, + elb_status_code VALUE_SET, + client_ip VALUE_SET(20), + request_processing_time MIN_MAX ) WHERE time > '2023-04-01 00:00:00' diff --git a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 index a5f0f993b..c88c07741 100644 --- a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 +++ b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 @@ -154,6 +154,11 @@ indexColTypeList indexColType : identifier skipType=(PARTITION | VALUE_SET | MIN_MAX) + (LEFT_PAREN skipParams RIGHT_PAREN)? + ; + +skipParams + : propertyValue (COMMA propertyValue)* ; indexName diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala index 6d680ae39..6cd5b3352 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala @@ -5,6 +5,8 @@ package org.opensearch.flint.spark +import java.util.Collections + import scala.collection.JavaConverters.mapAsScalaMapConverter import org.opensearch.flint.core.metadata.FlintMetadata @@ -45,12 +47,16 @@ object FlintSparkIndexFactory { val skippingKind = SkippingKind.withName(getString(colInfo, "kind")) val columnName = getString(colInfo, "columnName") val columnType = getString(colInfo, "columnType") + val parameters = getSkipParams(colInfo) skippingKind match { case PARTITION => PartitionSkippingStrategy(columnName = columnName, columnType = columnType) case VALUE_SET => - ValueSetSkippingStrategy(columnName = columnName, columnType = columnType) + ValueSetSkippingStrategy( + columnName = columnName, + columnType = columnType, + params = parameters) case MIN_MAX => MinMaxSkippingStrategy(columnName = columnName, columnType = columnType) case other => @@ -78,6 +84,14 @@ object FlintSparkIndexFactory { } } + private def getSkipParams(colInfo: java.util.Map[String, AnyRef]): Map[String, String] = { + colInfo + .getOrDefault("parameters", Collections.emptyMap()) + .asInstanceOf[java.util.Map[String, String]] + .asScala + .toMap + } + private def getString(map: java.util.Map[String, AnyRef], key: String): String = { map.get(key).asInstanceOf[String] } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala index 2e8a3c82d..ae6518bf0 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala @@ -49,6 +49,7 @@ case class FlintSparkSkippingIndex( .map(col => Map[String, AnyRef]( "kind" -> col.kind.toString, + "parameters" -> col.parameters.asJava, "columnName" -> col.columnName, "columnType" -> col.columnType).asJava) .toArray @@ -155,14 +156,20 @@ object FlintSparkSkippingIndex { * * @param colName * indexed column name + * @param params + * value set parameters * @return * index builder */ - def addValueSet(colName: String): Builder = { + def addValueSet(colName: String, params: Map[String, String] = Map.empty): Builder = { require(tableName.nonEmpty, "table name cannot be empty") val col = findColumn(colName) - addIndexedColumn(ValueSetSkippingStrategy(columnName = col.name, columnType = col.dataType)) + addIndexedColumn( + ValueSetSkippingStrategy( + columnName = col.name, + columnType = col.dataType, + params = params)) this } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingStrategy.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingStrategy.scala index 042c968ec..2569f06fa 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingStrategy.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingStrategy.scala @@ -31,6 +31,11 @@ trait FlintSparkSkippingStrategy { */ val columnType: String + /** + * Skipping algorithm named parameters. + */ + val parameters: Map[String, String] = Map.empty + /** * @return * output schema mapping from Flint field name to Flint field type diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/valueset/ValueSetSkippingStrategy.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/valueset/ValueSetSkippingStrategy.scala index ff2d53d44..1db9e3d32 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/valueset/ValueSetSkippingStrategy.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/valueset/ValueSetSkippingStrategy.scala @@ -7,7 +7,7 @@ package org.opensearch.flint.spark.skipping.valueset import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{SkippingKind, VALUE_SET} -import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.DEFAULT_VALUE_SET_SIZE_LIMIT +import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.{DEFAULT_VALUE_SET_MAX_SIZE, VALUE_SET_MAX_SIZE_KEY} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, Literal} import org.apache.spark.sql.functions._ @@ -18,14 +18,25 @@ import org.apache.spark.sql.functions._ case class ValueSetSkippingStrategy( override val kind: SkippingKind = VALUE_SET, override val columnName: String, - override val columnType: String) + override val columnType: String, + params: Map[String, String] = Map.empty) extends FlintSparkSkippingStrategy { + override val parameters: Map[String, String] = { + val map = Map.newBuilder[String, String] + map ++= params + + if (!params.contains(VALUE_SET_MAX_SIZE_KEY)) { + map += (VALUE_SET_MAX_SIZE_KEY -> DEFAULT_VALUE_SET_MAX_SIZE.toString) + } + map.result() + } + override def outputSchema(): Map[String, String] = Map(columnName -> columnType) override def getAggregators: Seq[Expression] = { - val limit = DEFAULT_VALUE_SET_SIZE_LIMIT + val limit = parameters(VALUE_SET_MAX_SIZE_KEY).toInt val collectSet = collect_set(columnName) val aggregator = when(size(collectSet) > limit, lit(null)) @@ -48,8 +59,7 @@ case class ValueSetSkippingStrategy( object ValueSetSkippingStrategy { - /** - * Default limit for value set size collected. TODO: make this val once it's configurable - */ - var DEFAULT_VALUE_SET_SIZE_LIMIT = 100 + /** Value set max size param key and default value */ + var VALUE_SET_MAX_SIZE_KEY = "max_size" + var DEFAULT_VALUE_SET_MAX_SIZE = 100 } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala index 46cf7eebd..ef88fffdd 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala @@ -5,12 +5,15 @@ package org.opensearch.flint.spark.sql.skipping +import scala.collection.JavaConverters.collectionAsScalaIterableConverter + import org.antlr.v4.runtime.tree.RuleNode import org.opensearch.flint.spark.FlintSpark import org.opensearch.flint.spark.FlintSpark.RefreshMode import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, PARTITION, VALUE_SET} +import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.VALUE_SET_MAX_SIZE_KEY import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder} import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText} import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._ @@ -43,9 +46,12 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A ctx.indexColTypeList().indexColType().forEach { colTypeCtx => val colName = colTypeCtx.identifier().getText val skipType = SkippingKind.withName(colTypeCtx.skipType.getText) + val skipParams = visitSkipParams(colTypeCtx.skipParams()) skipType match { case PARTITION => indexBuilder.addPartitions(colName) - case VALUE_SET => indexBuilder.addValueSet(colName) + case VALUE_SET => + val valueSetParams = (Seq(VALUE_SET_MAX_SIZE_KEY) zip skipParams).toMap + indexBuilder.addValueSet(colName, valueSetParams) case MIN_MAX => indexBuilder.addMinMax(colName) } } @@ -107,6 +113,16 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A } } + override def visitSkipParams(ctx: SkipParamsContext): Seq[String] = { + if (ctx == null) { + Seq.empty + } else { + ctx.propertyValue.asScala + .map(p => visitPropertyValue(p)) + .toSeq + } + } + private def getSkippingIndexName(flint: FlintSpark, tableNameCtx: RuleNode): String = FlintSparkSkippingIndex.getSkippingIndexName(getFullTableName(flint, tableNameCtx)) } diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala index 247a055bf..6772eb8f3 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala @@ -5,6 +5,8 @@ package org.opensearch.flint.spark.skipping +import java.util.Collections + import scala.collection.JavaConverters.mapAsJavaMapConverter import org.json4s.native.JsonMethods.parse @@ -39,6 +41,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { test("get index metadata") { val indexCol = mock[FlintSparkSkippingStrategy] when(indexCol.kind).thenReturn(SkippingKind.PARTITION) + when(indexCol.parameters).thenReturn(Map.empty[String, String]) when(indexCol.columnName).thenReturn("test_field") when(indexCol.columnType).thenReturn("integer") when(indexCol.outputSchema()).thenReturn(Map("test_field" -> "integer")) @@ -51,6 +54,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { metadata.indexedColumns shouldBe Array( Map( "kind" -> SkippingKind.PARTITION.toString, + "parameters" -> Collections.emptyMap(), "columnName" -> "test_field", "columnType" -> "integer").asJava) } diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/valueset/ValueSetSkippingStrategySuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/valueset/ValueSetSkippingStrategySuite.scala index bc81d9fd9..11213d011 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/valueset/ValueSetSkippingStrategySuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/valueset/ValueSetSkippingStrategySuite.scala @@ -6,23 +6,47 @@ package org.opensearch.flint.spark.skipping.valueset import org.opensearch.flint.spark.skipping.{FlintSparkSkippingStrategy, FlintSparkSkippingStrategySuite} -import org.scalatest.matchers.should.Matchers +import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.{DEFAULT_VALUE_SET_MAX_SIZE, VALUE_SET_MAX_SIZE_KEY} +import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.expressions.{Abs, AttributeReference, EqualTo, Literal} -import org.apache.spark.sql.functions.{col, isnull} +import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.StringType -class ValueSetSkippingStrategySuite - extends SparkFunSuite - with FlintSparkSkippingStrategySuite - with Matchers { +class ValueSetSkippingStrategySuite extends SparkFunSuite with FlintSparkSkippingStrategySuite { override val strategy: FlintSparkSkippingStrategy = ValueSetSkippingStrategy(columnName = "name", columnType = "string") private val name = AttributeReference("name", StringType, nullable = false)() + test("should return parameters with default value") { + strategy.parameters shouldBe Map( + VALUE_SET_MAX_SIZE_KEY -> DEFAULT_VALUE_SET_MAX_SIZE.toString) + } + + test("should build aggregator with default parameter") { + strategy.getAggregators.head.semanticEquals( + when(size(collect_set("name")) > DEFAULT_VALUE_SET_MAX_SIZE, lit(null)) + .otherwise(collect_set("name")) + .expr) shouldBe true + } + + test("should use given parameter value") { + val strategy = + ValueSetSkippingStrategy( + columnName = "name", + columnType = "string", + params = Map(VALUE_SET_MAX_SIZE_KEY -> "5")) + + strategy.parameters shouldBe Map(VALUE_SET_MAX_SIZE_KEY -> "5") + strategy.getAggregators.head.semanticEquals( + when(size(collect_set("name")) > 5, lit(null)) + .otherwise(collect_set("name")) + .expr) shouldBe true + } + test("should rewrite EqualTo(, )") { EqualTo(name, Literal("hello")) shouldRewriteTo (isnull(col("name")) || col("name") === "hello") diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index 9cb4affec..ceecb1b4e 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -12,7 +12,6 @@ import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL} import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN import org.opensearch.flint.spark.skipping.FlintSparkSkippingFileIndex import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName -import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy import org.scalatest.matchers.{Matcher, MatchResult} import org.scalatest.matchers.must.Matchers._ import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper @@ -61,21 +60,25 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { | "indexedColumns": [ | { | "kind": "PARTITION", + | "parameters": {}, | "columnName": "year", | "columnType": "int" | }, | { | "kind": "PARTITION", + | "parameters": {}, | "columnName": "month", | "columnType": "int" | }, | { | "kind": "VALUE_SET", + | "parameters": { "max_size": "100" }, | "columnName": "address", | "columnType": "string" | }, | { | "kind": "MIN_MAX", + | "parameters": {}, | "columnName": "age", | "columnType": "int" | }], @@ -274,39 +277,32 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { } test("can build value set skipping index and rewrite applicable query") { - val defaultLimit = ValueSetSkippingStrategy.DEFAULT_VALUE_SET_SIZE_LIMIT - try { - ValueSetSkippingStrategy.DEFAULT_VALUE_SET_SIZE_LIMIT = 2 - flint - .skippingIndex() - .onTable(testTable) - .addValueSet("address") - .create() - flint.refreshIndex(testIndex, FULL) + flint + .skippingIndex() + .onTable(testTable) + .addValueSet("address", Map("max_size" -> "2")) + .create() + flint.refreshIndex(testIndex, FULL) - // Assert index data - checkAnswer( - flint.queryIndex(testIndex).select("address"), - Seq( - Row("""["Seattle","Portland"]"""), - Row(null) // Value set exceeded limit size is expected to be null - )) + // Assert index data + checkAnswer( + flint.queryIndex(testIndex).select("address"), + Seq( + Row("""["Seattle","Portland"]"""), + Row(null) // Value set exceeded limit size is expected to be null + )) - // Assert query rewrite that works with value set maybe null - val query = sql(s""" + // Assert query rewrite that works with value set maybe null + val query = sql(s""" | SELECT age | FROM $testTable | WHERE address = 'Portland' |""".stripMargin) - query.queryExecution.executedPlan should - useFlintSparkSkippingFileIndex( - hasIndexFilter(isnull(col("address")) || col("address") === "Portland")) - checkAnswer(query, Seq(Row(30), Row(50))) - - } finally { - ValueSetSkippingStrategy.DEFAULT_VALUE_SET_SIZE_LIMIT = defaultLimit - } + query.queryExecution.executedPlan should + useFlintSparkSkippingFileIndex( + hasIndexFilter(isnull(col("address")) || col("address") === "Portland")) + checkAnswer(query, Seq(Row(30), Row(50))) } test("can build min max skipping index and rewrite applicable query") { @@ -483,66 +479,79 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { | "indexedColumns": [ | { | "kind": "VALUE_SET", + | "parameters": { "max_size": "100" }, | "columnName": "boolean_col", | "columnType": "boolean" | }, | { | "kind": "VALUE_SET", + | "parameters": { "max_size": "100" }, | "columnName": "string_col", | "columnType": "string" | }, | { | "kind": "VALUE_SET", + | "parameters": { "max_size": "100" }, | "columnName": "varchar_col", | "columnType": "varchar(20)" | }, | { | "kind": "VALUE_SET", + | "parameters": { "max_size": "100" }, | "columnName": "char_col", | "columnType": "char(20)" | }, | { | "kind": "VALUE_SET", + | "parameters": { "max_size": "100" }, | "columnName": "long_col", | "columnType": "bigint" | }, | { | "kind": "VALUE_SET", + | "parameters": { "max_size": "100" }, | "columnName": "int_col", | "columnType": "int" | }, | { | "kind": "VALUE_SET", + | "parameters": { "max_size": "100" }, | "columnName": "short_col", | "columnType": "smallint" | }, | { | "kind": "VALUE_SET", + | "parameters": { "max_size": "100" }, | "columnName": "byte_col", | "columnType": "tinyint" | }, | { | "kind": "VALUE_SET", + | "parameters": { "max_size": "100" }, | "columnName": "double_col", | "columnType": "double" | }, | { | "kind": "VALUE_SET", + | "parameters": { "max_size": "100" }, | "columnName": "float_col", | "columnType": "float" | }, | { | "kind": "VALUE_SET", + | "parameters": { "max_size": "100" }, | "columnName": "timestamp_col", | "columnType": "timestamp" | }, | { | "kind": "VALUE_SET", + | "parameters": { "max_size": "100" }, | "columnName": "date_col", | "columnType": "date" | }, | { | "kind": "VALUE_SET", + | "parameters": { "max_size": "100" }, | "columnName": "struct_col", | "columnType": "struct" | }], diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala index 21de15de7..d072524cb 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala @@ -14,7 +14,7 @@ import org.json4s.native.Serialization import org.opensearch.flint.core.FlintOptions import org.opensearch.flint.core.storage.FlintOpenSearchClient import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName -import org.scalatest.matchers.must.Matchers.{defined, have} +import org.scalatest.matchers.must.Matchers.defined import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the} import org.apache.spark.sql.Row @@ -30,7 +30,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { override def beforeAll(): Unit = { super.beforeAll() - createPartitionedTable(testTable) + createPartitionedMultiRowTable(testTable) } protected override def afterEach(): Unit = { @@ -52,16 +52,33 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { // Wait for streaming job complete current micro batch val job = spark.streams.active.find(_.name == testIndex) - job shouldBe defined - failAfter(streamingTimeout) { - job.get.processAllAvailable() - } + awaitStreamingComplete(job.get.id.toString) val indexData = spark.read.format(FLINT_DATASOURCE).load(testIndex) flint.describeIndex(testIndex) shouldBe defined indexData.count() shouldBe 2 } + test("create skipping index with max size value set") { + sql(s""" + | CREATE SKIPPING INDEX ON $testTable + | ( + | address VALUE_SET(2) + | ) + | WITH (auto_refresh = true) + | """.stripMargin) + + val job = spark.streams.active.find(_.name == testIndex) + awaitStreamingComplete(job.get.id.toString) + + checkAnswer( + flint.queryIndex(testIndex).select("address"), + Seq( + Row("""["Seattle","Portland"]"""), + Row(null) // Value set exceeded limit size is expected to be null + )) + } + test("create skipping index with streaming job options") { withTempDir { checkpointDir => sql(s"""