Skip to content

Commit

Permalink
Support alter refresh interval on external scheduler
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Oct 21, 2024
1 parent 8ab81ae commit 42a18dd
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode._
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode
import org.opensearch.flint.spark.scheduler.{AsyncQuerySchedulerBuilder, FlintSparkJobExternalSchedulingService, FlintSparkJobInternalSchedulingService, FlintSparkJobSchedulingService}
import org.opensearch.flint.spark.scheduler.AsyncQuerySchedulerBuilder.AsyncQuerySchedulerAction
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
Expand Down Expand Up @@ -229,16 +230,16 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
val originalOptions = describeIndex(indexName)
.getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
.options
validateUpdateAllowed(originalOptions, index.options)

val isSchedulerModeChanged =
index.options.isExternalSchedulerEnabled() != originalOptions.isExternalSchedulerEnabled()
validateUpdateAllowed(originalOptions, index.options)
withTransaction[Option[String]](indexName, "Update Flint index") { tx =>
// Relies on validation to prevent:
// 1. auto-to-auto updates besides scheduler_mode
// 2. any manual-to-manual updates
// 3. both refresh_mode and scheduler_mode updated
(index.options.autoRefresh(), isSchedulerModeChanged) match {
(
index.options.autoRefresh(),
isSchedulerModeChanged(originalOptions, index.options)) match {
case (true, true) => updateSchedulerMode(index, tx)
case (true, false) => updateIndexManualToAuto(index, tx)
case (false, false) => updateIndexAutoToManual(index, tx)
Expand Down Expand Up @@ -478,11 +479,17 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
"Altering index to full/incremental refresh")

case (false, true) =>
// original refresh_mode is auto, only allow changing scheduler_mode
validateChangedOptions(
changedOptions,
Set(SCHEDULER_MODE),
"Altering index when auto_refresh remains true")
// original refresh_mode is auto, only allow changing scheduler_mode and potentially refresh_interval
var allowedOptions = Set(SCHEDULER_MODE)
val schedulerMode =
if (updatedOptions.isExternalSchedulerEnabled()) SchedulerMode.EXTERNAL
else SchedulerMode.INTERNAL
val contextPrefix =
s"Altering index when auto_refresh remains true and scheduler_mode is $schedulerMode"
if (updatedOptions.isExternalSchedulerEnabled()) {
allowedOptions += REFRESH_INTERVAL
}
validateChangedOptions(changedOptions, allowedOptions, contextPrefix)

case (false, false) =>
// original refresh_mode is full/incremental, not allowed to change any options
Expand All @@ -507,6 +514,12 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
}
}

private def isSchedulerModeChanged(
originalOptions: FlintSparkIndexOptions,
updatedOptions: FlintSparkIndexOptions): Boolean = {
updatedOptions.isExternalSchedulerEnabled() != originalOptions.isExternalSchedulerEnabled()
}

private def updateIndexAutoToManual(
index: FlintSparkIndex,
tx: OptimisticTransaction[Option[String]]): Option[String] = {
Expand Down Expand Up @@ -587,7 +600,8 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
flintIndexMetadataService.updateIndexMetadata(indexName, index.metadata)
logInfo("Update index options complete")
oldService.handleJob(index, AsyncQuerySchedulerAction.UNSCHEDULE)
logInfo(s"Unscheduled ${if (isExternal) "internal" else "external"} jobs")
logInfo(
s"Unscheduled refresh jobs from ${if (isExternal) "internal" else "external"} scheduler")
newService.handleJob(index, AsyncQuerySchedulerAction.UPDATE)
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import java.util.{Collections, UUID}
import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization
import org.opensearch.flint.core.logging.CustomLogging.logInfo
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, INCREMENTAL_REFRESH, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, SCHEDULER_MODE, WATERMARK_DELAY}
import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,12 @@ class FlintSparkIndexBuilderSuite
Some(
"spark.flint.job.externalScheduler.enabled is false but refresh interval is set to external scheduler mode")),
(
"set external mode when interval above threshold and no mode specified",
"set external mode when interval below threshold and no mode specified",
true,
"5 minutes",
Map("auto_refresh" -> "true", "refresh_interval" -> "10 minutes"),
Some(SchedulerMode.EXTERNAL.toString),
Some("10 minutes"),
Map("auto_refresh" -> "true", "refresh_interval" -> "1 minutes"),
Some(SchedulerMode.INTERNAL.toString),
Some("1 minutes"),
None),
(
"throw exception when interval below threshold but mode is external",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import org.opensearch.flint.common.FlintVersion.current
import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.storage.{FlintOpenSearchIndexMetadataService, OpenSearchClientUtils}
import org.opensearch.flint.spark.FlintSparkIndex.quotedTableName
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.CHECKPOINT_LOCATION
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.getFlintIndexName
import org.opensearch.flint.spark.scheduler.OpenSearchAsyncQueryScheduler
import org.scalatest.matchers.must.Matchers.defined
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,16 @@

package org.opensearch.flint.spark

import scala.jdk.CollectionConverters.mapAsJavaMapConverter

import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson
import org.json4s.native.JsonMethods._
import org.opensearch.OpenSearchException
import org.opensearch.action.get.GetRequest
import org.opensearch.client.RequestOptions
import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService
import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.storage.{FlintOpenSearchIndexMetadataService, OpenSearchClientUtils}
import org.opensearch.flint.spark.scheduler.OpenSearchAsyncQueryScheduler
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
import org.opensearch.index.query.QueryBuilders
import org.opensearch.index.reindex.DeleteByQueryRequest
Expand Down Expand Up @@ -180,6 +186,96 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
}
}

