diff --git a/docs/index.md b/docs/index.md
index 30011e10f..713186512 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -373,7 +373,7 @@ User can provide the following options in `WITH` clause of create statement:
 + `refresh_interval`: a string as the time interval for incremental refresh, e.g. 1 minute, 10 seconds. This is only applicable when auto refresh is enabled. Please check `org.apache.spark.unsafe.types.CalendarInterval` for valid duration identifiers. By default, the next micro-batch will be generated as soon as the previous one completes processing.
 + `incremental_refresh`: default value is false. Incrementally refresh the index if set to true; otherwise, fully refresh the entire index. This is only applicable when auto refresh is disabled.
 + `checkpoint_location`: a string as the location path for the refresh job checkpoint (auto or incremental). The location has to be a path in an HDFS compatible file system and is only applicable when auto refresh is enabled. If unspecified, a temporary checkpoint directory will be used and may result in checkpoint data loss upon restart.
-+ `watermark_delay`: a string as a time expression for how late data can come and still be processed, e.g. 1 minute, 10 seconds. This is required by incremental refresh on a materialized view if it has aggregation in the query.
++ `watermark_delay`: a string as a time expression for how late data can come and still be processed, e.g. 1 minute, 10 seconds. This is required by auto and incremental refresh on a materialized view if it has aggregation in the query.
 + `output_mode`: a mode string that describes how data will be written to the streaming sink. If unspecified, the default append mode will be applied.
 + `index_settings`: a JSON string as index settings for the OpenSearch index that will be created. Please follow the format in the OpenSearch documentation. If unspecified, default OpenSearch index settings will be applied.
 + `extra_options`: a JSON string as extra options that can be passed to the Spark streaming source and sink API directly. Use the qualified source table name (because there could be multiple tables) and "sink", e.g. '{"sink": "{key: val}", "table1": {key: val}}'
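For context, these are the same options exercised by this PR's test suite. Below is a minimal sketch of supplying them through the Flint Scala API when creating a materialized view; the MV name, source table, event-time column, and checkpoint path are hypothetical, and the builder calls follow the `FlintSparkIndexOptions(Map(...))` pattern used in the tests further down.

```scala
import org.opensearch.flint.spark.FlintSparkIndexOptions

// Option keys mirror the WITH clause documented above.
val options = FlintSparkIndexOptions(
  Map(
    "auto_refresh" -> "true", // streaming (auto) refresh
    "refresh_interval" -> "1 Minute", // micro-batch trigger interval
    "checkpoint_location" -> "s3a://bucket/checkpoints/mv/", // hypothetical path
    "watermark_delay" -> "10 Seconds")) // needed because the MV query aggregates

// Hypothetical MV over a source table with an eventTime column.
flint
  .materializedView()
  .name("spark_catalog.default.metrics_mv")
  .query(
    "SELECT window.start AS startTime, COUNT(*) AS cnt " +
      "FROM spark_catalog.default.metrics " +
      "GROUP BY TUMBLE(eventTime, '1 Minute')")
  .options(options)
  .create()
```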
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala
index 31d1d91f3..74626d25d 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala
@@ -96,7 +96,7 @@ case class FlintSparkMaterializedView(
   private def watermark(timeCol: Attribute, child: LogicalPlan) = {
     require(
       options.watermarkDelay().isDefined,
-      "watermark delay is required for incremental refresh with aggregation")
+      "watermark delay is required for auto refresh and incremental refresh with aggregation")
 
     val delay = options.watermarkDelay().get
     EventTimeWatermark(timeCol, IntervalUtils.fromIntervalString(delay), child)
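The `watermark` helper above wraps the MV's logical plan in Spark's `EventTimeWatermark` node, which is what `Dataset.withWatermark` produces. A sketch of the equivalent behavior in plain Structured Streaming terms, showing why the delay is now mandatory for both refresh modes when the query aggregates (the `metrics` table and `eventTime` column are hypothetical):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, count, window}

object WatermarkSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("watermark-sketch").getOrCreate()

    // Hypothetical streaming source with an event-time column.
    val events = spark.readStream.table("metrics")

    // Without a watermark, Spark cannot finalize event-time windows and
    // rejects streaming aggregations in append mode; withWatermark here
    // plays the role of the MV's watermark_delay option.
    val counts = events
      .withWatermark("eventTime", "10 seconds")
      .groupBy(window(col("eventTime"), "1 minute"))
      .agg(count("*").as("cnt"))

    counts.writeStream.format("console").start().awaitTermination()
  }
}
```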
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala
index 38ecb8515..b89c97191 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala
@@ -12,6 +12,7 @@ import org.antlr.v4.runtime.tree.{ParseTree, RuleNode}
 import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndexFactory}
 import org.opensearch.flint.spark.FlintSpark.UpdateMode._
 import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName._
+import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.MV_INDEX_TYPE
 import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode._
 import org.opensearch.flint.spark.sql.covering.FlintSparkCoveringIndexAstBuilder
 import org.opensearch.flint.spark.sql.index.FlintSparkIndexAstBuilder
@@ -92,7 +93,7 @@ object FlintSparkSqlAstBuilder {
       .getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
 
     val oldOptions = oldIndex.options.options
-    validateOptions(oldOptions, updateOptions)
+    validateUpdateOptions(oldOptions, updateOptions, oldIndex.kind)
     val mergedOptions = oldOptions ++ updateOptions
     val newMetadata =
       oldIndex.metadata().copy(options = mergedOptions.mapValues(_.asInstanceOf[AnyRef]).asJava)
@@ -108,13 +109,17 @@ object FlintSparkSqlAstBuilder {
 
   /**
    * Validate update options.
+   * These rules are specific to updating an index and check that the update itself is allowed.
+   * They don't check whether the resulting index options will be valid.
    *
    * @param oldOptions
    *   existing options
    * @param updateOptions
    *   options to update
+   * @param indexKind
+   *   index kind
    */
-  private def validateOptions(oldOptions: Map[String, String], updateOptions: Map[String, String]): Unit = {
+  private def validateUpdateOptions(oldOptions: Map[String, String], updateOptions: Map[String, String], indexKind: String): Unit = {
     val mergedOptions = oldOptions ++ updateOptions
     val newAutoRefresh = mergedOptions.getOrElse(AUTO_REFRESH.toString, "false")
     val oldAutoRefresh = oldOptions.getOrElse(AUTO_REFRESH.toString, "false")
@@ -124,20 +129,23 @@ object FlintSparkSqlAstBuilder {
       throw new IllegalArgumentException("auto_refresh option must be updated")
     }
 
-    // validate allowed options depending on refresh mode
     val newIncrementalRefresh = mergedOptions.getOrElse(INCREMENTAL_REFRESH.toString, "false")
-    val (refreshMode, allowedOptions) = (newAutoRefresh, newIncrementalRefresh) match {
-      case ("true", "false") =>
-        (AUTO, Set(AUTO_REFRESH, INCREMENTAL_REFRESH, CHECKPOINT_LOCATION))
-      case ("false", "false") =>
-        (FULL, Set(AUTO_REFRESH, INCREMENTAL_REFRESH))
-      case ("false", "true") =>
-        (INCREMENTAL, Set(AUTO_REFRESH, INCREMENTAL_REFRESH, WATERMARK_DELAY, CHECKPOINT_LOCATION))
+    val refreshMode = (newAutoRefresh, newIncrementalRefresh) match {
+      case ("true", "false") => AUTO
+      case ("false", "false") => FULL
+      case ("false", "true") => INCREMENTAL
       case ("true", "true") =>
         throw new IllegalArgumentException("auto_refresh and incremental_refresh options cannot both be true")
     }
+
+    // validate allowed options depending on refresh mode
+    val allowedOptions = refreshMode match {
+      case FULL => Set(AUTO_REFRESH, INCREMENTAL_REFRESH)
+      case AUTO | INCREMENTAL =>
+        Set(AUTO_REFRESH, INCREMENTAL_REFRESH, REFRESH_INTERVAL, CHECKPOINT_LOCATION, WATERMARK_DELAY)
+    }
     if (!updateOptions.keys.forall(allowedOptions.map(_.toString).contains)) {
-      throw new IllegalArgumentException(s"Altering to ${refreshMode} refresh index only allows options: ${allowedOptions}")
+      throw new IllegalArgumentException(s"Altering ${indexKind} index to ${refreshMode} refresh only allows options: ${allowedOptions}")
     }
   }
 }
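To make the new `validateUpdateOptions` rules concrete, here is a sketch of transitions it permits and rejects, written against the `updateIndex` helper used in the test suite below. It assumes an index that was created with `auto_refresh` enabled; the index name and checkpoint path are hypothetical.

```scala
// Allowed: auto refresh -> full refresh; only refresh-mode flags change.
updateIndex(flint, "flint_test_index", Map(
  "auto_refresh" -> "false",
  "incremental_refresh" -> "false"))

// Allowed: auto refresh -> incremental refresh; refresh_interval,
// checkpoint_location, and watermark_delay may also be updated.
updateIndex(flint, "flint_test_index", Map(
  "auto_refresh" -> "false",
  "incremental_refresh" -> "true",
  "checkpoint_location" -> "s3a://bucket/checkpoints/"))

// Rejected: both flags true resolves to no valid refresh mode.
updateIndex(flint, "flint_test_index", Map(
  "auto_refresh" -> "true",
  "incremental_refresh" -> "true")) // throws IllegalArgumentException

// Rejected: output_mode is not in the allowed option set for any mode.
updateIndex(flint, "flint_test_index", Map(
  "auto_refresh" -> "false",
  "incremental_refresh" -> "true",
  "output_mode" -> "complete")) // throws IllegalArgumentException
```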
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala
index d0977e220..b1a0278ae 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala
@@ -88,6 +88,7 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
       .addPartitions("year", "month")
       .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
       .create()
+    flint.refreshIndex(testIndex)
 
     // auto_refresh remains true
     the[IllegalArgumentException] thrownBy
@@ -133,6 +134,274 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
       updateIndex(flint, testIndex, Map("checkpoint_location" -> "s3a://test/"))
   }
 
+  test("should succeed if convert to full refresh with allowed options") {
+    flint
+      .skippingIndex()
+      .onTable(testTable)
+      .addPartitions("year", "month")
+      .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
+      .create()
+    flint.refreshIndex(testIndex)
+
+    updateIndex(
+      flint,
+      testIndex,
+      Map(
+        "auto_refresh" -> "false",
+        "incremental_refresh" -> "false"))
+    val readNewIndex = flint.describeIndex(testIndex)
+    readNewIndex shouldBe defined
+
+    readNewIndex.get.options.autoRefresh() shouldBe false
+    readNewIndex.get.options.incrementalRefresh() shouldBe false
+  }
+
+  test("should fail if convert to full refresh with disallowed options") {
+    flint
+      .skippingIndex()
+      .onTable(testTable)
+      .addPartitions("year", "month")
+      .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
+      .create()
+    flint.refreshIndex(testIndex)
+
+    the[IllegalArgumentException] thrownBy
+      updateIndex(
+        flint,
+        testIndex,
+        Map(
+          "auto_refresh" -> "false",
+          "checkpoint_location" -> "s3a://test/"))
+    the[IllegalArgumentException] thrownBy
+      updateIndex(
+        flint,
+        testIndex,
+        Map(
+          "auto_refresh" -> "false",
+          "refresh_interval" -> "5 Minute"))
+    the[IllegalArgumentException] thrownBy
+      updateIndex(
+        flint,
+        testIndex,
+        Map(
+          "auto_refresh" -> "false",
+          "watermark_delay" -> "1 Minute"))
+  }
+
+  test("should succeed if convert to incremental refresh with refresh_interval") {
+    flint
+      .skippingIndex()
+      .onTable(testTable)
+      .addPartitions("year", "month")
+      .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
+      .create()
+    flint.refreshIndex(testIndex)
+
+    updateIndex(
+      flint,
+      testIndex,
+      Map(
+        "auto_refresh" -> "false",
+        "incremental_refresh" -> "true",
+        "refresh_interval" -> "1 Minute"))
+    val readNewIndex = flint.describeIndex(testIndex)
+    readNewIndex shouldBe defined
+
+    readNewIndex.get.options.autoRefresh() shouldBe false
+    readNewIndex.get.options.incrementalRefresh() shouldBe true
+    readNewIndex.get.options.refreshInterval() shouldBe Some("1 Minute")
+  }
+
+  test("should succeed if convert to incremental refresh with checkpoint_location") {
+    flint
+      .skippingIndex()
+      .onTable(testTable)
+      .addPartitions("year", "month")
+      .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
+      .create()
+    flint.refreshIndex(testIndex)
+
+    updateIndex(
+      flint,
+      testIndex,
+      Map(
+        "auto_refresh" -> "false",
+        "incremental_refresh" -> "true",
+        "checkpoint_location" -> "s3a://test/"))
+    val readNewIndex = flint.describeIndex(testIndex)
+    readNewIndex shouldBe defined
+
+    readNewIndex.get.options.autoRefresh() shouldBe false
+    readNewIndex.get.options.incrementalRefresh() shouldBe true
+    readNewIndex.get.options.checkpointLocation() shouldBe Some("s3a://test/")
+  }
+
+  test("should succeed if convert to incremental refresh with watermark_delay") {
+    flint
+      .skippingIndex()
+      .onTable(testTable)
+      .addPartitions("year", "month")
+      .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
+      .create()
+    flint.refreshIndex(testIndex)
+
+    updateIndex(
+      flint,
+      testIndex,
+      Map(
+        "auto_refresh" -> "false",
+        "incremental_refresh" -> "true",
+        "watermark_delay" -> "1 Minute"))
+    val readNewIndex = flint.describeIndex(testIndex)
+    readNewIndex shouldBe defined
+
+    readNewIndex.get.options.autoRefresh() shouldBe false
+    readNewIndex.get.options.incrementalRefresh() shouldBe true
+    readNewIndex.get.options.watermarkDelay() shouldBe Some("1 Minute")
+  }
+
+  test("should fail if convert to incremental refresh with disallowed options") {
+    flint
+      .skippingIndex()
+      .onTable(testTable)
+      .addPartitions("year", "month")
+      .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
+      .create()
+
+    the[IllegalArgumentException] thrownBy
+      updateIndex(
+        flint,
+        testIndex,
+        Map(
+          "auto_refresh" -> "false",
+          "incremental_refresh" -> "true",
+          "output_mode" -> "complete"))
+  }
+
+  test("should succeed if convert to auto refresh with refresh_interval") {
+    flint
+      .skippingIndex()
+      .onTable(testTable)
+      .addPartitions("year", "month")
+      .create()
+
+    updateIndex(
+      flint,
+      testIndex,
+      Map(
+        "auto_refresh" -> "true",
+        "refresh_interval" -> "5 Minute"))
+
+    val readNewIndex = flint.describeIndex(testIndex)
+    readNewIndex shouldBe defined
+
+    readNewIndex.get.options.autoRefresh() shouldBe true
+    readNewIndex.get.options.incrementalRefresh() shouldBe false
+    readNewIndex.get.options.refreshInterval() shouldBe Some("5 Minute")
+  }
+
+  test("should succeed if convert to auto refresh with checkpoint_location") {
+    withTempDir { checkpointDir =>
+      flint
+        .skippingIndex()
+        .onTable(testTable)
+        .addPartitions("year", "month")
+        .create()
+
+      updateIndex(
+        flint,
+        testIndex,
+        Map(
+          "auto_refresh" -> "true",
+          "checkpoint_location" -> checkpointDir.getAbsolutePath))
+
+      val readNewIndex = flint.describeIndex(testIndex)
+      readNewIndex shouldBe defined
+
+      readNewIndex.get.options.autoRefresh() shouldBe true
+      readNewIndex.get.options.incrementalRefresh() shouldBe false
+      readNewIndex.get.options.checkpointLocation() shouldBe Some(checkpointDir.getAbsolutePath)
+    }
+  }
+
+  test("should succeed if convert to auto refresh with watermark_delay") {
+    flint
+      .skippingIndex()
+      .onTable(testTable)
+      .addPartitions("year", "month")
+      .create()
+
+    updateIndex(
+      flint,
+      testIndex,
+      Map(
+        "auto_refresh" -> "true",
+        "watermark_delay" -> "5 Minute"))
+
+    val readNewIndex = flint.describeIndex(testIndex)
+    readNewIndex shouldBe defined
+
+    readNewIndex.get.options.autoRefresh() shouldBe true
+    readNewIndex.get.options.incrementalRefresh() shouldBe false
+    readNewIndex.get.options.watermarkDelay() shouldBe Some("5 Minute")
+  }
+
+  test("should fail if convert to auto refresh with disallowed options") {
+    flint
+      .skippingIndex()
+      .onTable(testTable)
+      .addPartitions("year", "month")
+      .create()
+
+    the[IllegalArgumentException] thrownBy
+      updateIndex(
+        flint,
+        testIndex,
+        Map(
+          "auto_refresh" -> "true",
+          "output_mode" -> "complete"))
+  }
+
+  test("should fail if convert to invalid refresh mode") {
+    flint
+      .skippingIndex()
+      .onTable(testTable)
+      .addPartitions("year", "month")
+      .create()
+
+    the[IllegalArgumentException] thrownBy
+      updateIndex(
+        flint,
+        testIndex,
+        Map(
+          "auto_refresh" -> "true",
+          "incremental_refresh" -> "true"))
+
+    deleteTestIndex(testIndex)
+
+    flint
+      .skippingIndex()
+      .onTable(testTable)
+      .addPartitions("year", "month")
+      .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
+      .create()
+
+    the[IllegalArgumentException] thrownBy
+      updateIndex(flint, testIndex, Map("incremental_refresh" -> "true"))
+
+    deleteTestIndex(testIndex)
+
+    flint
+      .skippingIndex()
+      .onTable(testTable)
+      .addPartitions("year", "month")
+      .options(FlintSparkIndexOptions(Map("incremental_refresh" -> "true")))
+      .create()
+
+    the[IllegalArgumentException] thrownBy
+      updateIndex(flint, testIndex, Map("auto_refresh" -> "true"))
+  }
+
   test("update full refresh index to auto refresh should start job") {
     // Create full refresh Flint index
     flint