Skip to content

Commit

Permalink
update validation rule
Browse files Browse the repository at this point in the history
Signed-off-by: Sean Kao <[email protected]>
  • Loading branch information
seankao-az committed Mar 19, 2024
1 parent e5c1af7 commit af4da21
Showing 1 changed file with 30 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import org.antlr.v4.runtime.ParserRuleContext
import org.antlr.v4.runtime.tree.{ParseTree, RuleNode}
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndexFactory}
import org.opensearch.flint.spark.FlintSpark.UpdateMode._
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, INCREMENTAL_REFRESH, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, WATERMARK_DELAY}
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName._
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode._
import org.opensearch.flint.spark.sql.covering.FlintSparkCoveringIndexAstBuilder
import org.opensearch.flint.spark.sql.index.FlintSparkIndexAstBuilder
import org.opensearch.flint.spark.sql.job.FlintSparkIndexJobAstBuilder
Expand Down Expand Up @@ -91,10 +92,10 @@ object FlintSparkSqlAstBuilder {
.getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))

val oldOptions = oldIndex.options.options
val newOptions = oldOptions ++ updateOptions
validateOptions(oldOptions, newOptions)
validateOptions(oldOptions, updateOptions)

val newMetadata = oldIndex.metadata().copy(options = newOptions.mapValues(_.asInstanceOf[AnyRef]).asJava)
val mergedOptions = oldOptions ++ updateOptions
val newMetadata = oldIndex.metadata().copy(options = mergedOptions.mapValues(_.asInstanceOf[AnyRef]).asJava)
val newIndex = FlintSparkIndexFactory.create(newMetadata)

val updateMode = newIndex.options.autoRefresh() match {
Expand All @@ -106,19 +107,37 @@ object FlintSparkSqlAstBuilder {
}

/**
* Validate auto_refresh option must change.
* Validate update options.
*
* @param oldOptions
* existing options
* @param newOptions
* options after update
* @param updateOptions
* options to update
*/
private def validateOptions(oldOptions: Map[String, String], newOptions: Map[String, String]): Unit = {
val newAutoRefresh = newOptions.get(AUTO_REFRESH.toString)
val oldAutoRefresh = oldOptions.get(AUTO_REFRESH.toString)
private def validateOptions(oldOptions: Map[String, String], updateOptions: Map[String, String]): Unit = {
val mergedOptions = oldOptions ++ updateOptions
val newAutoRefresh = mergedOptions.getOrElse(AUTO_REFRESH.toString, "false")
val oldAutoRefresh = oldOptions.getOrElse(AUTO_REFRESH.toString, "false")

// auto_refresh must change
if (newAutoRefresh == oldAutoRefresh) {
throw new IllegalArgumentException("auto_refresh option must be updated")
}
}

// validate allowed options depending on refresh mode
val newIncrementalRefresh = mergedOptions.getOrElse(INCREMENTAL_REFRESH.toString, "false")
val (refreshMode, allowedOptions) = (newAutoRefresh, newIncrementalRefresh) match {
case ("true", "false") =>
(AUTO, Set(AUTO_REFRESH, INCREMENTAL_REFRESH, CHECKPOINT_LOCATION))
case ("false", "false") =>
(FULL, Set(AUTO_REFRESH, INCREMENTAL_REFRESH))
case ("false", "true") =>
(INCREMENTAL, Set(AUTO_REFRESH, INCREMENTAL_REFRESH, WATERMARK_DELAY, CHECKPOINT_LOCATION))
case ("true", "true") =>
throw new IllegalArgumentException("auto_refresh and incremental_refresh options cannot both be true")
}
if (!updateOptions.keys.forall(allowedOptions.map(_.toString).contains)) {
throw new IllegalArgumentException(s"Altering to ${refreshMode} refresh index only allows options: ${allowedOptions}")
}
}
}

0 comments on commit af4da21

Please sign in to comment.