test("update auto refresh index to switch scheduler mode") {
setFlintSparkConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED, "true")

withTempDir { checkpointDir =>
// Create auto refresh Flint index
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.options(
FlintSparkIndexOptions(
Map(
"auto_refresh" -> "true",
"refresh_interval" -> "4 Minute",
"checkpoint_location" -> checkpointDir.getAbsolutePath)),
testIndex)
.create()
flint.refreshIndex(testIndex)

val indexInitial = flint.describeIndex(testIndex).get
indexInitial.options.refreshInterval() shouldBe Some("4 Minute")
the[OpenSearchException] thrownBy {
val client =
OpenSearchClientUtils.createClient(new FlintOptions(openSearchOptions.asJava))
client.get(
new GetRequest(OpenSearchAsyncQueryScheduler.SCHEDULER_INDEX_NAME, testIndex),
RequestOptions.DEFAULT)
}

// Update Flint index to change refresh interval
val updatedIndex = flint
.skippingIndex()
.copyWithUpdate(
indexInitial,
FlintSparkIndexOptions(
Map("scheduler_mode" -> "external", "refresh_interval" -> "5 Minutes")))
flint.updateIndex(updatedIndex)

// Verify index after update
val indexFinal = flint.describeIndex(testIndex).get
indexFinal.options.autoRefresh() shouldBe true
indexFinal.options.refreshInterval() shouldBe Some("5 Minutes")
indexFinal.options.checkpointLocation() shouldBe Some(checkpointDir.getAbsolutePath)

// Verify scheduler index is updated
verifySchedulerIndex(testIndex, 5, "MINUTES")
}
}

test("update auto refresh index to change refresh interval") {
setFlintSparkConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED, "true")

withTempDir { checkpointDir =>
// Create auto refresh Flint index
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.options(
FlintSparkIndexOptions(
Map(
"auto_refresh" -> "true",
"refresh_interval" -> "10 Minute",
"checkpoint_location" -> checkpointDir.getAbsolutePath)),
testIndex)
.create()

val indexInitial = flint.describeIndex(testIndex).get
indexInitial.options.refreshInterval() shouldBe Some("10 Minute")
verifySchedulerIndex(testIndex, 10, "MINUTES")

// Update Flint index to change refresh interval
val updatedIndex = flint
.skippingIndex()
.copyWithUpdate(
indexInitial,
FlintSparkIndexOptions(Map("refresh_interval" -> "5 Minutes")))
flint.updateIndex(updatedIndex)

// Verify index after update
val indexFinal = flint.describeIndex(testIndex).get
indexFinal.options.autoRefresh() shouldBe true
indexFinal.options.refreshInterval() shouldBe Some("5 Minutes")
indexFinal.options.checkpointLocation() shouldBe Some(checkpointDir.getAbsolutePath)

// Verify scheduler index is updated
verifySchedulerIndex(testIndex, 5, "MINUTES")
}
}

// Test update options validation failure with external scheduler
Seq(
(
Expand Down Expand Up @@ -207,12 +303,12 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
(Map.empty[String, String], Map("checkpoint_location" -> "s3a://test/"))),
"No options can be updated when auto_refresh remains false"),
(
"update other index option besides scheduler_mode when auto_refresh is true",
"update other index option besides scheduler_mode and refresh_interval when auto_refresh is true",
Seq(
(
Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/"),
Map("watermark_delay" -> "1 Minute"))),
"Altering index when auto_refresh remains true only allows changing: Set(scheduler_mode). Invalid options"),
"Altering index when auto_refresh remains true and scheduler_mode is external only allows changing: Set(scheduler_mode, refresh_interval). Invalid options"),
(
"convert to full refresh with disallowed options",
Seq(
Expand Down Expand Up @@ -655,4 +751,28 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
flint.queryIndex(testIndex).collect().toSet should have size 1
}
}

private def verifySchedulerIndex(
indexName: String,
expectedPeriod: Int,
expectedUnit: String): Unit = {
val client = OpenSearchClientUtils.createClient(new FlintOptions(openSearchOptions.asJava))
val response = client.get(
new GetRequest(OpenSearchAsyncQueryScheduler.SCHEDULER_INDEX_NAME, indexName),
RequestOptions.DEFAULT)

response.isExists shouldBe true
val sourceMap = response.getSourceAsMap

sourceMap.get("jobId") shouldBe indexName
sourceMap.get(
"scheduledQuery") shouldBe s"REFRESH SKIPPING INDEX ON spark_catalog.default.`test`"
sourceMap.get("enabled") shouldBe true
sourceMap.get("queryLang") shouldBe "sql"

val schedule = sourceMap.get("schedule").asInstanceOf[java.util.Map[String, Any]]
val interval = schedule.get("interval").asInstanceOf[java.util.Map[String, Any]]
interval.get("period") shouldBe expectedPeriod
interval.get("unit") shouldBe expectedUnit
}
}

0 comments on commit 42a18dd

Please sign in to comment.