Skip to content

Commit

Permalink
Add more integration tests (IT)
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Oct 11, 2024
1 parent 85a9923 commit 143c847
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -446,43 +446,38 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
originalOptions: FlintSparkIndexOptions,
updatedOptions: FlintSparkIndexOptions): Unit = {
val isAutoRefreshChanged = updatedOptions.autoRefresh() != originalOptions.autoRefresh()
val isSchedulerModeChanged =
updatedOptions.isExternalSchedulerEnabled() != originalOptions.isExternalSchedulerEnabled()

// Prevent changing both auto_refresh and scheduler_mode simultaneously
if (isAutoRefreshChanged && isSchedulerModeChanged) {
throw new IllegalArgumentException(
"Cannot change both auto_refresh and scheduler_mode simultaneously")
}

val changedOptions = updatedOptions.options.filterNot { case (k, v) =>
originalOptions.options.get(k).contains(v)
}.keySet

if (changedOptions.isEmpty) {
throw new IllegalArgumentException("No index options updated")
throw new IllegalArgumentException("No index option updated")
}

// Validate based on auto_refresh state and changes
(isAutoRefreshChanged, updatedOptions.autoRefresh()) match {
case (true, true) =>
// Changing from manual to auto refresh
if (updatedOptions.incrementalRefresh()) {
throw new IllegalArgumentException(
"Altering index to auto refresh while incremental refresh remains true")
}

val allowedOptions = Set(
AUTO_REFRESH,
INCREMENTAL_REFRESH,
SCHEDULER_MODE,
REFRESH_INTERVAL,
CHECKPOINT_LOCATION,
WATERMARK_DELAY)
validateChangedOptions(changedOptions, allowedOptions, "Changing to auto refresh")

validateChangedOptions(changedOptions, allowedOptions, s"Altering index to auto refresh")
case (true, false) =>
val allowedOptions = if (updatedOptions.incrementalRefresh()) {
// Changing from auto refresh to incremental refresh
Set(
AUTO_REFRESH,
INCREMENTAL_REFRESH,
SCHEDULER_MODE,
REFRESH_INTERVAL,
CHECKPOINT_LOCATION,
WATERMARK_DELAY)
Expand All @@ -493,11 +488,14 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
validateChangedOptions(
changedOptions,
allowedOptions,
"Changing to full/incremental refresh")
"Altering index to full/incremental refresh")

case (false, true) =>
// original refresh_mode is auto, only allow changing scheduler_mode
validateChangedOptions(changedOptions, Set(SCHEDULER_MODE), "Auto refresh remains true")
validateChangedOptions(
changedOptions,
Set(SCHEDULER_MODE),
"Altering index when auto_refresh remains true")

case (false, false) =>
// original refresh_mode is full/incremental, not allowed to change any options
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import org.opensearch.index.reindex.DeleteByQueryRequest
import org.scalatest.matchers.must.Matchers._
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper

import org.apache.spark.sql.flint.config.FlintSparkConf

class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {

/** Test table and index name */
Expand All @@ -32,6 +34,7 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
// Delete all test indices
deleteTestIndex(testIndex)
sql(s"DROP TABLE $testTable")
conf.unsetConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED.key)
}

test("update index with index options successfully") {
Expand Down Expand Up @@ -177,6 +180,121 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
}
}

