diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index f2167f061..19b0102a6 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -13,10 +13,11 @@ import org.opensearch.flint.core.{FlintClient, FlintClientBuilder} import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._ import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN +import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName._ import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex import org.opensearch.flint.spark.mv.FlintSparkMaterializedView import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh -import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.AUTO +import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode._ import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKindSerializer import org.opensearch.flint.spark.skipping.recommendations.DataTypeSkippingStrategy @@ -216,6 +217,13 @@ class FlintSpark(val spark: SparkSession) extends Logging { def updateIndex(index: FlintSparkIndex): Option[String] = { logInfo(s"Updating Flint index $index") val indexName = index.name + + validateUpdateAllowed( + describeIndex(indexName) + .getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist")) + .options, + index.options) + if (flintClient.exists(indexName)) { try { // Relies on validation to forbid auto-to-auto and manual-to-manual updates @@ -385,6 +393,52 @@ class FlintSpark(val spark: SparkSession) extends Logging { } } + /** + * Validate that the index update options are allowed.
+ * @param originalOptions + * the original options + * @param updatedOptions + * the updated options + */ + private def validateUpdateAllowed( + originalOptions: FlintSparkIndexOptions, + updatedOptions: FlintSparkIndexOptions): Unit = { + // The auto_refresh option must change + if (updatedOptions.autoRefresh() == originalOptions.autoRefresh()) { + throw new IllegalArgumentException("auto_refresh option must be updated") + } + + val refreshMode = (updatedOptions.autoRefresh(), updatedOptions.incrementalRefresh()) match { + case (true, false) => AUTO + case (false, false) => FULL + case (false, true) => INCREMENTAL + case (true, true) => + throw new IllegalArgumentException( + "auto_refresh and incremental_refresh options cannot both be true") + } + + // Validate allowed options depending on refresh mode + val allowedOptionNames = refreshMode match { + case FULL => Set(AUTO_REFRESH, INCREMENTAL_REFRESH) + case AUTO | INCREMENTAL => + Set( + AUTO_REFRESH, + INCREMENTAL_REFRESH, + REFRESH_INTERVAL, + CHECKPOINT_LOCATION, + WATERMARK_DELAY) + } + + // Get the changed option names + val changedOptionNames = updatedOptions.options.filterNot { + case (k, v) => originalOptions.options.get(k).contains(v) + }.keys + if (!changedOptionNames.forall(allowedOptionNames.map(_.toString).contains)) { + throw new IllegalArgumentException( + s"Altering index to $refreshMode refresh only allows options: $allowedOptionNames") + } + } + private def updateIndexAutoToManual(index: FlintSparkIndex): Option[String] = { val indexName = index.name flintClient diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala index e817465a4..9ed759804 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala @@ -61,8 +61,19 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) { def create(ignoreIfExists: Boolean = false): Unit = flint.createIndex(buildIndex(), ignoreIfExists) - def copyWithUpdate(index: FlintSparkIndex, options: FlintSparkIndexOptions): FlintSparkIndex = { - val updatedOptions = index.options.update(options) + /** + * Copy Flint index with updated options.
+ * + * @param index + * Flint index to copy + * @param updateOptions + * options to update + * @return + * updated Flint index + */ + def copyWithUpdate(index: FlintSparkIndex, updateOptions: FlintSparkIndexOptions): FlintSparkIndex = { + val originalOptions = index.options + val updatedOptions = originalOptions.copy(options = originalOptions.options ++ updateOptions.options) val updatedMetadata = index .metadata() .copy(options = updatedOptions.options.mapValues(_.asInstanceOf[AnyRef]).asJava) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala index 0ebb34e22..9107a8a66 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala @@ -10,7 +10,6 @@ import org.json4s.native.JsonMethods._ import org.json4s.native.Serialization import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, INCREMENTAL_REFRESH, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, WATERMARK_DELAY} import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames -import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode._ /** * Flint Spark index configurable options. @@ -119,20 +118,6 @@ case class FlintSparkIndexOptions(options: Map[String, String]) { map.result() } - /** - * Merge two FlintSparkIndexOptions. If an option exists in both instances, the value from the - * other instance overwrites the value from this instance. - * @param other - * options to update - * @return - * updated Flint Spark index options - */ - def update(other: FlintSparkIndexOptions): FlintSparkIndexOptions = { - val updatedOptions = FlintSparkIndexOptions(options ++ other.options) - validateUpdateAllowed(other, updatedOptions) - updatedOptions - } - private def getOptionValue(name: OptionName): Option[String] = { options.get(name.toString) } @@ -142,47 +127,6 @@ case class FlintSparkIndexOptions(options: Map[String, String]) { .map(opt => (parse(opt) \ key).extract[Map[String, String]]) .getOrElse(Map.empty) } - - /** - * Validate the index update options are allowed.
- * @param other - * options to update - * @param updatedOptions - * the updated options - */ - private def validateUpdateAllowed( - other: FlintSparkIndexOptions, - updatedOptions: FlintSparkIndexOptions): Unit = { - // auto_refresh must change - if (updatedOptions.autoRefresh() == autoRefresh()) { - throw new IllegalArgumentException("auto_refresh option must be updated") - } - - val refreshMode = (updatedOptions.autoRefresh(), updatedOptions.incrementalRefresh()) match { - case (true, false) => AUTO - case (false, false) => FULL - case (false, true) => INCREMENTAL - case (true, true) => - throw new IllegalArgumentException( - "auto_refresh and incremental_refresh options cannot both be true") - } - - // validate allowed options depending on refresh mode - val allowedOptions = refreshMode match { - case FULL => Set(AUTO_REFRESH, INCREMENTAL_REFRESH) - case AUTO | INCREMENTAL => - Set( - AUTO_REFRESH, - INCREMENTAL_REFRESH, - REFRESH_INTERVAL, - CHECKPOINT_LOCATION, - WATERMARK_DELAY) - } - if (!other.options.keys.forall(allowedOptions.map(_.toString).contains)) { - throw new IllegalArgumentException( - s"Altering index to ${refreshMode} refresh only allows options: ${allowedOptions}") - } - } } object FlintSparkIndexOptions { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala index 2c6908242..ebc8b47c4 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala @@ -48,12 +48,16 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { .create() val index = flint.describeIndex(testIndex).get + // Update index options val updateOptions = FlintSparkIndexOptions(Map("auto_refresh" -> "true", "incremental_refresh" -> "false")) val updatedIndex = flint.skippingIndex().copyWithUpdate(index, updateOptions) + flint.updateIndex(updatedIndex) + // Verify index after update + val readNewIndex = flint.describeIndex(testIndex).get val optionJson = - compact(render(parse(updatedIndex.metadata().getContent) \ "_meta" \ "options")) + compact(render(parse(readNewIndex.metadata().getContent) \ "_meta" \ "options")) optionJson should matchJson(s""" | { | "auto_refresh": "true", @@ -65,11 +69,11 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { |""".stripMargin) // Load index options from index mapping (verify OS index setting in SQL IT) - updatedIndex.options.autoRefresh() shouldBe true - updatedIndex.options.incrementalRefresh() shouldBe false - updatedIndex.options.refreshInterval() shouldBe Some("1 Minute") - updatedIndex.options.checkpointLocation() shouldBe Some(checkpointDir.getAbsolutePath) - updatedIndex.options.indexSettings() shouldBe + readNewIndex.options.autoRefresh() shouldBe true + readNewIndex.options.incrementalRefresh() shouldBe false + readNewIndex.options.refreshInterval() shouldBe Some("1 Minute") + readNewIndex.options.checkpointLocation() shouldBe Some(checkpointDir.getAbsolutePath) + readNewIndex.options.indexSettings() shouldBe Some("{\"number_of_shards\": 3,\"number_of_replicas\": 2}") } } @@ -82,32 +86,41 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { .addPartitions("year", "month") .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) .create() + flint.refreshIndex(testIndex) var index = flint.describeIndex(testIndex).get // auto_refresh remains true - 
the[IllegalArgumentException] thrownBy - flint + the[IllegalArgumentException] thrownBy { + val updatedIndex = flint .skippingIndex() .copyWithUpdate(index, FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) - the[IllegalArgumentException] thrownBy - flint + flint.updateIndex(updatedIndex) + } + the[IllegalArgumentException] thrownBy { + val updatedIndex = flint .skippingIndex() .copyWithUpdate( index, FlintSparkIndexOptions( Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/"))) + flint.updateIndex(updatedIndex) + } // auto_refresh not provided - the[IllegalArgumentException] thrownBy - flint + the[IllegalArgumentException] thrownBy { + val updatedIndex = flint .skippingIndex() .copyWithUpdate(index, FlintSparkIndexOptions(Map("incremental_refresh" -> "true"))) - the[IllegalArgumentException] thrownBy - flint + flint.updateIndex(updatedIndex) + } + the[IllegalArgumentException] thrownBy { + val updatedIndex = flint .skippingIndex() .copyWithUpdate( index, FlintSparkIndexOptions(Map("checkpoint_location" -> "s3a://test/"))) + flint.updateIndex(updatedIndex) + } deleteTestIndex(testIndex) @@ -120,29 +133,37 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { index = flint.describeIndex(testIndex).get // auto_refresh remains false - the[IllegalArgumentException] thrownBy - flint + the[IllegalArgumentException] thrownBy { + val updatedIndex = flint .skippingIndex() .copyWithUpdate(index, FlintSparkIndexOptions(Map("auto_refresh" -> "false"))) - the[IllegalArgumentException] thrownBy - flint + flint.updateIndex(updatedIndex) + } + the[IllegalArgumentException] thrownBy { + val updatedIndex = flint .skippingIndex() .copyWithUpdate( index, FlintSparkIndexOptions( Map("auto_refresh" -> "false", "checkpoint_location" -> "s3a://test/"))) + flint.updateIndex(updatedIndex) + } // auto_refresh not provided - the[IllegalArgumentException] thrownBy - flint + the[IllegalArgumentException] thrownBy { + val updatedIndex = flint .skippingIndex() .copyWithUpdate(index, FlintSparkIndexOptions(Map("incremental_refresh" -> "true"))) - the[IllegalArgumentException] thrownBy - flint + flint.updateIndex(updatedIndex) + } + the[IllegalArgumentException] thrownBy { + val updatedIndex = flint .skippingIndex() .copyWithUpdate( index, FlintSparkIndexOptions(Map("checkpoint_location" -> "s3a://test/"))) + flint.updateIndex(updatedIndex) + } } test("should succeed if convert to full refresh with allowed options") { @@ -152,47 +173,65 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { .addPartitions("year", "month") .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) .create() - val index = flint.describeIndex(testIndex).get + flint.refreshIndex(testIndex) + // Update index options + val index = flint.describeIndex(testIndex).get val updatedIndex = flint .skippingIndex() .copyWithUpdate( index, FlintSparkIndexOptions(Map("auto_refresh" -> "false", "incremental_refresh" -> "false"))) + flint.updateIndex(updatedIndex) - updatedIndex.options.autoRefresh() shouldBe false - updatedIndex.options.incrementalRefresh() shouldBe false + // Verify index after update + val readNewIndex = flint.describeIndex(testIndex).get + readNewIndex.options.autoRefresh() shouldBe false + readNewIndex.options.incrementalRefresh() shouldBe false } test("should fail if convert to full refresh with disallowed options") { - flint - .skippingIndex() - .onTable(testTable) - .addPartitions("year", "month") - .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) - .create() - val index = 
flint.describeIndex(testIndex).get - - the[IllegalArgumentException] thrownBy - flint - .skippingIndex() - .copyWithUpdate( - index, - FlintSparkIndexOptions( - Map("auto_refresh" -> "false", "checkpoint_location" -> "s3a://test/"))) - the[IllegalArgumentException] thrownBy - flint - .skippingIndex() - .copyWithUpdate( - index, - FlintSparkIndexOptions( - Map("auto_refresh" -> "false", "refresh_interval" -> "5 Minute"))) - the[IllegalArgumentException] thrownBy + withTempDir { checkpointDir => flint .skippingIndex() - .copyWithUpdate( - index, - FlintSparkIndexOptions(Map("auto_refresh" -> "false", "watermark_delay" -> "1 Minute"))) + .onTable(testTable) + .addPartitions("year", "month") + .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) + .options(FlintSparkIndexOptions(Map( + "auto_refresh" -> "true", + "refresh_interval" -> "1 Minute", + "checkpoint_location" -> checkpointDir.getAbsolutePath))) + .create() + flint.refreshIndex(testIndex) + val index = flint.describeIndex(testIndex).get + + the[IllegalArgumentException] thrownBy { + val updatedIndex = flint + .skippingIndex() + .copyWithUpdate( + index, + FlintSparkIndexOptions( + Map("auto_refresh" -> "false", "checkpoint_location" -> "s3a://test/"))) + flint.updateIndex(updatedIndex) + } + the[IllegalArgumentException] thrownBy { + val updatedIndex = flint + .skippingIndex() + .copyWithUpdate( + index, + FlintSparkIndexOptions( + Map("auto_refresh" -> "false", "refresh_interval" -> "5 Minute"))) + flint.updateIndex(updatedIndex) + } + the[IllegalArgumentException] thrownBy { + val updatedIndex = flint + .skippingIndex() + .copyWithUpdate( + index, + FlintSparkIndexOptions(Map("auto_refresh" -> "false", "watermark_delay" -> "1 Minute"))) + flint.updateIndex(updatedIndex) + } + } } test("should succeed if convert to incremental refresh with refresh_interval") { @@ -202,8 +241,10 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { .addPartitions("year", "month") .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) .create() - val index = flint.describeIndex(testIndex).get + flint.refreshIndex(testIndex) + // Update index options + val index = flint.describeIndex(testIndex).get val updatedIndex = flint .skippingIndex() .copyWithUpdate( @@ -213,10 +254,13 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { "auto_refresh" -> "false", "incremental_refresh" -> "true", "refresh_interval" -> "1 Minute"))) + flint.updateIndex(updatedIndex) - updatedIndex.options.autoRefresh() shouldBe false - updatedIndex.options.incrementalRefresh() shouldBe true - updatedIndex.options.refreshInterval() shouldBe Some("1 Minute") + // Verify index after update + val readNewIndex = flint.describeIndex(testIndex).get + readNewIndex.options.autoRefresh() shouldBe false + readNewIndex.options.incrementalRefresh() shouldBe true + readNewIndex.options.refreshInterval() shouldBe Some("1 Minute") } test("should succeed if convert to incremental refresh with checkpoint_location") { @@ -226,8 +270,10 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { .addPartitions("year", "month") .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) .create() - val index = flint.describeIndex(testIndex).get + flint.refreshIndex(testIndex) + // Update index options + val index = flint.describeIndex(testIndex).get val updatedIndex = flint .skippingIndex() .copyWithUpdate( @@ -237,10 +283,13 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { "auto_refresh" -> "false", "incremental_refresh" -> "true", 
"checkpoint_location" -> "s3a://test/"))) + flint.updateIndex(updatedIndex) - updatedIndex.options.autoRefresh() shouldBe false - updatedIndex.options.incrementalRefresh() shouldBe true - updatedIndex.options.checkpointLocation() shouldBe Some("s3a://test/") + // Verify index after update + val readNewIndex = flint.describeIndex(testIndex).get + readNewIndex.options.autoRefresh() shouldBe false + readNewIndex.options.incrementalRefresh() shouldBe true + readNewIndex.options.checkpointLocation() shouldBe Some("s3a://test/") } test("should succeed if convert to incremental refresh with watermark_delay") { @@ -250,8 +299,10 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { .addPartitions("year", "month") .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) .create() - val index = flint.describeIndex(testIndex).get + flint.refreshIndex(testIndex) + // Update index options + val index = flint.describeIndex(testIndex).get val updatedIndex = flint .skippingIndex() .copyWithUpdate( @@ -261,10 +312,13 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { "auto_refresh" -> "false", "incremental_refresh" -> "true", "watermark_delay" -> "1 Minute"))) + flint.updateIndex(updatedIndex) - updatedIndex.options.autoRefresh() shouldBe false - updatedIndex.options.incrementalRefresh() shouldBe true - updatedIndex.options.watermarkDelay() shouldBe Some("1 Minute") + // Verify index after update + val readNewIndex = flint.describeIndex(testIndex).get + readNewIndex.options.autoRefresh() shouldBe false + readNewIndex.options.incrementalRefresh() shouldBe true + readNewIndex.options.watermarkDelay() shouldBe Some("1 Minute") } test("should fail if convert to incremental refresh with disallowed options") { @@ -274,10 +328,11 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { .addPartitions("year", "month") .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) .create() + flint.refreshIndex(testIndex) val index = flint.describeIndex(testIndex).get - the[IllegalArgumentException] thrownBy - flint + the[IllegalArgumentException] thrownBy { + val updatedIndex = flint .skippingIndex() .copyWithUpdate( index, @@ -286,6 +341,8 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { "auto_refresh" -> "false", "incremental_refresh" -> "true", "output_mode" -> "complete"))) + flint.updateIndex(updatedIndex) + } } test("should succeed if convert to auto refresh with refresh_interval") { @@ -294,17 +351,21 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { .onTable(testTable) .addPartitions("year", "month") .create() - val index = flint.describeIndex(testIndex).get + // Update index options + val index = flint.describeIndex(testIndex).get val updatedIndex = flint .skippingIndex() .copyWithUpdate( index, FlintSparkIndexOptions(Map("auto_refresh" -> "true", "refresh_interval" -> "5 Minute"))) + flint.updateIndex(updatedIndex) - updatedIndex.options.autoRefresh() shouldBe true - updatedIndex.options.incrementalRefresh() shouldBe false - updatedIndex.options.refreshInterval() shouldBe Some("5 Minute") + // Verify index after update + val readNewIndex = flint.describeIndex(testIndex).get + readNewIndex.options.autoRefresh() shouldBe true + readNewIndex.options.incrementalRefresh() shouldBe false + readNewIndex.options.refreshInterval() shouldBe Some("5 Minute") } test("should succeed if convert to auto refresh with checkpoint_location") { @@ -314,8 +375,9 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { .onTable(testTable) 
.addPartitions("year", "month") .create() - val index = flint.describeIndex(testIndex).get + // Update index options + val index = flint.describeIndex(testIndex).get val updatedIndex = flint .skippingIndex() .copyWithUpdate( @@ -324,10 +386,13 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { Map( "auto_refresh" -> "true", "checkpoint_location" -> checkpointDir.getAbsolutePath))) + flint.updateIndex(updatedIndex) - updatedIndex.options.autoRefresh() shouldBe true - updatedIndex.options.incrementalRefresh() shouldBe false - updatedIndex.options.checkpointLocation() shouldBe Some(checkpointDir.getAbsolutePath) + // Verify index after update + val readNewIndex = flint.describeIndex(testIndex).get + readNewIndex.options.autoRefresh() shouldBe true + readNewIndex.options.incrementalRefresh() shouldBe false + readNewIndex.options.checkpointLocation() shouldBe Some(checkpointDir.getAbsolutePath) } } @@ -337,17 +402,21 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { .onTable(testTable) .addPartitions("year", "month") .create() - val index = flint.describeIndex(testIndex).get + // Update index options + val index = flint.describeIndex(testIndex).get val updatedIndex = flint .skippingIndex() .copyWithUpdate( index, FlintSparkIndexOptions(Map("auto_refresh" -> "true", "watermark_delay" -> "5 Minute"))) + flint.updateIndex(updatedIndex) - updatedIndex.options.autoRefresh() shouldBe true - updatedIndex.options.incrementalRefresh() shouldBe false - updatedIndex.options.watermarkDelay() shouldBe Some("5 Minute") + // Verify index after update + val readNewIndex = flint.describeIndex(testIndex).get + readNewIndex.options.autoRefresh() shouldBe true + readNewIndex.options.incrementalRefresh() shouldBe false + readNewIndex.options.watermarkDelay() shouldBe Some("5 Minute") } test("should fail if convert to auto refresh with disallowed options") { @@ -358,12 +427,14 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { .create() val index = flint.describeIndex(testIndex).get - the[IllegalArgumentException] thrownBy - flint + the[IllegalArgumentException] thrownBy { + val updatedIndex = flint .skippingIndex() .copyWithUpdate( index, FlintSparkIndexOptions(Map("auto_refresh" -> "true", "output_mode" -> "complete"))) + flint.updateIndex(updatedIndex) + } } test("should fail if convert to invalid refresh mode") { @@ -374,12 +445,14 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { .create() var index = flint.describeIndex(testIndex).get - the[IllegalArgumentException] thrownBy - flint + the[IllegalArgumentException] thrownBy { + val updatedIndex = flint .skippingIndex() .copyWithUpdate( index, FlintSparkIndexOptions(Map("auto_refresh" -> "true", "incremental_refresh" -> "true"))) + flint.updateIndex(updatedIndex) + } deleteTestIndex(testIndex) @@ -389,12 +462,15 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { .addPartitions("year", "month") .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) .create() + flint.refreshIndex(testIndex) index = flint.describeIndex(testIndex).get - the[IllegalArgumentException] thrownBy - flint + the[IllegalArgumentException] thrownBy { + val updatedIndex = flint .skippingIndex() .copyWithUpdate(index, FlintSparkIndexOptions(Map("incremental_refresh" -> "true"))) + flint.updateIndex(updatedIndex) + } deleteTestIndex(testIndex) @@ -404,11 +480,14 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { .addPartitions("year", "month") .options(FlintSparkIndexOptions(Map("incremental_refresh" -> 
"true"))) .create() + index = flint.describeIndex(testIndex).get - the[IllegalArgumentException] thrownBy - flint + the[IllegalArgumentException] thrownBy { + val updatedIndex = flint .skippingIndex() .copyWithUpdate(index, FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) + flint.updateIndex(updatedIndex) + } } test("update full refresh index to auto refresh should start job") { @@ -461,10 +540,10 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { new DeleteByQueryRequest(testIndex).setQuery(QueryBuilders.matchAllQuery()), RequestOptions.DEFAULT) sql(s""" - | INSERT INTO $testTable - | PARTITION (year=2023, month=4) - | VALUES ('Hello', 35, 'Vancouver') - | """.stripMargin) + | INSERT INTO $testTable + | PARTITION (year=2023, month=4) + | VALUES ('Hello', 35, 'Vancouver') + | """.stripMargin) // Update Flint index to auto refresh and wait for complete val index = flint.describeIndex(testIndex).get @@ -516,10 +595,10 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { // Generate a new source file sql(s""" - | INSERT INTO $testTable - | PARTITION (year=2023, month=4) - | VALUES ('Hello', 35, 'Vancouver') - | """.stripMargin) + | INSERT INTO $testTable + | PARTITION (year=2023, month=4) + | VALUES ('Hello', 35, 'Vancouver') + | """.stripMargin) // Index shouldn't be refreshed flint.queryIndex(testIndex).collect().toSet should have size 2 @@ -562,10 +641,10 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { // Generate a new source file sql(s""" - | INSERT INTO $testTable - | PARTITION (year=2023, month=4) - | VALUES ('Hello', 35, 'Vancouver') - | """.stripMargin) + | INSERT INTO $testTable + | PARTITION (year=2023, month=4) + | VALUES ('Hello', 35, 'Vancouver') + | """.stripMargin) // Index shouldn't be refreshed flint.queryIndex(testIndex).collect().toSet should have size 2