Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support alter refresh interval on external scheduler #801

Merged
merged 4 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode._
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode
import org.opensearch.flint.spark.scheduler.{AsyncQuerySchedulerBuilder, FlintSparkJobExternalSchedulingService, FlintSparkJobInternalSchedulingService, FlintSparkJobSchedulingService}
import org.opensearch.flint.spark.scheduler.AsyncQuerySchedulerBuilder.AsyncQuerySchedulerAction
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
Expand Down Expand Up @@ -229,16 +230,16 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
val originalOptions = describeIndex(indexName)
.getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
.options
validateUpdateAllowed(originalOptions, index.options)

val isSchedulerModeChanged =
index.options.isExternalSchedulerEnabled() != originalOptions.isExternalSchedulerEnabled()
validateUpdateAllowed(originalOptions, index.options)
withTransaction[Option[String]](indexName, "Update Flint index") { tx =>
// Relies on validation to prevent:
// 1. auto-to-auto updates besides scheduler_mode
// 2. any manual-to-manual updates
// 3. both refresh_mode and scheduler_mode updated
(index.options.autoRefresh(), isSchedulerModeChanged) match {
(
index.options.autoRefresh(),
isSchedulerModeChanged(originalOptions, index.options)) match {
case (true, true) => updateSchedulerMode(index, tx)
case (true, false) => updateIndexManualToAuto(index, tx)
case (false, false) => updateIndexAutoToManual(index, tx)
Expand Down Expand Up @@ -478,11 +479,17 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
"Altering index to full/incremental refresh")

case (false, true) =>
// original refresh_mode is auto, only allow changing scheduler_mode
validateChangedOptions(
changedOptions,
Set(SCHEDULER_MODE),
"Altering index when auto_refresh remains true")
// original refresh_mode is auto, only allow changing scheduler_mode and potentially refresh_interval
var allowedOptions = Set(SCHEDULER_MODE)
val schedulerMode =
if (updatedOptions.isExternalSchedulerEnabled()) SchedulerMode.EXTERNAL
else SchedulerMode.INTERNAL
val contextPrefix =
s"Altering index when auto_refresh remains true and scheduler_mode is $schedulerMode"
if (updatedOptions.isExternalSchedulerEnabled()) {
allowedOptions += REFRESH_INTERVAL
}
validateChangedOptions(changedOptions, allowedOptions, contextPrefix)

case (false, false) =>
// original refresh_mode is full/incremental, not allowed to change any options
Expand All @@ -507,6 +514,12 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
}
}

private def isSchedulerModeChanged(
originalOptions: FlintSparkIndexOptions,
updatedOptions: FlintSparkIndexOptions): Boolean = {
updatedOptions.isExternalSchedulerEnabled() != originalOptions.isExternalSchedulerEnabled()
}

private def updateIndexAutoToManual(
index: FlintSparkIndex,
tx: OptimisticTransaction[Option[String]]): Option[String] = {
Expand Down Expand Up @@ -587,7 +600,8 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
flintIndexMetadataService.updateIndexMetadata(indexName, index.metadata)
logInfo("Update index options complete")
oldService.handleJob(index, AsyncQuerySchedulerAction.UNSCHEDULE)
logInfo(s"Unscheduled ${if (isExternal) "internal" else "external"} jobs")
logInfo(
s"Unscheduled refresh jobs from ${if (isExternal) "internal" else "external"} scheduler")
newService.handleJob(index, AsyncQuerySchedulerAction.UPDATE)
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import java.util.{Collections, UUID}
import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization
import org.opensearch.flint.core.logging.CustomLogging.logInfo
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, INCREMENTAL_REFRESH, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, SCHEDULER_MODE, WATERMARK_DELAY}
import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,12 @@ class FlintSparkIndexBuilderSuite
Some(
"spark.flint.job.externalScheduler.enabled is false but scheduler_mode is set to external")),
(
"set external mode when interval above threshold and no mode specified",
"set external mode when interval below threshold and no mode specified",
true,
"5 minutes",
Map("auto_refresh" -> "true", "refresh_interval" -> "10 minutes"),
Some(SchedulerMode.EXTERNAL.toString),
Some("10 minutes"),
Map("auto_refresh" -> "true", "refresh_interval" -> "1 minutes"),
Some(SchedulerMode.INTERNAL.toString),
Some("1 minutes"),
None),
(
"throw exception when interval below threshold but mode is external",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import org.opensearch.flint.common.FlintVersion.current
import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.storage.{FlintOpenSearchIndexMetadataService, OpenSearchClientUtils}
import org.opensearch.flint.spark.FlintSparkIndex.quotedTableName
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.CHECKPOINT_LOCATION
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.getFlintIndexName
import org.opensearch.flint.spark.scheduler.OpenSearchAsyncQueryScheduler
import org.scalatest.matchers.must.Matchers.defined
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,16 @@

package org.opensearch.flint.spark

import scala.jdk.CollectionConverters.mapAsJavaMapConverter

import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson
import org.json4s.native.JsonMethods._
import org.opensearch.OpenSearchException
import org.opensearch.action.get.GetRequest
import org.opensearch.client.RequestOptions
import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService
import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.storage.{FlintOpenSearchIndexMetadataService, OpenSearchClientUtils}
import org.opensearch.flint.spark.scheduler.OpenSearchAsyncQueryScheduler
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
import org.opensearch.index.query.QueryBuilders
import org.opensearch.index.reindex.DeleteByQueryRequest
Expand Down Expand Up @@ -180,6 +186,96 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
}
}

test("update auto refresh index to switch scheduler mode") {
setFlintSparkConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED, "true")