// Negative scenarios for updating index options while the external scheduler is enabled.
// Each entry is a triple of:
//   1. the scenario name (spliced into the generated test name),
//   2. a Seq of (options used to create the index, options applied by the update),
//   3. a substring that the resulting IllegalArgumentException message must contain.
// A "checkpoint_location" value listed below is only a placeholder: whenever the key is
// present, its value is replaced with the path of a temp directory at run time.
Seq(
  (
    "update index without changing index option",
    Seq(
      (
        Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/"),
        Map("auto_refresh" -> "true")),
      (
        Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/"),
        Map("checkpoint_location" -> "s3a://test/")),
      (
        Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/"),
        Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/"))),
    "No index option updated"),
  (
    "update index option when auto_refresh is false",
    Seq(
      (
        Map.empty[String, String],
        Map("auto_refresh" -> "false", "checkpoint_location" -> "s3a://test/")),
      (
        Map.empty[String, String],
        Map("incremental_refresh" -> "true", "checkpoint_location" -> "s3a://test/")),
      (Map.empty[String, String], Map("checkpoint_location" -> "s3a://test/"))),
    "No options can be updated when auto_refresh remains false"),
  (
    "update other index option besides scheduler_mode when auto_refresh is true",
    Seq(
      (
        Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/"),
        Map("watermark_delay" -> "1 Minute"))),
    "Altering index when auto_refresh remains true only allows changing: Set(scheduler_mode). Invalid options"),
  (
    "convert to full refresh with disallowed options",
    Seq(
      (
        Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/"),
        Map("auto_refresh" -> "false", "scheduler_mode" -> "internal")),
      (
        Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/"),
        Map("auto_refresh" -> "false", "refresh_interval" -> "5 Minute")),
      (
        Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/"),
        Map("auto_refresh" -> "false", "watermark_delay" -> "1 Minute"))),
    "Altering index to full/incremental refresh only allows changing"),
  (
    "convert to auto refresh with disallowed options",
    Seq(
      (
        Map.empty[String, String],
        Map(
          "auto_refresh" -> "true",
          "output_mode" -> "complete",
          "checkpoint_location" -> "s3a://test/"))),
    "Altering index to auto refresh only allows changing: Set(auto_refresh, watermark_delay, scheduler_mode, " +
      "refresh_interval, incremental_refresh, checkpoint_location). Invalid options: Set(output_mode)"),
  (
    "convert to invalid refresh mode",
    Seq(
      (
        Map.empty[String, String],
        Map(
          "auto_refresh" -> "true",
          "incremental_refresh" -> "true",
          "checkpoint_location" -> "s3a://test/"))),
    "Altering index to auto refresh while incremental refresh remains true"))
  .foreach { case (scenario, optionPairs, expectedMessage) =>
    test(s"should fail if $scenario and external scheduler enabled") {
      setFlintSparkConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED, "true")

      optionPairs.foreach { case (initialOptions, updateOptions) =>
        logInfo(s"initialOptionsMap: $initialOptions")
        logInfo(s"updateOptionsMap: $updateOptions")

        withTempDir { checkpointDir =>
          // Redirect a declared checkpoint_location to the temp dir; maps without
          // that key are passed through unchanged.
          def withTempCheckpoint(opts: Map[String, String]): Map[String, String] =
            if (opts.contains("checkpoint_location")) {
              opts.updated("checkpoint_location", checkpointDir.getAbsolutePath)
            } else {
              opts
            }

          // Create and refresh a skipping index with the scenario's initial options
          flint
            .skippingIndex()
            .onTable(testTable)
            .addPartitions("year", "month")
            .options(FlintSparkIndexOptions(withTempCheckpoint(initialOptions)), testIndex)
            .create()
          flint.refreshIndex(testIndex)

          val existingIndex = flint.describeIndex(testIndex).get

          // Building the updated index and applying it both happen inside the
          // intercepted block, so a failure at either step is captured.
          val thrown = the[IllegalArgumentException] thrownBy {
            flint.updateIndex(
              flint
                .skippingIndex()
                .copyWithUpdate(
                  existingIndex,
                  FlintSparkIndexOptions(withTempCheckpoint(updateOptions))))
          }
          thrown.getMessage should include(expectedMessage)

          // Remove the index so the next option pair starts from a clean state
          deleteTestIndex(testIndex)
        }
      }
    }
  }

// Test update options validation success
Seq(
(
Expand Down

0 comments on commit 143c847

Please sign in to comment.