From 143c847b1debde8db891db145a95024bb2aa11e3 Mon Sep 17 00:00:00 2001 From: Louis Chu Date: Fri, 11 Oct 2024 16:42:36 -0700 Subject: [PATCH] Add more IT Signed-off-by: Louis Chu --- .../opensearch/flint/spark/FlintSpark.scala | 26 ++-- .../spark/FlintSparkUpdateIndexITSuite.scala | 118 ++++++++++++++++++ 2 files changed, 130 insertions(+), 14 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index f9cc0175a..779b7e013 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -446,27 +446,24 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w originalOptions: FlintSparkIndexOptions, updatedOptions: FlintSparkIndexOptions): Unit = { val isAutoRefreshChanged = updatedOptions.autoRefresh() != originalOptions.autoRefresh() - val isSchedulerModeChanged = - updatedOptions.isExternalSchedulerEnabled() != originalOptions.isExternalSchedulerEnabled() - - // Prevent changing both auto_refresh and scheduler_mode simultaneously - if (isAutoRefreshChanged && isSchedulerModeChanged) { - throw new IllegalArgumentException( - "Cannot change both auto_refresh and scheduler_mode simultaneously") - } val changedOptions = updatedOptions.options.filterNot { case (k, v) => originalOptions.options.get(k).contains(v) }.keySet if (changedOptions.isEmpty) { - throw new IllegalArgumentException("No index options updated") + throw new IllegalArgumentException("No index option updated") } // Validate based on auto_refresh state and changes (isAutoRefreshChanged, updatedOptions.autoRefresh()) match { case (true, true) => // Changing from manual to auto refresh + if (updatedOptions.incrementalRefresh()) { + throw new IllegalArgumentException( + "Altering index to auto refresh while incremental refresh remains true") + } + val allowedOptions = Set( AUTO_REFRESH, INCREMENTAL_REFRESH, @@ -474,15 +471,13 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w REFRESH_INTERVAL, CHECKPOINT_LOCATION, WATERMARK_DELAY) - validateChangedOptions(changedOptions, allowedOptions, "Changing to auto refresh") - + validateChangedOptions(changedOptions, allowedOptions, s"Altering index to auto refresh") case (true, false) => val allowedOptions = if (updatedOptions.incrementalRefresh()) { // Changing from auto refresh to incremental refresh Set( AUTO_REFRESH, INCREMENTAL_REFRESH, - SCHEDULER_MODE, REFRESH_INTERVAL, CHECKPOINT_LOCATION, WATERMARK_DELAY) @@ -493,11 +488,14 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w validateChangedOptions( changedOptions, allowedOptions, - "Changing to full/incremental refresh") + "Altering index to full/incremental refresh") case (false, true) => // original refresh_mode is auto, only allow changing scheduler_mode - validateChangedOptions(changedOptions, Set(SCHEDULER_MODE), "Auto refresh remains true") + validateChangedOptions( + changedOptions, + Set(SCHEDULER_MODE), + "Altering index when auto_refresh remains true") case (false, false) => // original refresh_mode is full/incremental, not allowed to change any options diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala index 7bbf24567..53889045f 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala @@ -15,6 +15,8 @@ import org.opensearch.index.reindex.DeleteByQueryRequest import org.scalatest.matchers.must.Matchers._ import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper +import org.apache.spark.sql.flint.config.FlintSparkConf + class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { /** Test table and index name */ @@ -32,6 +34,7 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { // Delete all test indices deleteTestIndex(testIndex) sql(s"DROP TABLE $testTable") + conf.unsetConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED.key) } test("update index with index options successfully") { @@ -177,6 +180,121 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { } } + // Test update options validation failure with external scheduler + Seq( + ( + "update index without changing index option", + Seq( + ( + Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/"), + Map("auto_refresh" -> "true")), + ( + Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/"), + Map("checkpoint_location" -> "s3a://test/")), + ( + Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/"), + Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/"))), + "No index option updated"), + ( + "update index option when auto_refresh is false", + Seq( + ( + Map.empty[String, String], + Map("auto_refresh" -> "false", "checkpoint_location" -> "s3a://test/")), + ( + Map.empty[String, String], + Map("incremental_refresh" -> "true", "checkpoint_location" -> "s3a://test/")), + (Map.empty[String, String], Map("checkpoint_location" -> "s3a://test/"))), + "No options can be updated when auto_refresh remains false"), + ( + "update other index option besides scheduler_mode when auto_refresh is true", + Seq( + ( + Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/"), + Map("watermark_delay" -> "1 Minute"))), + "Altering index when auto_refresh remains true only allows changing: Set(scheduler_mode). Invalid options"), + ( + "convert to full refresh with disallowed options", + Seq( + ( + Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/"), + Map("auto_refresh" -> "false", "scheduler_mode" -> "internal")), + ( + Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/"), + Map("auto_refresh" -> "false", "refresh_interval" -> "5 Minute")), + ( + Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/"), + Map("auto_refresh" -> "false", "watermark_delay" -> "1 Minute"))), + "Altering index to full/incremental refresh only allows changing"), + ( + "convert to auto refresh with disallowed options", + Seq( + ( + Map.empty[String, String], + Map( + "auto_refresh" -> "true", + "output_mode" -> "complete", + "checkpoint_location" -> "s3a://test/"))), + "Altering index to auto refresh only allows changing: Set(auto_refresh, watermark_delay, scheduler_mode, " + + "refresh_interval, incremental_refresh, checkpoint_location). Invalid options: Set(output_mode)"), + ( + "convert to invalid refresh mode", + Seq( + ( + Map.empty[String, String], + Map( + "auto_refresh" -> "true", + "incremental_refresh" -> "true", + "checkpoint_location" -> "s3a://test/"))), + "Altering index to auto refresh while incremental refresh remains true")) + .foreach { case (testName, testCases, expectedErrorMessage) => + test(s"should fail if $testName and external scheduler enabled") { + setFlintSparkConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED, "true") + testCases.foreach { case (initialOptionsMap, updateOptionsMap) => + logInfo(s"initialOptionsMap: ${initialOptionsMap}") + logInfo(s"updateOptionsMap: ${updateOptionsMap}") + + withTempDir { checkpointDir => + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .options( + FlintSparkIndexOptions( + initialOptionsMap + .get("checkpoint_location") + .map(_ => + initialOptionsMap + .updated("checkpoint_location", checkpointDir.getAbsolutePath)) + .getOrElse(initialOptionsMap)), + testIndex) + .create() + flint.refreshIndex(testIndex) + + val index = flint.describeIndex(testIndex).get + val exception = the[IllegalArgumentException] thrownBy { + val updatedIndex = flint + .skippingIndex() + .copyWithUpdate( + index, + FlintSparkIndexOptions( + updateOptionsMap + .get("checkpoint_location") + .map(_ => + updateOptionsMap + .updated("checkpoint_location", checkpointDir.getAbsolutePath)) + .getOrElse(updateOptionsMap))) + flint.updateIndex(updatedIndex) + } + + exception.getMessage should include(expectedErrorMessage) + + deleteTestIndex(testIndex) + } + } + } + } + // Test update options validation success Seq( (