diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 68d2409ee..fbc24e93a 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -510,6 +510,10 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w private def isSchedulerModeChanged( originalOptions: FlintSparkIndexOptions, updatedOptions: FlintSparkIndexOptions): Boolean = { + // Altering from manual to auto should not be interpreted as a scheduling mode change. + if (!originalOptions.options.contains(SCHEDULER_MODE.toString)) { + return false + } updatedOptions.isExternalSchedulerEnabled() != originalOptions.isExternalSchedulerEnabled() } diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala index c9f6c47f7..f27c0dae9 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala @@ -618,6 +618,44 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { flint.queryIndex(testIndex).collect().toSet should have size 2 } + test("update full refresh index to auto refresh should start job with external scheduler") { + setFlintSparkConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED, "true") + + withTempDir { checkpointDir => + // Create full refresh Flint index + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .options(FlintSparkIndexOptions(Map("auto_refresh" -> "false")), testIndex) + .create() + + spark.streams.active.find(_.name == testIndex) shouldBe empty + flint.queryIndex(testIndex).collect().toSet should have size 0 + val indexInitial = flint.describeIndex(testIndex).get + indexInitial.options.isExternalSchedulerEnabled() shouldBe false + + val updatedIndex = flint + .skippingIndex() + .copyWithUpdate( + indexInitial, + FlintSparkIndexOptions( + Map( + "auto_refresh" -> "true", + "checkpoint_location" -> checkpointDir.getAbsolutePath))) + + val jobId = flint.updateIndex(updatedIndex) + jobId shouldBe empty + val indexFinal = flint.describeIndex(testIndex).get + indexFinal.options.isExternalSchedulerEnabled() shouldBe true + indexFinal.options.autoRefresh() shouldBe true + indexFinal.options.refreshInterval() shouldBe Some( + FlintOptions.DEFAULT_EXTERNAL_SCHEDULER_INTERVAL) + + verifySchedulerIndex(testIndex, 5, "MINUTES") + } + } + test("update incremental refresh index to auto refresh should start job") { withTempDir { checkpointDir => // Create incremental refresh Flint index and wait for complete @@ -667,6 +705,51 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { } } + test( + "update incremental refresh index to auto refresh should start job with external scheduler") { + setFlintSparkConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED, "true") + + withTempDir { checkpointDir => + // Create incremental refresh Flint index + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .options( + FlintSparkIndexOptions( + Map( + "incremental_refresh" -> "true", + "checkpoint_location" -> checkpointDir.getAbsolutePath)), + testIndex) + .create() + + spark.streams.active.find(_.name == testIndex) shouldBe empty + flint.queryIndex(testIndex).collect().toSet should have size 0 + val indexInitial = flint.describeIndex(testIndex).get + indexInitial.options.isExternalSchedulerEnabled() shouldBe false + + val updatedIndex = flint + .skippingIndex() + .copyWithUpdate( + indexInitial, + FlintSparkIndexOptions( + Map( + "auto_refresh" -> "true", + "incremental_refresh" -> "false", + "checkpoint_location" -> checkpointDir.getAbsolutePath))) + + val jobId = flint.updateIndex(updatedIndex) + jobId shouldBe empty + val indexFinal = flint.describeIndex(testIndex).get + indexFinal.options.isExternalSchedulerEnabled() shouldBe true + indexFinal.options.autoRefresh() shouldBe true + indexFinal.options.refreshInterval() shouldBe Some( + FlintOptions.DEFAULT_EXTERNAL_SCHEDULER_INTERVAL) + + verifySchedulerIndex(testIndex, 5, "MINUTES") + } + } + test("update auto refresh index to full refresh should stop job") { // Create auto refresh Flint index and wait for complete flint