withTempDir { checkpointDir =>
// Create auto refresh Flint index
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.options(
FlintSparkIndexOptions(
Map(
"auto_refresh" -> "true",
"refresh_interval" -> "4 Minute",
"checkpoint_location" -> checkpointDir.getAbsolutePath)),
testIndex)
.create()
flint.refreshIndex(testIndex)

val indexInitial = flint.describeIndex(testIndex).get
indexInitial.options.refreshInterval() shouldBe Some("4 Minute")
the[OpenSearchException] thrownBy {
val client =
OpenSearchClientUtils.createClient(new FlintOptions(openSearchOptions.asJava))
client.get(
new GetRequest(OpenSearchAsyncQueryScheduler.SCHEDULER_INDEX_NAME, testIndex),
RequestOptions.DEFAULT)
}

// Update Flint index to change refresh interval
val updatedIndex = flint
.skippingIndex()
.copyWithUpdate(
indexInitial,
FlintSparkIndexOptions(
Map("scheduler_mode" -> "external", "refresh_interval" -> "5 Minutes")))
flint.updateIndex(updatedIndex)

// Verify index after update
val indexFinal = flint.describeIndex(testIndex).get
indexFinal.options.autoRefresh() shouldBe true
indexFinal.options.refreshInterval() shouldBe Some("5 Minutes")
indexFinal.options.checkpointLocation() shouldBe Some(checkpointDir.getAbsolutePath)

// Verify scheduler index is updated
verifySchedulerIndex(testIndex, 5, "MINUTES")
}
}

test("update auto refresh index to change refresh interval") {
setFlintSparkConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED, "true")

withTempDir { checkpointDir =>
// Create auto refresh Flint index
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.options(
FlintSparkIndexOptions(
Map(
"auto_refresh" -> "true",
"refresh_interval" -> "10 Minute",
"checkpoint_location" -> checkpointDir.getAbsolutePath)),
testIndex)
.create()

val indexInitial = flint.describeIndex(testIndex).get
indexInitial.options.refreshInterval() shouldBe Some("10 Minute")
verifySchedulerIndex(testIndex, 10, "MINUTES")

// Update Flint index to change refresh interval
val updatedIndex = flint
.skippingIndex()
.copyWithUpdate(
indexInitial,
FlintSparkIndexOptions(Map("refresh_interval" -> "5 Minutes")))
flint.updateIndex(updatedIndex)

// Verify index after update
val indexFinal = flint.describeIndex(testIndex).get
indexFinal.options.autoRefresh() shouldBe true
indexFinal.options.refreshInterval() shouldBe Some("5 Minutes")
indexFinal.options.checkpointLocation() shouldBe Some(checkpointDir.getAbsolutePath)

// Verify scheduler index is updated
verifySchedulerIndex(testIndex, 5, "MINUTES")
}
}

// Test update options validation failure with external scheduler
Seq(
(
Expand Down Expand Up @@ -207,12 +303,32 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
(Map.empty[String, String], Map("checkpoint_location" -> "s3a://test/"))),
"No options can be updated when auto_refresh remains false"),
(
"update other index option besides scheduler_mode when auto_refresh is true",
"update index option when refresh_interval value belows threshold",
Seq(
(
Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/"),
Map("refresh_interval" -> "4 minutes"))),
"Input refresh_interval is 4 minutes, required above the interval threshold of external scheduler: 5 minutes"),
(
"update index option when no change on auto_refresh",
Seq(
(
Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/"),
Map("scheduler_mode" -> "internal", "refresh_interval" -> "4 minutes")),
(
Map(
"auto_refresh" -> "true",
"scheduler_mode" -> "internal",
"checkpoint_location" -> "s3a://test/"),
Map("refresh_interval" -> "4 minutes"))),
"Altering index when auto_refresh remains true and scheduler_mode is internal only allows changing: Set(scheduler_mode). Invalid options"),
(
"update other index option besides scheduler_mode and refresh_interval when auto_refresh is true",
Seq(
(
Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/"),
Map("watermark_delay" -> "1 Minute"))),
"Altering index when auto_refresh remains true only allows changing: Set(scheduler_mode). Invalid options"),
"Altering index when auto_refresh remains true and scheduler_mode is external only allows changing: Set(scheduler_mode, refresh_interval). Invalid options"),
(
"convert to full refresh with disallowed options",
Seq(
Expand Down Expand Up @@ -655,4 +771,28 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
flint.queryIndex(testIndex).collect().toSet should have size 1
}
}

private def verifySchedulerIndex(
indexName: String,
expectedPeriod: Int,
expectedUnit: String): Unit = {
val client = OpenSearchClientUtils.createClient(new FlintOptions(openSearchOptions.asJava))
val response = client.get(
new GetRequest(OpenSearchAsyncQueryScheduler.SCHEDULER_INDEX_NAME, indexName),
RequestOptions.DEFAULT)

response.isExists shouldBe true
val sourceMap = response.getSourceAsMap

sourceMap.get("jobId") shouldBe indexName
sourceMap.get(
"scheduledQuery") shouldBe s"REFRESH SKIPPING INDEX ON spark_catalog.default.`test`"
sourceMap.get("enabled") shouldBe true
sourceMap.get("queryLang") shouldBe "sql"

val schedule = sourceMap.get("schedule").asInstanceOf[java.util.Map[String, Any]]
val interval = schedule.get("interval").asInstanceOf[java.util.Map[String, Any]]
interval.get("period") shouldBe expectedPeriod
interval.get("unit") shouldBe expectedUnit
}
}
Loading