diff --git a/docs/index.md b/docs/index.md index 375732179..5f9d594de 100644 --- a/docs/index.md +++ b/docs/index.md @@ -122,6 +122,13 @@ High level API is dependent on query engine implementation. Please see Query Eng ### SQL +- **CREATE:** Create a Flint index with the specified indexing logic. If the auto-refresh option is enabled, a background job will continually update the index with the latest data from the source. +- **REFRESH:** Manually refresh a Flint index. This command is applicable only to indexes with the auto-refresh option disabled. +- **SHOW:** Display all Flint indexes in the specified catalog or source table. +- **DESCRIBE:** Retrieve detailed information about a Flint index. +- **DROP:** Logically delete a Flint index. This stops the background refresh job and excludes the index from query rewrite. +- **VACUUM:** Physically remove all data associated with a Flint index, including the index data and its metadata log. This operation cleans up and frees the underlying resources. + #### Skipping Index The default maximum size for the value set is 100. In cases where a file contains columns with high cardinality values, the value set will become null. This is the trade-off that prevents excessive memory consumption at the cost of not skipping the file. @@ -129,7 +136,7 @@ The default maximum size for the value set is 100. In cases where a file contain ```sql CREATE SKIPPING INDEX [IF NOT EXISTS] ON -( column [, ...] ) +( column [, ...] ) WHERE WITH ( options ) @@ -139,13 +146,17 @@ REFRESH SKIPPING INDEX ON DROP SKIPPING INDEX ON +VACUUM SKIPPING INDEX ON + ::= [db_name].[schema_name].table_name ``` -Skipping index type: +A skipping index type consists of a skip type name and optional parameters: ```sql - ::= { PARTITION, VALUE_SET, MIN_MAX } + ::= { PARTITION, VALUE_SET, MIN_MAX } + + := ( param1, param2, ... 
) ``` Example: @@ -153,7 +164,10 @@ Example: ```sql CREATE SKIPPING INDEX ON alb_logs ( - elb_status_code VALUE_SET + time PARTITION, + elb_status_code VALUE_SET, + client_ip VALUE_SET(20), + request_processing_time MIN_MAX ) WHERE time > '2023-04-01 00:00:00' @@ -162,6 +176,8 @@ REFRESH SKIPPING INDEX ON alb_logs DESCRIBE SKIPPING INDEX ON alb_logs DROP SKIPPING INDEX ON alb_logs + +VACUUM SKIPPING INDEX ON alb_logs ``` #### Covering Index @@ -179,6 +195,8 @@ SHOW [INDEX|INDEXES] ON [DESC|DESCRIBE] INDEX name ON DROP INDEX name ON + +VACUUM INDEX name ON ``` Example: @@ -194,6 +212,8 @@ SHOW INDEX ON alb_logs DESCRIBE INDEX elb_and_requestUri ON alb_logs DROP INDEX elb_and_requestUri ON alb_logs + +VACUUM INDEX elb_and_requestUri ON alb_logs ``` #### Materialized View @@ -210,6 +230,8 @@ SHOW MATERIALIZED [VIEW|VIEWS] IN catalog[.database] [DESC|DESCRIBE] MATERIALIZED VIEW name DROP MATERIALIZED VIEW name + +VACUUM MATERIALIZED VIEW name ``` Example: @@ -230,6 +252,8 @@ SHOW MATERIALIZED VIEWS IN spark_catalog.default DESC MATERIALIZED VIEW alb_logs_metrics DROP MATERIALIZED VIEW alb_logs_metrics + +VACUUM MATERIALIZED VIEW alb_logs_metrics ``` #### Create Index Options diff --git a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 index a5f0f993b..4de5bfaa6 100644 --- a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 +++ b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 @@ -96,6 +96,7 @@ materializedViewStatement | showMaterializedViewStatement | describeMaterializedViewStatement | dropMaterializedViewStatement + | vacuumMaterializedViewStatement ; createMaterializedViewStatement @@ -154,6 +155,11 @@ indexColTypeList indexColType : identifier skipType=(PARTITION | VALUE_SET | MIN_MAX) + (LEFT_PAREN skipParams RIGHT_PAREN)? 
+ ; + +skipParams + : propertyValue (COMMA propertyValue)* ; indexName diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index aa9dae660..c197a0bd4 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -227,7 +227,6 @@ class FlintSpark(val spark: SparkSession) extends Logging { // TODO: share same transaction for now flintIndexMonitor.stopMonitor(indexName) stopRefreshingJob(indexName) - flintClient.deleteIndex(indexName) true }) } catch { diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala index 6d680ae39..6cd5b3352 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala @@ -5,6 +5,8 @@ package org.opensearch.flint.spark +import java.util.Collections + import scala.collection.JavaConverters.mapAsScalaMapConverter import org.opensearch.flint.core.metadata.FlintMetadata @@ -45,12 +47,16 @@ object FlintSparkIndexFactory { val skippingKind = SkippingKind.withName(getString(colInfo, "kind")) val columnName = getString(colInfo, "columnName") val columnType = getString(colInfo, "columnType") + val parameters = getSkipParams(colInfo) skippingKind match { case PARTITION => PartitionSkippingStrategy(columnName = columnName, columnType = columnType) case VALUE_SET => - ValueSetSkippingStrategy(columnName = columnName, columnType = columnType) + ValueSetSkippingStrategy( + columnName = columnName, + columnType = columnType, + params = parameters) case MIN_MAX => MinMaxSkippingStrategy(columnName = columnName, columnType = columnType) case other => @@ -78,6 +84,14 @@ object FlintSparkIndexFactory { } } + private def getSkipParams(colInfo: java.util.Map[String, AnyRef]): Map[String, String] = { + colInfo + .getOrDefault("parameters", Collections.emptyMap()) + .asInstanceOf[java.util.Map[String, String]] + .asScala + .toMap + } + private def getString(map: java.util.Map[String, AnyRef], key: String): String = { map.get(key).asInstanceOf[String] } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala index 2e8a3c82d..ae6518bf0 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala @@ -49,6 +49,7 @@ case class FlintSparkSkippingIndex( .map(col => Map[String, AnyRef]( "kind" -> col.kind.toString, + "parameters" -> col.parameters.asJava, "columnName" -> col.columnName, "columnType" -> col.columnType).asJava) .toArray @@ -155,14 +156,20 @@ object FlintSparkSkippingIndex { * * @param colName * indexed column name + * @param params + * value set parameters * @return * index builder */ - def addValueSet(colName: String): Builder = { + def addValueSet(colName: String, params: Map[String, String] = Map.empty): Builder = { require(tableName.nonEmpty, "table name cannot be empty") val col = findColumn(colName) - 
addIndexedColumn(ValueSetSkippingStrategy(columnName = col.name, columnType = col.dataType)) + addIndexedColumn( + ValueSetSkippingStrategy( + columnName = col.name, + columnType = col.dataType, + params = params)) this } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingStrategy.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingStrategy.scala index 042c968ec..2569f06fa 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingStrategy.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingStrategy.scala @@ -31,6 +31,11 @@ trait FlintSparkSkippingStrategy { */ val columnType: String + /** + * Skipping algorithm named parameters. + */ + val parameters: Map[String, String] = Map.empty + /** * @return * output schema mapping from Flint field name to Flint field type diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/valueset/ValueSetSkippingStrategy.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/valueset/ValueSetSkippingStrategy.scala index ff2d53d44..1db9e3d32 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/valueset/ValueSetSkippingStrategy.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/valueset/ValueSetSkippingStrategy.scala @@ -7,7 +7,7 @@ package org.opensearch.flint.spark.skipping.valueset import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{SkippingKind, VALUE_SET} -import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.DEFAULT_VALUE_SET_SIZE_LIMIT +import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.{DEFAULT_VALUE_SET_MAX_SIZE, VALUE_SET_MAX_SIZE_KEY} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, Literal} import org.apache.spark.sql.functions._ @@ -18,14 +18,25 @@ import org.apache.spark.sql.functions._ case class ValueSetSkippingStrategy( override val kind: SkippingKind = VALUE_SET, override val columnName: String, - override val columnType: String) + override val columnType: String, + params: Map[String, String] = Map.empty) extends FlintSparkSkippingStrategy { + override val parameters: Map[String, String] = { + val map = Map.newBuilder[String, String] + map ++= params + + if (!params.contains(VALUE_SET_MAX_SIZE_KEY)) { + map += (VALUE_SET_MAX_SIZE_KEY -> DEFAULT_VALUE_SET_MAX_SIZE.toString) + } + map.result() + } + override def outputSchema(): Map[String, String] = Map(columnName -> columnType) override def getAggregators: Seq[Expression] = { - val limit = DEFAULT_VALUE_SET_SIZE_LIMIT + val limit = parameters(VALUE_SET_MAX_SIZE_KEY).toInt val collectSet = collect_set(columnName) val aggregator = when(size(collectSet) > limit, lit(null)) @@ -48,8 +59,7 @@ case class ValueSetSkippingStrategy( object ValueSetSkippingStrategy { - /** - * Default limit for value set size collected. 
TODO: make this val once it's configurable - */ - var DEFAULT_VALUE_SET_SIZE_LIMIT = 100 + /** Value set max size param key and default value */ + var VALUE_SET_MAX_SIZE_KEY = "max_size" + var DEFAULT_VALUE_SET_MAX_SIZE = 100 } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala index 893c2b127..73bff5cba 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala @@ -5,12 +5,15 @@ package org.opensearch.flint.spark.sql.skipping +import scala.collection.JavaConverters.collectionAsScalaIterableConverter + import org.antlr.v4.runtime.tree.RuleNode import org.opensearch.flint.spark.FlintSpark import org.opensearch.flint.spark.FlintSpark.RefreshMode import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, PARTITION, VALUE_SET} +import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.VALUE_SET_MAX_SIZE_KEY import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder} import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText} import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._ @@ -43,9 +46,12 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A ctx.indexColTypeList().indexColType().forEach { colTypeCtx => val colName = colTypeCtx.identifier().getText val skipType = SkippingKind.withName(colTypeCtx.skipType.getText) + val skipParams = visitSkipParams(colTypeCtx.skipParams()) skipType match { case PARTITION => indexBuilder.addPartitions(colName) - case VALUE_SET => indexBuilder.addValueSet(colName) + case VALUE_SET => + val valueSetParams = (Seq(VALUE_SET_MAX_SIZE_KEY) zip skipParams).toMap + indexBuilder.addValueSet(colName, valueSetParams) case MIN_MAX => indexBuilder.addMinMax(colName) } } @@ -107,6 +113,16 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A } } + override def visitSkipParams(ctx: SkipParamsContext): Seq[String] = { + if (ctx == null) { + Seq.empty + } else { + ctx.propertyValue.asScala + .map(p => visitPropertyValue(p)) + .toSeq + } + } + private def getSkippingIndexName(flint: FlintSpark, tableNameCtx: RuleNode): String = FlintSparkSkippingIndex.getSkippingIndexName(getFullTableName(flint, tableNameCtx)) } diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala index 247a055bf..6772eb8f3 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala @@ -5,6 +5,8 @@ package org.opensearch.flint.spark.skipping +import java.util.Collections + import scala.collection.JavaConverters.mapAsJavaMapConverter import org.json4s.native.JsonMethods.parse @@ -39,6 +41,7 @@ class 
FlintSparkSkippingIndexSuite extends FlintSuite { test("get index metadata") { val indexCol = mock[FlintSparkSkippingStrategy] when(indexCol.kind).thenReturn(SkippingKind.PARTITION) + when(indexCol.parameters).thenReturn(Map.empty[String, String]) when(indexCol.columnName).thenReturn("test_field") when(indexCol.columnType).thenReturn("integer") when(indexCol.outputSchema()).thenReturn(Map("test_field" -> "integer")) @@ -51,6 +54,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { metadata.indexedColumns shouldBe Array( Map( "kind" -> SkippingKind.PARTITION.toString, + "parameters" -> Collections.emptyMap(), "columnName" -> "test_field", "columnType" -> "integer").asJava) } diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/valueset/ValueSetSkippingStrategySuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/valueset/ValueSetSkippingStrategySuite.scala index bc81d9fd9..11213d011 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/valueset/ValueSetSkippingStrategySuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/valueset/ValueSetSkippingStrategySuite.scala @@ -6,23 +6,47 @@ package org.opensearch.flint.spark.skipping.valueset import org.opensearch.flint.spark.skipping.{FlintSparkSkippingStrategy, FlintSparkSkippingStrategySuite} -import org.scalatest.matchers.should.Matchers +import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.{DEFAULT_VALUE_SET_MAX_SIZE, VALUE_SET_MAX_SIZE_KEY} +import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.expressions.{Abs, AttributeReference, EqualTo, Literal} -import org.apache.spark.sql.functions.{col, isnull} +import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.StringType -class ValueSetSkippingStrategySuite - extends SparkFunSuite - with FlintSparkSkippingStrategySuite - with Matchers { +class ValueSetSkippingStrategySuite extends SparkFunSuite with FlintSparkSkippingStrategySuite { override val strategy: FlintSparkSkippingStrategy = ValueSetSkippingStrategy(columnName = "name", columnType = "string") private val name = AttributeReference("name", StringType, nullable = false)() + test("should return parameters with default value") { + strategy.parameters shouldBe Map( + VALUE_SET_MAX_SIZE_KEY -> DEFAULT_VALUE_SET_MAX_SIZE.toString) + } + + test("should build aggregator with default parameter") { + strategy.getAggregators.head.semanticEquals( + when(size(collect_set("name")) > DEFAULT_VALUE_SET_MAX_SIZE, lit(null)) + .otherwise(collect_set("name")) + .expr) shouldBe true + } + + test("should use given parameter value") { + val strategy = + ValueSetSkippingStrategy( + columnName = "name", + columnType = "string", + params = Map(VALUE_SET_MAX_SIZE_KEY -> "5")) + + strategy.parameters shouldBe Map(VALUE_SET_MAX_SIZE_KEY -> "5") + strategy.getAggregators.head.semanticEquals( + when(size(collect_set("name")) > 5, lit(null)) + .otherwise(collect_set("name")) + .expr) shouldBe true + } + test("should rewrite EqualTo(, )") { EqualTo(name, Literal("hello")) shouldRewriteTo (isnull(col("name")) || col("name") === "hello") diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala index a177a9d1d..d1996359f 100644 --- 
a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala @@ -30,7 +30,7 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite { super.afterEach() // Delete all test indices - flint.deleteIndex(testFlintIndex) + deleteTestIndex(testFlintIndex) } test("create covering index with metadata successfully") { @@ -126,6 +126,6 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite { .onTable(testTable) .addIndexColumns("address") .create() - flint.deleteIndex(getFlintIndexName(newIndex, testTable)) + deleteTestIndex(getFlintIndexName(newIndex, testTable)) } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala index 27419b616..450da14c9 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala @@ -15,7 +15,7 @@ import org.opensearch.flint.core.FlintOptions import org.opensearch.flint.core.storage.FlintOpenSearchClient import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName -import org.scalatest.matchers.must.Matchers.{defined, have} +import org.scalatest.matchers.must.Matchers.defined import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the} import org.apache.spark.sql.Row @@ -38,7 +38,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { super.afterEach() // Delete all test indices - flint.deleteIndex(testFlintIndex) + deleteTestIndex(testFlintIndex) } test("create covering index with auto refresh") { @@ -252,8 +252,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { val result = sql(s"SHOW INDEX ON $testTable") checkAnswer(result, Seq(Row(testIndex), Row("idx_address"))) - flint.deleteIndex(getFlintIndexName("idx_address", testTable)) - flint.deleteIndex(getSkippingIndexName(testTable)) + deleteTestIndex(getFlintIndexName("idx_address", testTable), getSkippingIndexName(testTable)) } test("describe covering index") { @@ -268,7 +267,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { checkAnswer(result, Seq(Row("name", "string", "indexed"), Row("age", "int", "indexed"))) } - test("drop covering index") { + test("drop and vacuum covering index") { flint .coveringIndex() .name(testIndex) @@ -277,7 +276,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { .create() sql(s"DROP INDEX $testIndex ON $testTable") - + sql(s"VACUUM INDEX $testIndex ON $testTable") flint.describeIndex(testFlintIndex) shouldBe empty } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala index 8df2bc472..98ce7b9b6 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala @@ -29,17 +29,7 @@ class FlintSparkIndexJobITSuite extends OpenSearchTransactionSuite with Matchers } override def afterEach(): Unit = { - - /** - * Todo, if state is not valid, will throw IllegalStateException. 
Should check flint - * .isRefresh before cleanup resource. Current solution, (1) try to delete flint index, (2) if - * failed, delete index itself. - */ - try { - flint.deleteIndex(testIndex) - } catch { - case _: IllegalStateException => deleteIndex(testIndex) - } + deleteTestIndex(testIndex) super.afterEach() } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobSqlITSuite.scala index ddbfeeb16..d9588d281 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobSqlITSuite.scala @@ -117,7 +117,7 @@ class FlintSparkIndexJobSqlITSuite extends FlintSparkSuite with Matchers { try { test(new AssertionHelper(flintIndexName, checkpointDir)) } finally { - flint.deleteIndex(flintIndexName) + deleteTestIndex(flintIndexName) } } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala index 5204d21cc..219e0c900 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala @@ -64,13 +64,7 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc FlintSparkIndexMonitor.indexMonitorTracker.clear() try { - flint.deleteIndex(testFlintIndex) - } catch { - // Index maybe end up with failed state in some test - case _: IllegalStateException => - openSearchClient - .indices() - .delete(new DeleteIndexRequest(testFlintIndex), RequestOptions.DEFAULT) + deleteTestIndex(testFlintIndex) } finally { super.afterEach() } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexNameITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexNameITSuite.scala index 5b47edc46..32b9ab458 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexNameITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexNameITSuite.scala @@ -56,6 +56,7 @@ class FlintSparkIndexNameITSuite extends FlintSparkSuite { indexData should have size 1 sql(s"DROP SKIPPING INDEX ON $testTable") + sql(s"VACUUM SKIPPING INDEX ON $testTable") flint.describeIndex(flintIndexName) shouldBe empty } @@ -76,6 +77,7 @@ class FlintSparkIndexNameITSuite extends FlintSparkSuite { indexData should have size 1 sql(s"DROP INDEX $testIndex ON $testTable") + sql(s"VACUUM INDEX $testIndex ON $testTable") flint.describeIndex(flintIndexName) shouldBe empty } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala index d95e1b5b1..4df6dc55b 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala @@ -37,7 +37,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { override def afterEach(): Unit = { super.afterEach() - flint.deleteIndex(testFlintIndex) + deleteTestIndex(testFlintIndex) } test("create materialized view with metadata successfully") { diff --git 
a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala index 79e49c2fd..ed702c7a1 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala @@ -44,7 +44,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { override def afterEach(): Unit = { super.afterEach() - flint.deleteIndex(testFlintIndex) + deleteTestIndex(testFlintIndex) } test("create materialized view with auto refresh") { @@ -255,7 +255,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { checkAnswer(sql("DESC MATERIALIZED VIEW nonexistent_mv"), Seq()) } - test("drop materialized view") { + test("drop and vacuum materialized view") { flint .materializedView() .name(testMvName) @@ -263,7 +263,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { .create() sql(s"DROP MATERIALIZED VIEW $testMvName") - + sql(s"VACUUM MATERIALIZED VIEW $testMvName") flint.describeIndex(testFlintIndex) shouldBe empty } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index a3bdb11f2..e4bea2013 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -11,7 +11,6 @@ import org.opensearch.flint.core.FlintVersion.current import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN import org.opensearch.flint.spark.skipping.FlintSparkSkippingFileIndex import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName -import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy import org.scalatest.matchers.{Matcher, MatchResult} import org.scalatest.matchers.must.Matchers._ import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper @@ -38,7 +37,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { super.afterEach() // Delete all test indices - flint.deleteIndex(testIndex) + deleteTestIndex(testIndex) } test("create skipping index with metadata successfully") { @@ -60,21 +59,25 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { | "indexedColumns": [ | { | "kind": "PARTITION", + | "parameters": {}, | "columnName": "year", | "columnType": "int" | }, | { | "kind": "PARTITION", + | "parameters": {}, | "columnName": "month", | "columnType": "int" | }, | { | "kind": "VALUE_SET", + | "parameters": { "max_size": "100" }, | "columnName": "address", | "columnType": "string" | }, | { | "kind": "MIN_MAX", + | "parameters": {}, | "columnName": "age", | "columnType": "int" | }], @@ -256,39 +259,32 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { } test("can build value set skipping index and rewrite applicable query") { - val defaultLimit = ValueSetSkippingStrategy.DEFAULT_VALUE_SET_SIZE_LIMIT - try { - ValueSetSkippingStrategy.DEFAULT_VALUE_SET_SIZE_LIMIT = 2 - flint - .skippingIndex() - .onTable(testTable) - .addValueSet("address") - .create() - flint.refreshIndex(testIndex) + flint + .skippingIndex() + .onTable(testTable) + .addValueSet("address", Map("max_size" -> "2")) + .create() + flint.refreshIndex(testIndex) - // Assert index data - 
checkAnswer( - flint.queryIndex(testIndex).select("address"), - Seq( - Row("""["Seattle","Portland"]"""), - Row(null) // Value set exceeded limit size is expected to be null - )) + // Assert index data + checkAnswer( + flint.queryIndex(testIndex).select("address"), + Seq( + Row("""["Seattle","Portland"]"""), + Row(null) // Value set exceeded limit size is expected to be null + )) - // Assert query rewrite that works with value set maybe null - val query = sql(s""" + // Assert query rewrite that works with value set maybe null + val query = sql(s""" | SELECT age | FROM $testTable | WHERE address = 'Portland' |""".stripMargin) - query.queryExecution.executedPlan should - useFlintSparkSkippingFileIndex( - hasIndexFilter(isnull(col("address")) || col("address") === "Portland")) - checkAnswer(query, Seq(Row(30), Row(50))) - - } finally { - ValueSetSkippingStrategy.DEFAULT_VALUE_SET_SIZE_LIMIT = defaultLimit - } + query.queryExecution.executedPlan should + useFlintSparkSkippingFileIndex( + hasIndexFilter(isnull(col("address")) || col("address") === "Portland")) + checkAnswer(query, Seq(Row(30), Row(50))) } test("can build min max skipping index and rewrite applicable query") { @@ -465,66 +461,79 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { | "indexedColumns": [ | { | "kind": "VALUE_SET", + | "parameters": { "max_size": "100" }, | "columnName": "boolean_col", | "columnType": "boolean" | }, | { | "kind": "VALUE_SET", + | "parameters": { "max_size": "100" }, | "columnName": "string_col", | "columnType": "string" | }, | { | "kind": "VALUE_SET", + | "parameters": { "max_size": "100" }, | "columnName": "varchar_col", | "columnType": "varchar(20)" | }, | { | "kind": "VALUE_SET", + | "parameters": { "max_size": "100" }, | "columnName": "char_col", | "columnType": "char(20)" | }, | { | "kind": "VALUE_SET", + | "parameters": { "max_size": "100" }, | "columnName": "long_col", | "columnType": "bigint" | }, | { | "kind": "VALUE_SET", + | "parameters": { "max_size": "100" }, | "columnName": "int_col", | "columnType": "int" | }, | { | "kind": "VALUE_SET", + | "parameters": { "max_size": "100" }, | "columnName": "short_col", | "columnType": "smallint" | }, | { | "kind": "VALUE_SET", + | "parameters": { "max_size": "100" }, | "columnName": "byte_col", | "columnType": "tinyint" | }, | { | "kind": "VALUE_SET", + | "parameters": { "max_size": "100" }, | "columnName": "double_col", | "columnType": "double" | }, | { | "kind": "VALUE_SET", + | "parameters": { "max_size": "100" }, | "columnName": "float_col", | "columnType": "float" | }, | { | "kind": "VALUE_SET", + | "parameters": { "max_size": "100" }, | "columnName": "timestamp_col", | "columnType": "timestamp" | }, | { | "kind": "VALUE_SET", + | "parameters": { "max_size": "100" }, | "columnName": "date_col", | "columnType": "date" | }, | { | "kind": "VALUE_SET", + | "parameters": { "max_size": "100" }, | "columnName": "struct_col", | "columnType": "struct" | }], @@ -588,7 +597,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { | } |""".stripMargin) - flint.deleteIndex(testIndex) + deleteTestIndex(testIndex) } test("can build skipping index for varchar and char and rewrite applicable query") { @@ -632,7 +641,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { hasIndexFilter((isnull(col("varchar_col")) || col("varchar_col") === "sample varchar") && (isnull(col("char_col")) || col("char_col") === paddedChar))) - flint.deleteIndex(testIndex) + deleteTestIndex(testIndex) } // Custom matcher to check if a SparkPlan 
uses FlintSparkSkippingFileIndex diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala index 964dafeee..3f94762a5 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala @@ -30,13 +30,13 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { override def beforeAll(): Unit = { super.beforeAll() - createPartitionedTable(testTable) + createPartitionedMultiRowTable(testTable) } protected override def afterEach(): Unit = { super.afterEach() - flint.deleteIndex(testIndex) + deleteTestIndex(testIndex) } test("create skipping index with auto refresh") { @@ -52,16 +52,33 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { // Wait for streaming job complete current micro batch val job = spark.streams.active.find(_.name == testIndex) - job shouldBe defined - failAfter(streamingTimeout) { - job.get.processAllAvailable() - } + awaitStreamingComplete(job.get.id.toString) val indexData = spark.read.format(FLINT_DATASOURCE).load(testIndex) flint.describeIndex(testIndex) shouldBe defined indexData.count() shouldBe 2 } + test("create skipping index with max size value set") { + sql(s""" + | CREATE SKIPPING INDEX ON $testTable + | ( + | address VALUE_SET(2) + | ) + | WITH (auto_refresh = true) + | """.stripMargin) + + val job = spark.streams.active.find(_.name == testIndex) + awaitStreamingComplete(job.get.id.toString) + + checkAnswer( + flint.queryIndex(testIndex).select("address"), + Seq( + Row("""["Seattle","Portland"]"""), + Row(null) // Value set exceeded limit size is expected to be null + )) + } + test("create skipping index with streaming job options") { withTempDir { checkpointDir => sql(s""" @@ -261,7 +278,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { checkAnswer(result, Seq.empty) } - test("drop skipping index") { + test("drop and vacuum skipping index") { flint .skippingIndex() .onTable(testTable) @@ -269,7 +286,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { .create() sql(s"DROP SKIPPING INDEX ON $testTable") - + sql(s"VACUUM SKIPPING INDEX ON $testTable") flint.describeIndex(testIndex) shouldBe empty } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala index a5596bfe9..7af1c2639 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala @@ -12,6 +12,9 @@ import scala.concurrent.duration.TimeUnit import org.mockito.ArgumentMatchers.any import org.mockito.Mockito.when import org.mockito.invocation.InvocationOnMock +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest +import org.opensearch.client.RequestOptions +import org.opensearch.client.indices.GetIndexRequest import org.opensearch.flint.OpenSearchSuite import org.scalatestplus.mockito.MockitoSugar.mock @@ -46,6 +49,29 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit FlintSparkIndexMonitor.executor = mockExecutor } + protected def deleteTestIndex(testIndexNames: String*): Unit = { + testIndexNames.foreach(testIndex => { + /** + * Todo, if state is not valid, will throw IllegalStateException. 
Should check flint + * .isRefresh before cleanup resource. Current solution, (1) try to delete flint index, (2) + * if failed, delete index itself. + */ + try { + flint.deleteIndex(testIndex) + flint.vacuumIndex(testIndex) + } catch { + case _: IllegalStateException => + if (openSearchClient + .indices() + .exists(new GetIndexRequest(testIndex), RequestOptions.DEFAULT)) { + openSearchClient + .indices() + .delete(new DeleteIndexRequest(testIndex), RequestOptions.DEFAULT) + } + } + }) + } + protected def awaitStreamingComplete(jobId: String): Unit = { val job = spark.streams.get(jobId) failAfter(streamingTimeout) { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala index 534d733c5..fc4e4638d 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala @@ -14,8 +14,6 @@ import org.opensearch.action.get.GetRequest import org.opensearch.client.RequestOptions import org.opensearch.client.indices.GetIndexRequest import org.opensearch.flint.OpenSearchTransactionSuite -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.DELETED import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName import org.scalatest.matchers.should.Matchers @@ -38,11 +36,7 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match * .isRefresh before cleanup resource. Current solution, (1) try to delete flint index, (2) if * failed, delete index itself. */ - try { - flint.deleteIndex(testFlintIndex) - } catch { - case _: IllegalStateException => deleteIndex(testFlintIndex) - } + deleteTestIndex(testFlintIndex) super.afterEach() } @@ -107,27 +101,19 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match latest("jobStartTime").asInstanceOf[Number].longValue() should be > prevStartTime } - test("delete index") { + test("delete and vacuum index") { flint .skippingIndex() .onTable(testTable) .addPartitions("year", "month") .create() - flint.deleteIndex(testFlintIndex) + // Logical delete index + flint.deleteIndex(testFlintIndex) latestLogEntry(testLatestId) should contain("state" -> "deleted") - } - - test("vacuum index") { - flint - .skippingIndex() - .onTable(testTable) - .addPartitions("year", "month") - .create() - deleteLogically(testLatestId) - flint.vacuumIndex(testFlintIndex) // Both index data and metadata log should be vacuumed + flint.vacuumIndex(testFlintIndex) openSearchClient .indices() .exists(new GetIndexRequest(testFlintIndex), RequestOptions.DEFAULT) shouldBe false @@ -136,25 +122,6 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match RequestOptions.DEFAULT) shouldBe false } - test("should recreate index if logical deleted") { - flint - .skippingIndex() - .onTable(testTable) - .addPartitions("year", "month") - .create() - - // Simulate that user deletes index data manually - flint.deleteIndex(testFlintIndex) - latestLogEntry(testLatestId) should contain("state" -> "deleted") - - // Simulate that user recreate the index - flint - .skippingIndex() - .onTable(testTable) - .addValueSet("name") - .create() - } - test("should not recreate index if index data still exists") { flint .skippingIndex() @@ -163,7 +130,7 @@ class 
FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match .create() // Simulate that PPL plugin leaves index data as logical deleted - deleteLogically(testLatestId) + flint.deleteIndex(testFlintIndex) latestLogEntry(testLatestId) should contain("state" -> "deleted") // Simulate that user recreate the index but forgot to cleanup index data @@ -175,16 +142,4 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match .create() } should have message s"Flint index $testFlintIndex already exists" } - - private def deleteLogically(latestId: String): Unit = { - val response = openSearchClient - .get(new GetRequest(testMetaLogIndex, latestId), RequestOptions.DEFAULT) - - val latest = new FlintMetadataLogEntry( - latestId, - response.getSeqNo, - response.getPrimaryTerm, - response.getSourceAsMap) - updateLatestLogEntry(latest, DELETED) - } }
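
A minimal end-to-end sketch of the SQL surface introduced by this patch, using the `alb_logs` example table from the documentation above; the chosen column and the `max_size` value of 2 are illustrative assumptions, not part of the patch itself.

```sql
-- Create a skipping index; VALUE_SET(2) caps the collected value set at 2 values
-- per file (a file whose set exceeds the cap stores null and is simply not skipped).
-- auto_refresh keeps the index updated by a background streaming job.
CREATE SKIPPING INDEX ON alb_logs
(
  elb_status_code VALUE_SET(2)
)
WITH (auto_refresh = true)

-- Inspect the index definition
DESCRIBE SKIPPING INDEX ON alb_logs

-- Logical delete: stops the refresh job and excludes the index from query rewrite
DROP SKIPPING INDEX ON alb_logs

-- Physical delete: removes the index data and its metadata log
VACUUM SKIPPING INDEX ON alb_logs
```

The `max_size` value is persisted in the new `parameters` map of each indexed column's metadata; columns that omit it fall back to the default of 100, which is why the integration tests above assert `"parameters": { "max_size": "100" }` for plain `VALUE_SET` columns.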