Skip to content

Commit

Permalink
Support alter from manual to auto without specifying scheduler mode (o…
Browse files Browse the repository at this point in the history
…pensearch-project#930)

Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger authored and 14yapkc1 committed Dec 11, 2024
1 parent cca0be9 commit 8e5da7c
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,10 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
private def isSchedulerModeChanged(
originalOptions: FlintSparkIndexOptions,
updatedOptions: FlintSparkIndexOptions): Boolean = {
// Altering from manual to auto should not be interpreted as a scheduling mode change.
if (!originalOptions.options.contains(SCHEDULER_MODE.toString)) {
return false
}
updatedOptions.isExternalSchedulerEnabled() != originalOptions.isExternalSchedulerEnabled()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,44 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
flint.queryIndex(testIndex).collect().toSet should have size 2
}

test("update full refresh index to auto refresh should start job with external scheduler") {
setFlintSparkConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED, "true")

withTempDir { checkpointDir =>
// Create full refresh Flint index
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.options(FlintSparkIndexOptions(Map("auto_refresh" -> "false")), testIndex)
.create()

spark.streams.active.find(_.name == testIndex) shouldBe empty
flint.queryIndex(testIndex).collect().toSet should have size 0
val indexInitial = flint.describeIndex(testIndex).get
indexInitial.options.isExternalSchedulerEnabled() shouldBe false

val updatedIndex = flint
.skippingIndex()
.copyWithUpdate(
indexInitial,
FlintSparkIndexOptions(
Map(
"auto_refresh" -> "true",
"checkpoint_location" -> checkpointDir.getAbsolutePath)))

val jobId = flint.updateIndex(updatedIndex)
jobId shouldBe empty
val indexFinal = flint.describeIndex(testIndex).get
indexFinal.options.isExternalSchedulerEnabled() shouldBe true
indexFinal.options.autoRefresh() shouldBe true
indexFinal.options.refreshInterval() shouldBe Some(
FlintOptions.DEFAULT_EXTERNAL_SCHEDULER_INTERVAL)

verifySchedulerIndex(testIndex, 5, "MINUTES")
}
}

test("update incremental refresh index to auto refresh should start job") {
withTempDir { checkpointDir =>
// Create incremental refresh Flint index and wait for complete
Expand Down Expand Up @@ -667,6 +705,51 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
}
}

test(
"update incremental refresh index to auto refresh should start job with external scheduler") {
setFlintSparkConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED, "true")

withTempDir { checkpointDir =>
// Create incremental refresh Flint index
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.options(
FlintSparkIndexOptions(
Map(
"incremental_refresh" -> "true",
"checkpoint_location" -> checkpointDir.getAbsolutePath)),
testIndex)
.create()

spark.streams.active.find(_.name == testIndex) shouldBe empty
flint.queryIndex(testIndex).collect().toSet should have size 0
val indexInitial = flint.describeIndex(testIndex).get
indexInitial.options.isExternalSchedulerEnabled() shouldBe false

val updatedIndex = flint
.skippingIndex()
.copyWithUpdate(
indexInitial,
FlintSparkIndexOptions(
Map(
"auto_refresh" -> "true",
"incremental_refresh" -> "false",
"checkpoint_location" -> checkpointDir.getAbsolutePath)))

val jobId = flint.updateIndex(updatedIndex)
jobId shouldBe empty
val indexFinal = flint.describeIndex(testIndex).get
indexFinal.options.isExternalSchedulerEnabled() shouldBe true
indexFinal.options.autoRefresh() shouldBe true
indexFinal.options.refreshInterval() shouldBe Some(
FlintOptions.DEFAULT_EXTERNAL_SCHEDULER_INTERVAL)

verifySchedulerIndex(testIndex, 5, "MINUTES")
}
}

test("update auto refresh index to full refresh should stop job") {
// Create auto refresh Flint index and wait for complete
flint
Expand Down

0 comments on commit 8e5da7c

Please sign in to comment.