From 9399e6912a4db76b24e8f03d77009d8e2750a15d Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Tue, 19 Mar 2024 16:36:41 -0700 Subject: [PATCH] test case for update options validation Signed-off-by: Sean Kao --- .../spark/FlintSparkUpdateIndexITSuite.scala | 209 ++++++++++++++++++ 1 file changed, 209 insertions(+) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala index d0977e220..552bdcecd 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala @@ -88,6 +88,7 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { .addPartitions("year", "month") .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) .create() + flint.refreshIndex(testIndex) // auto_refresh remains true the[IllegalArgumentException] thrownBy @@ -133,6 +134,214 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { updateIndex(flint, testIndex, Map("checkpoint_location" -> "s3a://test/")) } + test("should succeed if convert to full refresh with allowed options") { + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) + .create() + flint.refreshIndex(testIndex) + + updateIndex( + flint, + testIndex, + Map( + "auto_refresh" -> "false", + "incremental_refresh" -> "false")) + val readNewIndex = flint.describeIndex(testIndex) + readNewIndex shouldBe defined + + readNewIndex.get.options.autoRefresh() shouldBe false + readNewIndex.get.options.incrementalRefresh() shouldBe false + } + + test("should fail if convert to full refresh with unallowed options") { + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) + .create() + flint.refreshIndex(testIndex) + + the[IllegalArgumentException] thrownBy + updateIndex( + flint, + testIndex, + Map( + "auto_refresh" -> "false", + "checkpoint_location" -> "s3a://test/")) + the[IllegalArgumentException] thrownBy + updateIndex( + flint, + testIndex, + Map( + "auto_refresh" -> "false", + "refresh_interval" -> "5 Minute")) + the[IllegalArgumentException] thrownBy + updateIndex( + flint, + testIndex, + Map( + "auto_refresh" -> "false", + "watermark_delay" -> "1 Minute")) + } + + test("should succeed if convert to incremental refresh with checkpoint_location option") { + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) + .create() + flint.refreshIndex(testIndex) + + updateIndex( + flint, + testIndex, + Map( + "auto_refresh" -> "false", + "incremental_refresh" -> "true", + "checkpoint_location" -> "s3a://test/")) + val readNewIndex = flint.describeIndex(testIndex) + readNewIndex shouldBe defined + + readNewIndex.get.options.autoRefresh() shouldBe false + readNewIndex.get.options.incrementalRefresh() shouldBe true + readNewIndex.get.options.checkpointLocation() shouldBe Some("s3a://test/") + } + + test("should succeed if convert to incremental refresh with watermark_delay option") { + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) + .create() + flint.refreshIndex(testIndex) + + updateIndex( + flint, + testIndex, + Map( + "auto_refresh" -> "false", + "incremental_refresh" -> "true", + "watermark_delay" -> "1 Minute")) + val readNewIndex = flint.describeIndex(testIndex) + readNewIndex shouldBe defined + + readNewIndex.get.options.autoRefresh() shouldBe false + readNewIndex.get.options.incrementalRefresh() shouldBe true + readNewIndex.get.options.watermarkDelay() shouldBe Some("1 Minute") + } + + test("should fail if convert to incremental refresh with unallowed options") { + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) + .create() + + the[IllegalArgumentException] thrownBy + updateIndex( + flint, + testIndex, + Map( + "auto_refresh" -> "false", + "incremental_refresh" -> "true", + "refresh_interval" -> "5 Minute")) + } + + test("should succeed if convert to auto refresh with checkpoint_location") { + withTempDir { checkpointDir => + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .create() + + updateIndex( + flint, + testIndex, + Map( + "auto_refresh" -> "true", + "checkpoint_location" -> checkpointDir.getAbsolutePath)) + + val readNewIndex = flint.describeIndex(testIndex) + readNewIndex shouldBe defined + + readNewIndex.get.options.autoRefresh() shouldBe true + readNewIndex.get.options.incrementalRefresh() shouldBe false + readNewIndex.get.options.checkpointLocation() shouldBe Some(checkpointDir.getAbsolutePath) + } + } + + test("should fail if convert to auto refresh with unallowed options") { + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .create() + + the[IllegalArgumentException] thrownBy + updateIndex( + flint, + testIndex, + Map( + "auto_refresh" -> "true", + "refresh_interval" -> "5 Minute")) + + the[IllegalArgumentException] thrownBy + updateIndex( + flint, + testIndex, + Map( + "auto_refresh" -> "true", + "watermark_delay" -> "1 Minute")) + } + + test("should fail if convert to invalid refresh mode") { + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .create() + + the[IllegalArgumentException] thrownBy + updateIndex( + flint, + testIndex, + Map( + "auto_refresh" -> "true", + "incremental_refresh" -> "true")) + + deleteTestIndex(testIndex) + + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) + .create() + + the[IllegalArgumentException] thrownBy + updateIndex(flint, testIndex, Map("incremental_refresh" -> "true")) + + deleteTestIndex(testIndex) + + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .options(FlintSparkIndexOptions(Map("incremental_refresh" -> "true"))) + .create() + + the[IllegalArgumentException] thrownBy + updateIndex(flint, testIndex, Map("auto_refresh" -> "true")) + } + test("update full refresh index to auto refresh should start job") { // Create full refresh Flint index flint