diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index c22ee85af..1525b55b0 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -23,6 +23,7 @@ import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex import org.opensearch.flint.spark.mv.FlintSparkMaterializedView import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode._ +import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode import org.opensearch.flint.spark.scheduler.{AsyncQuerySchedulerBuilder, FlintSparkJobExternalSchedulingService, FlintSparkJobInternalSchedulingService, FlintSparkJobSchedulingService} import org.opensearch.flint.spark.scheduler.AsyncQuerySchedulerBuilder.AsyncQuerySchedulerAction import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex @@ -228,16 +229,16 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w val originalOptions = describeIndex(indexName) .getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist")) .options - validateUpdateAllowed(originalOptions, index.options) - val isSchedulerModeChanged = - index.options.isExternalSchedulerEnabled() != originalOptions.isExternalSchedulerEnabled() + validateUpdateAllowed(originalOptions, index.options) withTransaction[Option[String]](indexName, "Update Flint index") { tx => // Relies on validation to prevent: // 1. auto-to-auto updates besides scheduler_mode // 2. any manual-to-manual updates // 3. both refresh_mode and scheduler_mode updated - (index.options.autoRefresh(), isSchedulerModeChanged) match { + ( + index.options.autoRefresh(), + isSchedulerModeChanged(originalOptions, index.options)) match { case (true, true) => updateSchedulerMode(index, tx) case (true, false) => updateIndexManualToAuto(index, tx) case (false, false) => updateIndexAutoToManual(index, tx) @@ -491,11 +492,17 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w "Altering index to full/incremental refresh") case (false, true) => - // original refresh_mode is auto, only allow changing scheduler_mode - validateChangedOptions( - changedOptions, - Set(SCHEDULER_MODE), - "Altering index when auto_refresh remains true") + // original refresh_mode is auto, only allow changing scheduler_mode and potentially refresh_interval + var allowedOptions = Set(SCHEDULER_MODE) + val schedulerMode = + if (updatedOptions.isExternalSchedulerEnabled()) SchedulerMode.EXTERNAL + else SchedulerMode.INTERNAL + val contextPrefix = + s"Altering index when auto_refresh remains true and scheduler_mode is $schedulerMode" + if (updatedOptions.isExternalSchedulerEnabled()) { + allowedOptions += REFRESH_INTERVAL + } + validateChangedOptions(changedOptions, allowedOptions, contextPrefix) case (false, false) => // original refresh_mode is full/incremental, not allowed to change any options @@ -520,6 +527,12 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w } } + private def isSchedulerModeChanged( + originalOptions: FlintSparkIndexOptions, + updatedOptions: FlintSparkIndexOptions): Boolean = { + updatedOptions.isExternalSchedulerEnabled() != originalOptions.isExternalSchedulerEnabled() + } + private def updateIndexAutoToManual( index: FlintSparkIndex, tx: OptimisticTransaction[Option[String]]): Option[String] = { @@ -600,7 +613,8 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w flintIndexMetadataService.updateIndexMetadata(indexName, index.metadata) logInfo("Update index options complete") oldService.handleJob(index, AsyncQuerySchedulerAction.UNSCHEDULE) - logInfo(s"Unscheduled ${if (isExternal) "internal" else "external"} jobs") + logInfo( + s"Unscheduled refresh jobs from ${if (isExternal) "internal" else "external"} scheduler") newService.handleJob(index, AsyncQuerySchedulerAction.UPDATE) }) } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala index fc1a611ef..9b58a696c 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala @@ -10,7 +10,6 @@ import java.util.{Collections, UUID} import org.json4s.{Formats, NoTypeHints} import org.json4s.native.JsonMethods._ import org.json4s.native.Serialization -import org.opensearch.flint.core.logging.CustomLogging.logInfo import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, INCREMENTAL_REFRESH, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, SCHEDULER_MODE, WATERMARK_DELAY} import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexBuilderSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexBuilderSuite.scala index 4ff5b5adb..80b788253 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexBuilderSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexBuilderSuite.scala @@ -210,12 +210,12 @@ class FlintSparkIndexBuilderSuite Some( "spark.flint.job.externalScheduler.enabled is false but scheduler_mode is set to external")), ( - "set external mode when interval above threshold and no mode specified", + "set external mode when interval below threshold and no mode specified", true, "5 minutes", - Map("auto_refresh" -> "true", "refresh_interval" -> "10 minutes"), - Some(SchedulerMode.EXTERNAL.toString), - Some("10 minutes"), + Map("auto_refresh" -> "true", "refresh_interval" -> "1 minutes"), + Some(SchedulerMode.INTERNAL.toString), + Some("1 minutes"), None), ( "throw exception when interval below threshold but mode is external", diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala index c2f0f9101..14d41c2bb 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala @@ -17,7 +17,6 @@ import org.opensearch.flint.common.FlintVersion.current import org.opensearch.flint.core.FlintOptions import org.opensearch.flint.core.storage.{FlintOpenSearchIndexMetadataService, OpenSearchClientUtils} import org.opensearch.flint.spark.FlintSparkIndex.quotedTableName -import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.CHECKPOINT_LOCATION import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.getFlintIndexName import org.opensearch.flint.spark.scheduler.OpenSearchAsyncQueryScheduler import org.scalatest.matchers.must.Matchers.defined diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala index 53889045f..a6f7e0ed0 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala @@ -5,10 +5,16 @@ package org.opensearch.flint.spark +import scala.jdk.CollectionConverters.mapAsJavaMapConverter + import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson import org.json4s.native.JsonMethods._ +import org.opensearch.OpenSearchException +import org.opensearch.action.get.GetRequest import org.opensearch.client.RequestOptions -import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService +import org.opensearch.flint.core.FlintOptions +import org.opensearch.flint.core.storage.{FlintOpenSearchIndexMetadataService, OpenSearchClientUtils} +import org.opensearch.flint.spark.scheduler.OpenSearchAsyncQueryScheduler import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName import org.opensearch.index.query.QueryBuilders import org.opensearch.index.reindex.DeleteByQueryRequest @@ -180,6 +186,96 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { } } + test("update auto refresh index to switch scheduler mode") { + setFlintSparkConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED, "true") + + withTempDir { checkpointDir => + // Create auto refresh Flint index + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .options( + FlintSparkIndexOptions( + Map( + "auto_refresh" -> "true", + "refresh_interval" -> "4 Minute", + "checkpoint_location" -> checkpointDir.getAbsolutePath)), + testIndex) + .create() + flint.refreshIndex(testIndex) + + val indexInitial = flint.describeIndex(testIndex).get + indexInitial.options.refreshInterval() shouldBe Some("4 Minute") + the[OpenSearchException] thrownBy { + val client = + OpenSearchClientUtils.createClient(new FlintOptions(openSearchOptions.asJava)) + client.get( + new GetRequest(OpenSearchAsyncQueryScheduler.SCHEDULER_INDEX_NAME, testIndex), + RequestOptions.DEFAULT) + } + + // Update Flint index to change refresh interval + val updatedIndex = flint + .skippingIndex() + .copyWithUpdate( + indexInitial, + FlintSparkIndexOptions( + Map("scheduler_mode" -> "external", "refresh_interval" -> "5 Minutes"))) + flint.updateIndex(updatedIndex) + + // Verify index after update + val indexFinal = flint.describeIndex(testIndex).get + indexFinal.options.autoRefresh() shouldBe true + indexFinal.options.refreshInterval() shouldBe Some("5 Minutes") + indexFinal.options.checkpointLocation() shouldBe Some(checkpointDir.getAbsolutePath) + + // Verify scheduler index is updated + verifySchedulerIndex(testIndex, 5, "MINUTES") + } + } + + test("update auto refresh index to change refresh interval") { + setFlintSparkConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED, "true") + + withTempDir { checkpointDir => + // Create auto refresh Flint index + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .options( + FlintSparkIndexOptions( + Map( + "auto_refresh" -> "true", + "refresh_interval" -> "10 Minute", + "checkpoint_location" -> checkpointDir.getAbsolutePath)), + testIndex) + .create() + + val indexInitial = flint.describeIndex(testIndex).get + indexInitial.options.refreshInterval() shouldBe Some("10 Minute") + verifySchedulerIndex(testIndex, 10, "MINUTES") + + // Update Flint index to change refresh interval + val updatedIndex = flint + .skippingIndex() + .copyWithUpdate( + indexInitial, + FlintSparkIndexOptions(Map("refresh_interval" -> "5 Minutes"))) + flint.updateIndex(updatedIndex) + + // Verify index after update + val indexFinal = flint.describeIndex(testIndex).get + indexFinal.options.autoRefresh() shouldBe true + indexFinal.options.refreshInterval() shouldBe Some("5 Minutes") + indexFinal.options.checkpointLocation() shouldBe Some(checkpointDir.getAbsolutePath) + + // Verify scheduler index is updated + verifySchedulerIndex(testIndex, 5, "MINUTES") + } + } + // Test update options validation failure with external scheduler Seq( ( @@ -207,12 +303,32 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { (Map.empty[String, String], Map("checkpoint_location" -> "s3a://test/"))), "No options can be updated when auto_refresh remains false"), ( - "update other index option besides scheduler_mode when auto_refresh is true", + "update index option when refresh_interval value belows threshold", + Seq( + ( + Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/"), + Map("refresh_interval" -> "4 minutes"))), + "Input refresh_interval is 4 minutes, required above the interval threshold of external scheduler: 5 minutes"), + ( + "update index option when no change on auto_refresh", + Seq( + ( + Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/"), + Map("scheduler_mode" -> "internal", "refresh_interval" -> "4 minutes")), + ( + Map( + "auto_refresh" -> "true", + "scheduler_mode" -> "internal", + "checkpoint_location" -> "s3a://test/"), + Map("refresh_interval" -> "4 minutes"))), + "Altering index when auto_refresh remains true and scheduler_mode is internal only allows changing: Set(scheduler_mode). Invalid options"), + ( + "update other index option besides scheduler_mode and refresh_interval when auto_refresh is true", Seq( ( Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/"), Map("watermark_delay" -> "1 Minute"))), - "Altering index when auto_refresh remains true only allows changing: Set(scheduler_mode). Invalid options"), + "Altering index when auto_refresh remains true and scheduler_mode is external only allows changing: Set(scheduler_mode, refresh_interval). Invalid options"), ( "convert to full refresh with disallowed options", Seq( @@ -655,4 +771,28 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { flint.queryIndex(testIndex).collect().toSet should have size 1 } } + + private def verifySchedulerIndex( + indexName: String, + expectedPeriod: Int, + expectedUnit: String): Unit = { + val client = OpenSearchClientUtils.createClient(new FlintOptions(openSearchOptions.asJava)) + val response = client.get( + new GetRequest(OpenSearchAsyncQueryScheduler.SCHEDULER_INDEX_NAME, indexName), + RequestOptions.DEFAULT) + + response.isExists shouldBe true + val sourceMap = response.getSourceAsMap + + sourceMap.get("jobId") shouldBe indexName + sourceMap.get( + "scheduledQuery") shouldBe s"REFRESH SKIPPING INDEX ON spark_catalog.default.`test`" + sourceMap.get("enabled") shouldBe true + sourceMap.get("queryLang") shouldBe "sql" + + val schedule = sourceMap.get("schedule").asInstanceOf[java.util.Map[String, Any]] + val interval = schedule.get("interval").asInstanceOf[java.util.Map[String, Any]] + interval.get("period") shouldBe expectedPeriod + interval.get("unit") shouldBe expectedUnit + } }