diff --git a/docs/index.md b/docs/index.md index 3b1e2efe5..713186512 100644 --- a/docs/index.md +++ b/docs/index.md
@@ -178,6 +178,10 @@ WITH ( options ) REFRESH SKIPPING INDEX ON [DESC|DESCRIBE] SKIPPING INDEX ON + +ALTER SKIPPING INDEX +ON +WITH ( options ) DROP SKIPPING INDEX ON
@@ -212,6 +216,9 @@ REFRESH SKIPPING INDEX ON alb_logs DESCRIBE SKIPPING INDEX ON alb_logs +ALTER SKIPPING INDEX ON alb_logs +WITH ( auto_refresh = false ) + DROP SKIPPING INDEX ON alb_logs VACUUM SKIPPING INDEX ON alb_logs
@@ -231,6 +238,9 @@ SHOW [INDEX|INDEXES] ON [DESC|DESCRIBE] INDEX name ON +ALTER INDEX name ON +WITH ( options ) + DROP INDEX name ON VACUUM INDEX name ON
@@ -248,6 +258,9 @@ SHOW INDEX ON alb_logs DESCRIBE INDEX elb_and_requestUri ON alb_logs +ALTER INDEX elb_and_requestUri ON alb_logs +WITH ( auto_refresh = false ) + DROP INDEX elb_and_requestUri ON alb_logs VACUUM INDEX elb_and_requestUri ON alb_logs
@@ -266,6 +279,9 @@ SHOW MATERIALIZED [VIEW|VIEWS] IN catalog[.database] [DESC|DESCRIBE] MATERIALIZED VIEW name +ALTER MATERIALIZED VIEW name +WITH ( options ) + DROP MATERIALIZED VIEW name VACUUM MATERIALIZED VIEW name
@@ -288,6 +304,9 @@ SHOW MATERIALIZED VIEWS IN spark_catalog.default DESC MATERIALIZED VIEW alb_logs_metrics +ALTER MATERIALIZED VIEW alb_logs_metrics +WITH ( auto_refresh = false ) + DROP MATERIALIZED VIEW alb_logs_metrics VACUUM MATERIALIZED VIEW alb_logs_metrics
@@ -354,7 +373,7 @@ User can provide the following options in `WITH` clause of create statement:
+ `refresh_interval`: a string as the time interval for incremental refresh, e.g. 1 minute, 10 seconds. This is only applicable when auto refresh is enabled. Please check `org.apache.spark.unsafe.types.CalendarInterval` for valid duration identifiers. By default, the next micro batch is generated as soon as the previous one completes processing.
+ `incremental_refresh`: default value is false. If set to true, the index is refreshed incrementally; otherwise, the entire index is fully refreshed. This is only applicable when auto refresh is disabled.
+ `checkpoint_location`: a string as the location path for the refresh job checkpoint (auto or incremental). The location has to be a path in an HDFS compatible file system and is only applicable when auto refresh is enabled. If unspecified, a temporary checkpoint directory will be used, which may result in checkpoint data loss upon restart.
-+ `watermark_delay`: a string as time expression for how late data can come and still be processed, e.g. 1 minute, 10 seconds. This is required by incremental refresh on materialized view if it has aggregation in the query.
++ `watermark_delay`: a string as a time expression for how late data can arrive and still be processed, e.g. 1 minute, 10 seconds. This is required by auto and incremental refresh on a materialized view if it has aggregation in the query (see the example below).
+ `output_mode`: a mode string that describes how data will be written to the streaming sink. If unspecified, the default append mode will be applied.
+ `index_settings`: a JSON string as index settings for the OpenSearch index that will be created. Please follow the format in the OpenSearch documentation. If unspecified, default OpenSearch index settings will be applied.
+ `extra_options`: a JSON string as extra options that can be passed to the Spark streaming source and sink API directly. Use the qualified source table name (because there could be multiple) and "sink", e.g. '{"sink": "{key: val}", "table1": {key: val}}'
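For example, an auto refresh materialized view whose query contains an aggregation must declare a watermark delay. A minimal sketch, with an illustrative view name, query, and option values:

```sql
CREATE MATERIALIZED VIEW alb_logs_metrics
AS
  SELECT window.start AS startTime, COUNT(*) AS count
  FROM alb_logs
  GROUP BY TUMBLE(time, '10 Minutes')
WITH (
  auto_refresh = true,
  refresh_interval = '1 Minute',
  checkpoint_location = 's3://test/checkpoints/',
  watermark_delay = '10 Seconds'
)
```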
@@ -385,6 +404,15 @@ WITH ( ) ```
+#### Alter Index Options
+
+User can provide the following options in the `WITH` clause of an alter statement:
++ `auto_refresh`: This option is required for an alter statement. Currently, an alter statement must change the auto refresh option from its original value.
++ `refresh_interval`
++ `incremental_refresh`
++ `checkpoint_location`
++ `watermark_delay`
+ ### Index Job Management Currently the Flint index job ID is the same as the internal Flint index name in the [OpenSearch](./index.md#OpenSearch) section below.
diff --git a/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java b/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java index 23205fe99..12a5646f3 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java
@@ -25,6 +25,7 @@ import org.opensearch.client.indices.CreateIndexResponse; import org.opensearch.client.indices.GetIndexRequest; import org.opensearch.client.indices.GetIndexResponse; +import org.opensearch.client.indices.PutMappingRequest; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.client.RequestOptions; import org.opensearch.flint.core.metrics.MetricsUtil;
@@ -44,6 +45,8 @@ public interface IRestHighLevelClient extends Closeable { CreateIndexResponse createIndex(CreateIndexRequest createIndexRequest, RequestOptions options) throws IOException; + void updateIndexMapping(PutMappingRequest putMappingRequest, RequestOptions options) throws IOException; + void deleteIndex(DeleteIndexRequest deleteIndexRequest, RequestOptions options) throws IOException; DeleteResponse delete(DeleteRequest deleteRequest, RequestOptions options) throws IOException;
diff --git a/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java b/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java index bf48af52d..fa5696f50 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java
@@ -27,6 +27,7 @@ import org.opensearch.client.indices.CreateIndexResponse; import org.opensearch.client.indices.GetIndexRequest; import org.opensearch.client.indices.GetIndexResponse; +import org.opensearch.client.indices.PutMappingRequest; import java.io.IOException;
@@ -63,6 +64,11 @@ public CreateIndexResponse createIndex(CreateIndexRequest createIndexRequest, Re return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.indices().create(createIndexRequest, options)); } + @Override + public void updateIndexMapping(PutMappingRequest putMappingRequest, RequestOptions options) throws IOException { + execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.indices().putMapping(putMappingRequest, options)); + } + @Override public void deleteIndex(DeleteIndexRequest deleteIndexRequest, RequestOptions options) throws IOException { execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.indices().delete(deleteIndexRequest, options));
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java index a089e4088..ee38bbb9c 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java
@@ -71,6 +71,14 @@
OptimisticTransaction startTransaction(String indexName, String dataSourc */ FlintMetadata getIndexMetadata(String indexName); + /** + * Update a Flint index with the metadata given. + * + * @param indexName index name + * @param metadata index metadata + */ + void updateIndex(String indexName, FlintMetadata metadata); + /** * Delete a Flint index. * diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala index e6ae565d2..5f229d412 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala @@ -81,6 +81,7 @@ object FlintMetadataLogEntry { val CREATING: IndexState.Value = Value("creating") val ACTIVE: IndexState.Value = Value("active") val REFRESHING: IndexState.Value = Value("refreshing") + val UPDATING: IndexState.Value = Value("updating") val DELETING: IndexState.Value = Value("deleting") val DELETED: IndexState.Value = Value("deleted") val FAILED: IndexState.Value = Value("failed") diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index da5877262..1c15af357 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -36,6 +36,7 @@ import org.opensearch.client.indices.CreateIndexRequest; import org.opensearch.client.indices.GetIndexRequest; import org.opensearch.client.indices.GetIndexResponse; +import org.opensearch.client.indices.PutMappingRequest; import org.opensearch.cluster.metadata.MappingMetadata; import org.opensearch.common.Strings; import org.opensearch.common.settings.Settings; @@ -191,6 +192,19 @@ public FlintMetadata getIndexMetadata(String indexName) { } } + @Override + public void updateIndex(String indexName, FlintMetadata metadata) { + LOG.info("Updating Flint index " + indexName + " with metadata " + metadata); + String osIndexName = sanitizeIndexName(indexName); + try (IRestHighLevelClient client = createClient()) { + PutMappingRequest request = new PutMappingRequest(osIndexName); + request.source(metadata.getContent(), XContentType.JSON); + client.updateIndexMapping(request, RequestOptions.DEFAULT); + } catch (Exception e) { + throw new IllegalStateException("Failed to update Flint index " + osIndexName, e); + } + } + @Override public void deleteIndex(String indexName) { LOG.info("Deleting Flint index " + indexName); diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index acb416bfe..7b875f63b 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -13,10 +13,11 @@ import org.opensearch.flint.core.{FlintClient, FlintClientBuilder} import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._ import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN +import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName._ import 
org.opensearch.flint.spark.covering.FlintSparkCoveringIndex import org.opensearch.flint.spark.mv.FlintSparkMaterializedView import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh -import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.AUTO +import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode._ import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKindSerializer import org.opensearch.flint.spark.skipping.recommendations.DataTypeSkippingStrategy
@@ -202,6 +203,39 @@ class FlintSpark(val spark: SparkSession) extends Logging { } } + /** + * Update the given index with new metadata and options, and update the associated refresh job accordingly. + * + * @param index + * Flint index to update + * @return + * refreshing job ID (empty if no job) + */ + def updateIndex(index: FlintSparkIndex): Option[String] = { + logInfo(s"Updating Flint index $index") + val indexName = index.name + + validateUpdateAllowed( + describeIndex(indexName) + .getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist")) + .options, + index.options) + + try { + // Relies on validation to forbid auto-to-auto and manual-to-manual updates + index.options.autoRefresh() match { + case true => updateIndexManualToAuto(index) + case false => updateIndexAutoToManual(index) + } + } catch { + case e: Exception => + logError("Failed to update Flint index", e) + throw new IllegalStateException("Failed to update Flint index", e) + } + } + /** * Delete index and its associated refreshing job. * 
@@ -353,4 +387,90 @@ class FlintSpark(val spark: SparkSession) extends Logging { logWarning("Refreshing job not found") } } + + /** + * Validate that the index update options are allowed.
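+ * An alter statement must change the `auto_refresh` option from its current value, and any other changed option must be allowed for the resulting refresh mode (auto, full, or incremental). + *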
+ * @param originalOptions + * original options + * @param updatedOptions + * the updated options + */ + private def validateUpdateAllowed( + originalOptions: FlintSparkIndexOptions, + updatedOptions: FlintSparkIndexOptions): Unit = { + // auto_refresh must change + if (updatedOptions.autoRefresh() == originalOptions.autoRefresh()) { + throw new IllegalArgumentException("auto_refresh option must be updated") + } + + val refreshMode = (updatedOptions.autoRefresh(), updatedOptions.incrementalRefresh()) match { + case (true, false) => AUTO + case (false, false) => FULL + case (false, true) => INCREMENTAL + case (true, true) => + throw new IllegalArgumentException( + "auto_refresh and incremental_refresh options cannot both be true") + } + + // validate allowed options depending on refresh mode + val allowedOptionNames = refreshMode match { + case FULL => Set(AUTO_REFRESH, INCREMENTAL_REFRESH) + case AUTO | INCREMENTAL => + Set( + AUTO_REFRESH, + INCREMENTAL_REFRESH, + REFRESH_INTERVAL, + CHECKPOINT_LOCATION, + WATERMARK_DELAY) + } + + // Get the changed option names + val updateOptionNames = updatedOptions.options.filterNot { case (k, v) => + originalOptions.options.get(k).contains(v) + }.keys + if (!updateOptionNames.forall(allowedOptionNames.map(_.toString).contains)) { + throw new IllegalArgumentException( + s"Altering index to ${refreshMode} refresh only allows options: ${allowedOptionNames}") + } + } + + private def updateIndexAutoToManual(index: FlintSparkIndex): Option[String] = { + val indexName = index.name + val indexLogEntry = index.latestLogEntry.get + flintClient + .startTransaction(indexName, dataSourceName) + .initialLog(latest => + latest.state == REFRESHING && latest.seqNo == indexLogEntry.seqNo && latest.primaryTerm == indexLogEntry.primaryTerm) + .transientLog(latest => latest.copy(state = UPDATING)) + .finalLog(latest => latest.copy(state = ACTIVE)) + .commit(_ => { + flintClient.updateIndex(indexName, index.metadata) + logInfo("Update index options complete") + flintIndexMonitor.stopMonitor(indexName) + stopRefreshingJob(indexName) + None + }) + } + + private def updateIndexManualToAuto(index: FlintSparkIndex): Option[String] = { + val indexName = index.name + val indexLogEntry = index.latestLogEntry.get + val indexRefresh = FlintSparkIndexRefresh.create(indexName, index) + flintClient + .startTransaction(indexName, dataSourceName) + .initialLog(latest => + latest.state == ACTIVE && latest.seqNo == indexLogEntry.seqNo && latest.primaryTerm == indexLogEntry.primaryTerm) + .transientLog(latest => + latest.copy(state = UPDATING, createTime = System.currentTimeMillis())) + .finalLog(latest => { + logInfo("Scheduling index state monitor") + flintIndexMonitor.startMonitor(indexName) + latest.copy(state = REFRESHING) + }) + .commit(_ => { + flintClient.updateIndex(indexName, index.metadata) + logInfo("Update index options complete") + indexRefresh.start(spark, flintSparkConf) + }) + } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala index 0b2a84519..adcb4c45f 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala @@ -5,6 +5,8 @@ package org.opensearch.flint.spark +import scala.collection.JavaConverters.mapAsJavaMapConverter + import 
org.opensearch.flint.spark.FlintSparkIndexOptions.empty import org.apache.spark.sql.catalog.Column @@ -59,6 +61,28 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) { def create(ignoreIfExists: Boolean = false): Unit = flint.createIndex(buildIndex(), ignoreIfExists) + /** + * Copy Flint index with updated options. + * + * @param index + * Flint index to copy + * @param updateOptions + * options to update + * @return + * updated Flint index + */ + def copyWithUpdate( + index: FlintSparkIndex, + updateOptions: FlintSparkIndexOptions): FlintSparkIndex = { + val originalOptions = index.options + val updatedOptions = + originalOptions.copy(options = originalOptions.options ++ updateOptions.options) + val updatedMetadata = index + .metadata() + .copy(options = updatedOptions.options.mapValues(_.asInstanceOf[AnyRef]).asJava) + FlintSparkIndexFactory.create(updatedMetadata).get + } + /** * Build method for concrete builder class to implement */ diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala index 31d1d91f3..74626d25d 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala @@ -96,7 +96,7 @@ case class FlintSparkMaterializedView( private def watermark(timeCol: Attribute, child: LogicalPlan) = { require( options.watermarkDelay().isDefined, - "watermark delay is required for incremental refresh with aggregation") + "watermark delay is required for auto refresh and incremental refresh with aggregation") val delay = options.watermarkDelay().get EventTimeWatermark(timeCol, IntervalUtils.fromIntervalString(delay), child) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala index 14fa21240..72237406b 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala @@ -102,6 +102,20 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A } } + override def visitAlterCoveringIndexStatement( + ctx: AlterCoveringIndexStatementContext): Command = { + FlintSparkSqlCommand() { flint => + val indexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName) + val indexOptions = visitPropertyList(ctx.propertyList()) + val index = flint + .describeIndex(indexName) + .getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist")) + val updatedIndex = flint.coveringIndex().copyWithUpdate(index, indexOptions) + flint.updateIndex(updatedIndex) + Seq.empty + } + } + override def visitDropCoveringIndexStatement( ctx: DropCoveringIndexStatementContext): Command = { FlintSparkSqlCommand() { flint => diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala index 5b31890bb..65ee45577 100644 --- 
a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala @@ -99,6 +99,20 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito } } + override def visitAlterMaterializedViewStatement( + ctx: AlterMaterializedViewStatementContext): Command = { + FlintSparkSqlCommand() { flint => + val indexName = getFlintIndexName(flint, ctx.mvName) + val indexOptions = visitPropertyList(ctx.propertyList()) + val index = flint + .describeIndex(indexName) + .getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist")) + val updatedIndex = flint.materializedView().copyWithUpdate(index, indexOptions) + flint.updateIndex(updatedIndex) + Seq.empty + } + } + override def visitDropMaterializedViewStatement( ctx: DropMaterializedViewStatementContext): Command = { FlintSparkSqlCommand() { flint => diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala index e98446c22..367db9d3f 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala @@ -110,6 +110,20 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A } } + override def visitAlterSkippingIndexStatement( + ctx: AlterSkippingIndexStatementContext): Command = { + FlintSparkSqlCommand() { flint => + val indexName = getSkippingIndexName(flint, ctx.tableName) + val indexOptions = visitPropertyList(ctx.propertyList()) + val index = flint + .describeIndex(indexName) + .getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist")) + val updatedIndex = flint.skippingIndex().copyWithUpdate(index, indexOptions) + flint.updateIndex(updatedIndex) + Seq.empty + } + } + override def visitAnalyzeSkippingIndexStatement( ctx: AnalyzeSkippingIndexStatementContext): Command = { diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala index 85be9bbb8..6eab292e2 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala @@ -77,6 +77,50 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M (settings \ "index.number_of_replicas").extract[String] shouldBe "2" } + it should "update index successfully" in { + val indexName = "test_update" + val content = + """ { + | "_meta": { + | "kind": "test_kind" + | }, + | "properties": { + | "age": { + | "type": "integer" + | } + | } + | } + |""".stripMargin + + val metadata = mock[FlintMetadata] + when(metadata.getContent).thenReturn(content) + when(metadata.indexSettings).thenReturn(None) + flintClient.createIndex(indexName, metadata) + + val newContent = + """ { + | "_meta": { + | "kind": "test_kind", + | "name": "test_name" + | }, + | "properties": { + | "age": { + | "type": "integer" + | } + | } + | } + |""".stripMargin + + val newMetadata = mock[FlintMetadata] + 
when(newMetadata.getContent).thenReturn(newContent) + when(newMetadata.indexSettings).thenReturn(None) + flintClient.updateIndex(indexName, newMetadata) + + flintClient.exists(indexName) shouldBe true + flintClient.getIndexMetadata(indexName).kind shouldBe "test_kind" + flintClient.getIndexMetadata(indexName).name shouldBe "test_name" + } + it should "get all index metadata with the given index name pattern" in { val metadata = mock[FlintMetadata] when(metadata.getContent).thenReturn("{}") diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala index 38355c2f6..99a73e627 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala @@ -118,6 +118,34 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite { checkAnswer(indexData, Seq(Row("Hello", 30), Row("World", 25))) } + test("update covering index successfully") { + // Create full refresh Flint index + flint + .coveringIndex() + .name(testIndex) + .onTable(testTable) + .addIndexColumns("name", "age") + .create() + val indexData = flint.queryIndex(testFlintIndex) + checkAnswer(indexData, Seq()) + + // Update Flint index to auto refresh and wait for complete + val updatedIndex = flint + .coveringIndex() + .copyWithUpdate( + flint.describeIndex(testFlintIndex).get, + FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) + val jobId = flint.updateIndex(updatedIndex) + jobId shouldBe defined + + val job = spark.streams.get(jobId.get) + failAfter(streamingTimeout) { + job.processAllAvailable() + } + + checkAnswer(indexData, Seq(Row("Hello", 30), Row("World", 25))) + } + test("can have multiple covering indexes on a table") { flint .coveringIndex() diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala index 3c9e06257..6991e60d8 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala @@ -28,8 +28,8 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { private val testIndex = "name_and_age" private val testFlintIndex = getFlintIndexName(testIndex, testTable) - override def beforeAll(): Unit = { - super.beforeAll() + override def beforeEach(): Unit = { + super.beforeEach() createPartitionedTable(testTable) } @@ -39,6 +39,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { // Delete all test indices deleteTestIndex(testFlintIndex) + sql(s"DROP TABLE $testTable") } test("create covering index with auto refresh") { @@ -296,6 +297,28 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { checkAnswer(result, Seq(Row("name", "string", "indexed"), Row("age", "int", "indexed"))) } + test("update covering index") { + flint + .coveringIndex() + .name(testIndex) + .onTable(testTable) + .addIndexColumns("name", "age") + .create() + + flint.describeIndex(testFlintIndex) shouldBe defined + flint.queryIndex(testFlintIndex).count() shouldBe 0 + + sql(s""" + | ALTER INDEX $testIndex ON $testTable + | WITH (auto_refresh = true) + | """.stripMargin) + + // Wait for streaming job complete current micro batch + val job = 
spark.streams.active.find(_.name == testFlintIndex) + awaitStreamingComplete(job.get.id.toString) + flint.queryIndex(testFlintIndex).count() shouldBe 2 + } + test("drop and vacuum covering index") { flint .coveringIndex() diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala index 7ea8d381e..16d2b0b07 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala @@ -191,6 +191,49 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { } } + test("update materialized view successfully") { + withTempDir { checkpointDir => + // Create full refresh Flint index + flint + .materializedView() + .name(testMvName) + .query(testQuery) + .create() + val indexData = flint.queryIndex(testFlintIndex) + checkAnswer(indexData, Seq()) + + // Update Flint index to auto refresh and wait for complete + val updatedIndex = flint + .materializedView() + .copyWithUpdate( + flint.describeIndex(testFlintIndex).get, + FlintSparkIndexOptions( + Map( + "auto_refresh" -> "true", + "checkpoint_location" -> checkpointDir.getAbsolutePath, + "watermark_delay" -> "1 Minute"))) + val jobId = flint.updateIndex(updatedIndex) + jobId shouldBe defined + + val job = spark.streams.get(jobId.get) + failAfter(streamingTimeout) { + job.processAllAvailable() + } + + checkAnswer( + indexData.select("startTime", "count"), + Seq( + Row(timestamp("2023-10-01 00:00:00"), 1), + Row(timestamp("2023-10-01 00:10:00"), 2), + Row(timestamp("2023-10-01 01:00:00"), 1) + /* + * The last row is pending to fire upon watermark + * Row(timestamp("2023-10-01 02:00:00"), 1) + */ + )) + } + } + private def timestamp(ts: String): Timestamp = Timestamp.valueOf(ts) private def withIncrementalMaterializedView(query: String)( diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala index 20b7f3d55..906523696 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala @@ -37,7 +37,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { | GROUP BY TUMBLE(time, '10 Minutes') |""".stripMargin - override def beforeAll(): Unit = { + override def beforeEach(): Unit = { super.beforeAll() createTimeSeriesTable(testTable) } @@ -45,6 +45,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { override def afterEach(): Unit = { super.afterEach() deleteTestIndex(testFlintIndex) + sql(s"DROP TABLE $testTable") } test("create materialized view with auto refresh") { @@ -284,6 +285,47 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { checkAnswer(sql("DESC MATERIALIZED VIEW nonexistent_mv"), Seq()) } + test("update materialized view") { + withTempDir { checkpointDir => + flint + .materializedView() + .name(testMvName) + .query(testQuery) + .create() + + flint.describeIndex(testFlintIndex) shouldBe defined + flint.queryIndex(testFlintIndex).count() shouldBe 0 + + sql(s""" + | ALTER MATERIALIZED VIEW $testMvName + | WITH ( + | auto_refresh = true, + | checkpoint_location = '${checkpointDir.getAbsolutePath}', + | watermark_delay = 
'1 Second' + | ) + |""".stripMargin) + + // Wait for streaming job complete current micro batch + val job = spark.streams.active.find(_.name == testFlintIndex) + job shouldBe defined + failAfter(streamingTimeout) { + job.get.processAllAvailable() + } + + checkAnswer( + flint.queryIndex(testFlintIndex).select("startTime", "count"), + Seq( + Row(timestamp("2023-10-01 00:00:00"), 1), + Row(timestamp("2023-10-01 00:10:00"), 2), + Row(timestamp("2023-10-01 01:00:00"), 1) + /* + * The last row is pending to fire upon watermark + * Row(timestamp("2023-10-01 02:00:00"), 1) + */ + )) + } + } + test("drop and vacuum materialized view") { flint .materializedView() diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index b663b19bd..8b724fde7 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -266,6 +266,34 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { indexData should have size 2 } + test("update skipping index successfully") { + // Create full refresh Flint index + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .create() + + flint.queryIndex(testIndex).collect().toSet should have size 0 + + // Update Flint index to auto refresh and wait for complete + val updatedIndex = + flint + .skippingIndex() + .copyWithUpdate( + flint.describeIndex(testIndex).get, + FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) + val jobId = flint.updateIndex(updatedIndex) + jobId shouldBe defined + + val job = spark.streams.get(jobId.get) + failAfter(streamingTimeout) { + job.processAllAvailable() + } + + flint.queryIndex(testIndex).collect().toSet should have size 2 + } + test("can have only 1 skipping index on a table") { flint .skippingIndex() diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala index b08945953..53d08bda7 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala @@ -354,6 +354,141 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { checkAnswer(result, Seq.empty) } + test("update full refresh skipping index to auto refresh") { + sql(s""" + | CREATE SKIPPING INDEX ON $testTable + | ( + | year PARTITION, + | name VALUE_SET, + | age MIN_MAX + | ) + | """.stripMargin) + + flint.describeIndex(testIndex) shouldBe defined + flint.queryIndex(testIndex).count() shouldBe 0 + + sql(s""" + | ALTER SKIPPING INDEX ON $testTable + | WITH (auto_refresh = true) + | """.stripMargin) + + // Wait for streaming job complete current micro batch + val job = spark.streams.active.find(_.name == testIndex) + awaitStreamingComplete(job.get.id.toString) + flint.queryIndex(testIndex).count() shouldBe 2 + } + + test("update incremental refresh skipping index to auto refresh") { + withTempDir { checkpointDir => + sql(s""" + | CREATE SKIPPING INDEX ON $testTable + | ( year PARTITION ) + | WITH ( + | incremental_refresh = true, + | checkpoint_location = '${checkpointDir.getAbsolutePath}' + | ) + | """.stripMargin) + + // Refresh all present source data as of now + sql(s"REFRESH 
SKIPPING INDEX ON $testTable") + flint.queryIndex(testIndex).count() shouldBe 2 + + // New data will be refreshed after updating index to auto refresh + sql(s""" + | INSERT INTO $testTable + | PARTITION (year=2023, month=5) + | VALUES ('Hello', 50, 'Vancouver') + |""".stripMargin) + + sql(s""" + | ALTER SKIPPING INDEX ON $testTable + | WITH ( + | auto_refresh = true, + | incremental_refresh = false + | ) + | """.stripMargin) + + // Wait for streaming job complete current micro batch + val job = spark.streams.active.find(_.name == testIndex) + awaitStreamingComplete(job.get.id.toString) + flint.queryIndex(testIndex).count() shouldBe 3 + } + } + + test("update auto refresh skipping index to full refresh") { + sql(s""" + | CREATE SKIPPING INDEX ON $testTable + | ( year PARTITION ) + | WITH (auto_refresh = true) + | """.stripMargin) + + // Wait for streaming job complete current micro batch + val job = spark.streams.active.find(_.name == testIndex) + awaitStreamingComplete(job.get.id.toString) + + flint.describeIndex(testIndex) shouldBe defined + flint.queryIndex(testIndex).count() shouldBe 2 + + sql(s""" + | ALTER SKIPPING INDEX ON $testTable + | WITH (auto_refresh = false) + | """.stripMargin) + + spark.streams.active.find(_.name == testIndex) shouldBe empty + + // New data won't be refreshed until refresh statement triggered + sql(s""" + | INSERT INTO $testTable + | PARTITION (year=2023, month=5) + | VALUES ('Hello', 50, 'Vancouver') + |""".stripMargin) + flint.queryIndex(testIndex).count() shouldBe 2 + + sql(s"REFRESH SKIPPING INDEX ON $testTable") + flint.queryIndex(testIndex).count() shouldBe 3 + } + + test("update auto refresh skipping index to incremental refresh") { + withTempDir { checkpointDir => + sql(s""" + | CREATE SKIPPING INDEX ON $testTable + | ( year PARTITION ) + | WITH ( + | auto_refresh = true, + | checkpoint_location = '${checkpointDir.getAbsolutePath}' + | ) + | """.stripMargin) + + // Wait for streaming job complete current micro batch + val job = spark.streams.active.find(_.name == testIndex) + awaitStreamingComplete(job.get.id.toString) + + flint.describeIndex(testIndex) shouldBe defined + flint.queryIndex(testIndex).count() shouldBe 2 + + sql(s""" + | ALTER SKIPPING INDEX ON $testTable + | WITH ( + | auto_refresh = false, + | incremental_refresh = true + | ) + | """.stripMargin) + + spark.streams.active.find(_.name == testIndex) shouldBe empty + + // New data won't be refreshed until refresh statement triggered + sql(s""" + | INSERT INTO $testTable + | PARTITION (year=2023, month=5) + | VALUES ('Hello', 50, 'Vancouver') + |""".stripMargin) + flint.queryIndex(testIndex).count() shouldBe 2 + + sql(s"REFRESH SKIPPING INDEX ON $testTable") + flint.queryIndex(testIndex).count() shouldBe 3 + } + } + test("drop and vacuum skipping index") { flint .skippingIndex() diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala index b27275539..7ad03f84a 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala @@ -121,6 +121,41 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match latest("jobStartTime").asInstanceOf[Number].longValue() should be > prevStartTime } + test("update full refresh index to auto refresh index") { + flint + .skippingIndex() + .onTable(testTable) + 
.addPartitions("year", "month") + .create() + + val index = flint.describeIndex(testFlintIndex).get + val updatedIndex = flint + .skippingIndex() + .copyWithUpdate(index, FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) + flint.updateIndex(updatedIndex) + val latest = latestLogEntry(testLatestId) + latest should contain("state" -> "refreshing") + latest("jobStartTime").asInstanceOf[Number].longValue() should be > 0L + } + + test("update auto refresh index to full refresh index") { + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) + .create() + flint.refreshIndex(testFlintIndex) + + val index = flint.describeIndex(testFlintIndex).get + val updatedIndex = flint + .skippingIndex() + .copyWithUpdate(index, FlintSparkIndexOptions(Map("auto_refresh" -> "false"))) + flint.updateIndex(updatedIndex) + val latest = latestLogEntry(testLatestId) + latest should contain("state" -> "active") + } + test("delete and vacuum index") { flint .skippingIndex() diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala new file mode 100644 index 000000000..c5ac0ab95 --- /dev/null +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala @@ -0,0 +1,523 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark + +import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson +import org.json4s.native.JsonMethods._ +import org.opensearch.client.RequestOptions +import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName +import org.opensearch.index.query.QueryBuilders +import org.opensearch.index.reindex.DeleteByQueryRequest +import org.scalatest.matchers.must.Matchers._ +import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper + +class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { + + /** Test table and index name */ + private val testTable = "spark_catalog.default.test" + private val testIndex = getSkippingIndexName(testTable) + + override def beforeEach(): Unit = { + super.beforeEach() + createPartitionedMultiRowTable(testTable) + } + + override def afterEach(): Unit = { + super.afterEach() + + // Delete all test indices + deleteTestIndex(testIndex) + sql(s"DROP TABLE $testTable") + } + + test("update index with index options successfully") { + withTempDir { checkpointDir => + flint + .skippingIndex() + .onTable(testTable) + .addValueSet("address") + .options(FlintSparkIndexOptions(Map( + "auto_refresh" -> "false", + "incremental_refresh" -> "true", + "refresh_interval" -> "1 Minute", + "checkpoint_location" -> checkpointDir.getAbsolutePath, + "index_settings" -> "{\"number_of_shards\": 3,\"number_of_replicas\": 2}"))) + .create() + val indexInitial = flint.describeIndex(testIndex).get + + // Update index options + val updateOptions = + FlintSparkIndexOptions(Map("auto_refresh" -> "true", "incremental_refresh" -> "false")) + val updatedIndex = flint.skippingIndex().copyWithUpdate(indexInitial, updateOptions) + flint.updateIndex(updatedIndex) + + // Verify index after update + val indexFinal = flint.describeIndex(testIndex).get + val optionJson = + compact(render(parse(indexFinal.metadata().getContent) \ "_meta" \ "options")) + optionJson should matchJson(s""" + | { + | "auto_refresh": "true", + | "incremental_refresh": 
"false", + | "refresh_interval": "1 Minute", + | "checkpoint_location": "${checkpointDir.getAbsolutePath}", + | "index_settings": "{\\\"number_of_shards\\\": 3,\\\"number_of_replicas\\\": 2}" + | } + |""".stripMargin) + + // Load index options from index mapping (verify OS index setting in SQL IT) + indexFinal.options.autoRefresh() shouldBe true + indexFinal.options.incrementalRefresh() shouldBe false + indexFinal.options.refreshInterval() shouldBe Some("1 Minute") + indexFinal.options.checkpointLocation() shouldBe Some(checkpointDir.getAbsolutePath) + indexFinal.options.indexSettings() shouldBe + Some("{\"number_of_shards\": 3,\"number_of_replicas\": 2}") + } + } + + // Test update options validation failure + Seq( + ( + "update index without changing auto_refresh option", + Seq( + (Map("auto_refresh" -> "true"), Map("auto_refresh" -> "true")), + ( + Map("auto_refresh" -> "true"), + Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/")), + (Map("auto_refresh" -> "true"), Map("checkpoint_location" -> "s3a://test/")), + (Map("auto_refresh" -> "true"), Map("watermark_delay" -> "1 Minute")), + (Map.empty[String, String], Map("auto_refresh" -> "false")), + ( + Map.empty[String, String], + Map("auto_refresh" -> "false", "checkpoint_location" -> "s3a://test/")), + (Map.empty[String, String], Map("incremental_refresh" -> "true")), + (Map.empty[String, String], Map("checkpoint_location" -> "s3a://test/")))), + ( + "convert to full refresh with disallowed options", + Seq( + ( + Map("auto_refresh" -> "true"), + Map("auto_refresh" -> "false", "checkpoint_location" -> "s3a://test/")), + ( + Map("auto_refresh" -> "true"), + Map("auto_refresh" -> "false", "refresh_interval" -> "5 Minute")), + ( + Map("auto_refresh" -> "true"), + Map("auto_refresh" -> "false", "watermark_delay" -> "1 Minute")))), + ( + "convert to incremental refresh with disallowed options", + Seq( + ( + Map("auto_refresh" -> "true"), + Map( + "auto_refresh" -> "false", + "incremental_refresh" -> "true", + "output_mode" -> "complete")))), + ( + "convert to auto refresh with disallowed options", + Seq( + (Map.empty[String, String], Map("auto_refresh" -> "true", "output_mode" -> "complete")))), + ( + "convert to invalid refresh mode", + Seq( + ( + Map.empty[String, String], + Map("auto_refresh" -> "true", "incremental_refresh" -> "true")), + (Map("auto_refresh" -> "true"), Map("incremental_refresh" -> "true")), + ( + Map("incremental_refresh" -> "true", "checkpoint_location" -> "s3a://test/"), + Map("auto_refresh" -> "true"))))).foreach { case (testName, testCases) => + test(s"should fail if $testName") { + testCases.foreach { case (initialOptionsMap, updateOptionsMap) => + withTempDir { checkpointDir => + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .options( + FlintSparkIndexOptions(initialOptionsMap + .get("checkpoint_location") + .map(_ => + initialOptionsMap.updated("checkpoint_location", checkpointDir.getAbsolutePath)) + .getOrElse(initialOptionsMap))) + .create() + flint.refreshIndex(testIndex) + + val index = flint.describeIndex(testIndex).get + the[IllegalArgumentException] thrownBy { + val updatedIndex = flint + .skippingIndex() + .copyWithUpdate( + index, + FlintSparkIndexOptions( + updateOptionsMap + .get("checkpoint_location") + .map(_ => + updateOptionsMap + .updated("checkpoint_location", checkpointDir.getAbsolutePath)) + .getOrElse(updateOptionsMap))) + flint.updateIndex(updatedIndex) + } + + deleteTestIndex(testIndex) + } + } + } + } + + // Test update options 
validation success + Seq( + ( + "convert to full refresh with allowed options", + Seq( + ( + Map("auto_refresh" -> "true"), + Map("auto_refresh" -> "false"), + Map( + "auto_refresh" -> false, + "incremental_refresh" -> false, + "refresh_interval" -> None, + "checkpoint_location" -> None, + "watermark_delay" -> None)), + ( + Map("auto_refresh" -> "true"), + Map("auto_refresh" -> "false", "incremental_refresh" -> "false"), + Map( + "auto_refresh" -> false, + "incremental_refresh" -> false, + "refresh_interval" -> None, + "checkpoint_location" -> None, + "watermark_delay" -> None)))), + ( + "convert to incremental refresh with allowed options", + Seq( + ( + Map("auto_refresh" -> "true"), + Map( + "auto_refresh" -> "false", + "incremental_refresh" -> "true", + "refresh_interval" -> "1 Minute"), + Map( + "auto_refresh" -> false, + "incremental_refresh" -> true, + "refresh_interval" -> Some("1 Minute"), + "checkpoint_location" -> None, + "watermark_delay" -> None)), + ( + Map("auto_refresh" -> "true"), + Map( + "auto_refresh" -> "false", + "incremental_refresh" -> "true", + "checkpoint_location" -> "s3a://test/"), + Map( + "auto_refresh" -> false, + "incremental_refresh" -> true, + "refresh_interval" -> None, + "checkpoint_location" -> Some("s3a://test/"), + "watermark_delay" -> None)), + ( + Map("auto_refresh" -> "true"), + Map( + "auto_refresh" -> "false", + "incremental_refresh" -> "true", + "watermark_delay" -> "1 Minute"), + Map( + "auto_refresh" -> false, + "incremental_refresh" -> true, + "refresh_interval" -> None, + "checkpoint_location" -> None, + "watermark_delay" -> Some("1 Minute"))))), + ( + "convert to auto refresh with allowed options", + Seq( + ( + Map.empty[String, String], + Map("auto_refresh" -> "true", "refresh_interval" -> "1 Minute"), + Map( + "auto_refresh" -> true, + "incremental_refresh" -> false, + "refresh_interval" -> Some("1 Minute"), + "checkpoint_location" -> None, + "watermark_delay" -> None)), + ( + Map.empty[String, String], + Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/"), + Map( + "auto_refresh" -> true, + "incremental_refresh" -> false, + "refresh_interval" -> None, + "checkpoint_location" -> Some("s3a://test/"), + "watermark_delay" -> None)), + ( + Map.empty[String, String], + Map("auto_refresh" -> "true", "watermark_delay" -> "1 Minute"), + Map( + "auto_refresh" -> true, + "incremental_refresh" -> false, + "refresh_interval" -> None, + "checkpoint_location" -> None, + "watermark_delay" -> Some("1 Minute")))))).foreach { case (testName, testCases) => + test(s"should succeed if $testName") { + testCases.foreach { case (initialOptionsMap, updateOptionsMap, expectedOptionsMap) => + withTempDir { checkpointDir => + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .options( + FlintSparkIndexOptions(initialOptionsMap + .get("checkpoint_location") + .map(_ => + initialOptionsMap.updated("checkpoint_location", checkpointDir.getAbsolutePath)) + .getOrElse(initialOptionsMap))) + .create() + flint.refreshIndex(testIndex) + + val indexInitial = flint.describeIndex(testIndex).get + val updatedIndex = flint + .skippingIndex() + .copyWithUpdate( + indexInitial, + FlintSparkIndexOptions( + updateOptionsMap + .get("checkpoint_location") + .map(_ => + updateOptionsMap + .updated("checkpoint_location", checkpointDir.getAbsolutePath)) + .getOrElse(updateOptionsMap))) + flint.updateIndex(updatedIndex) + + val optionsFinal = flint.describeIndex(testIndex).get.options + optionsFinal.autoRefresh() shouldBe 
expectedOptionsMap.get("auto_refresh").get + optionsFinal + .incrementalRefresh() shouldBe expectedOptionsMap.get("incremental_refresh").get + optionsFinal.refreshInterval() shouldBe expectedOptionsMap.get("refresh_interval").get + optionsFinal.checkpointLocation() shouldBe (expectedOptionsMap + .get("checkpoint_location") + .get match { + case Some(_) => Some(checkpointDir.getAbsolutePath) + case None => None + }) + optionsFinal.watermarkDelay() shouldBe expectedOptionsMap.get("watermark_delay").get + + deleteTestIndex(testIndex) + } + } + } + } + + test("update index should fail if index is updated by others before transaction starts") { + withTempDir { checkpointDir => + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .create() + + // This update will be delayed + val indexInitial = flint.describeIndex(testIndex).get + val updatedIndexObsolete = flint + .skippingIndex() + .copyWithUpdate( + indexInitial, + FlintSparkIndexOptions( + Map( + "auto_refresh" -> "true", + "checkpoint_location" -> checkpointDir.getAbsolutePath))) + + // This other update finishes first, converting to auto refresh + flint.updateIndex( + flint + .skippingIndex() + .copyWithUpdate(indexInitial, FlintSparkIndexOptions(Map("auto_refresh" -> "true")))) + // Adding another update to convert to full refresh, so the obsolete update doesn't fail for option validation or state validation + val indexUpdated = flint.describeIndex(testIndex).get + flint.updateIndex( + flint + .skippingIndex() + .copyWithUpdate(indexUpdated, FlintSparkIndexOptions(Map("auto_refresh" -> "false")))) + + // This update trying to update an obsolete index should fail + the[IllegalStateException] thrownBy + flint.updateIndex(updatedIndexObsolete) + + // Verify index after update + val indexFinal = flint.describeIndex(testIndex).get + indexFinal.options.autoRefresh() shouldBe false + indexFinal.options.checkpointLocation() shouldBe empty + } + } + + test("update full refresh index to auto refresh should start job") { + // Create full refresh Flint index + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .create() + spark.streams.active.find(_.name == testIndex) shouldBe empty + flint.queryIndex(testIndex).collect().toSet should have size 0 + + // Update Flint index to auto refresh and wait for complete + val indexInitial = flint.describeIndex(testIndex).get + val updatedIndex = + flint + .skippingIndex() + .copyWithUpdate(indexInitial, FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) + val jobId = flint.updateIndex(updatedIndex) + jobId shouldBe defined + + val job = spark.streams.get(jobId.get) + failAfter(streamingTimeout) { + job.processAllAvailable() + } + + flint.queryIndex(testIndex).collect().toSet should have size 2 + } + + test("update incremental refresh index to auto refresh should start job") { + withTempDir { checkpointDir => + // Create incremental refresh Flint index and wait for complete + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .options( + FlintSparkIndexOptions( + Map( + "incremental_refresh" -> "true", + "checkpoint_location" -> checkpointDir.getAbsolutePath))) + .create() + + flint.refreshIndex(testIndex) shouldBe empty + spark.streams.active.find(_.name == testIndex) shouldBe empty + flint.queryIndex(testIndex).collect().toSet should have size 2 + + // Delete all index data intentionally and generate a new source file + openSearchClient.deleteByQuery( + new 
DeleteByQueryRequest(testIndex).setQuery(QueryBuilders.matchAllQuery()), + RequestOptions.DEFAULT) + sql(s""" + | INSERT INTO $testTable + | PARTITION (year=2023, month=4) + | VALUES ('Hello', 35, 'Vancouver') + | """.stripMargin) + + // Update Flint index to auto refresh and wait for complete + val indexInitial = flint.describeIndex(testIndex).get + val updatedIndex = flint + .skippingIndex() + .copyWithUpdate( + indexInitial, + FlintSparkIndexOptions(Map("auto_refresh" -> "true", "incremental_refresh" -> "false"))) + val jobId = flint.updateIndex(updatedIndex) + jobId shouldBe defined + + val job = spark.streams.get(jobId.get) + failAfter(streamingTimeout) { + job.processAllAvailable() + } + + // Expect to only refresh the new file + flint.queryIndex(testIndex).collect().toSet should have size 1 + } + } + + test("update auto refresh index to full refresh should stop job") { + // Create auto refresh Flint index and wait for complete + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) + .create() + + val jobId = flint.refreshIndex(testIndex) + val job = spark.streams.get(jobId.get) + failAfter(streamingTimeout) { + job.processAllAvailable() + } + + flint.queryIndex(testIndex).collect().toSet should have size 2 + + // Update Flint index to full refresh + val indexInitial = flint.describeIndex(testIndex).get + val updatedIndex = + flint + .skippingIndex() + .copyWithUpdate(indexInitial, FlintSparkIndexOptions(Map("auto_refresh" -> "false"))) + flint.updateIndex(updatedIndex) shouldBe empty + + // Expect refresh job to be stopped + spark.streams.active.find(_.name == testIndex) shouldBe empty + + // Generate a new source file + sql(s""" + | INSERT INTO $testTable + | PARTITION (year=2023, month=4) + | VALUES ('Hello', 35, 'Vancouver') + | """.stripMargin) + + // Index shouldn't be refreshed + flint.queryIndex(testIndex).collect().toSet should have size 2 + + // Full refresh after update + flint.refreshIndex(testIndex) shouldBe empty + flint.queryIndex(testIndex).collect().toSet should have size 3 + } + + test("update auto refresh index to incremental refresh should stop job") { + withTempDir { checkpointDir => + // Create auto refresh Flint index and wait for complete + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .options(FlintSparkIndexOptions( + Map("auto_refresh" -> "true", "checkpoint_location" -> checkpointDir.getAbsolutePath))) + .create() + + val jobId = flint.refreshIndex(testIndex) + val job = spark.streams.get(jobId.get) + failAfter(streamingTimeout) { + job.processAllAvailable() + } + + flint.queryIndex(testIndex).collect().toSet should have size 2 + + // Update Flint index to incremental refresh + val indexInitial = flint.describeIndex(testIndex).get + val updatedIndex = flint + .skippingIndex() + .copyWithUpdate( + indexInitial, + FlintSparkIndexOptions(Map("auto_refresh" -> "false", "incremental_refresh" -> "true"))) + flint.updateIndex(updatedIndex) shouldBe empty + + // Expect refresh job to be stopped + spark.streams.active.find(_.name == testIndex) shouldBe empty + + // Generate a new source file + sql(s""" + | INSERT INTO $testTable + | PARTITION (year=2023, month=4) + | VALUES ('Hello', 35, 'Vancouver') + | """.stripMargin) + + // Index shouldn't be refreshed + flint.queryIndex(testIndex).collect().toSet should have size 2 + + // Delete all index data intentionally + openSearchClient.deleteByQuery( + new 
DeleteByQueryRequest(testIndex).setQuery(QueryBuilders.matchAllQuery()), + RequestOptions.DEFAULT) + + // Expect to only refresh the new file + flint.refreshIndex(testIndex) shouldBe empty + flint.queryIndex(testIndex).collect().toSet should have size 1 + } + } +}