Skip to content

Commit

Permalink
move validation to FlintSpark
Browse files Browse the repository at this point in the history
Signed-off-by: Sean Kao <[email protected]>
  • Loading branch information
seankao-az committed Mar 25, 2024
1 parent 363e04b commit a30a170
Show file tree
Hide file tree
Showing 4 changed files with 244 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._
import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName._
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.AUTO
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode._
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKindSerializer
import org.opensearch.flint.spark.skipping.recommendations.DataTypeSkippingStrategy
Expand Down Expand Up @@ -216,6 +217,13 @@ class FlintSpark(val spark: SparkSession) extends Logging {
def updateIndex(index: FlintSparkIndex): Option[String] = {
logInfo(s"Updating Flint index $index")
val indexName = index.name

validateUpdateAllowed(
describeIndex(indexName)
.getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
.options,
index.options)

if (flintClient.exists(indexName)) {
try {
// Relies on validation to forbid auto-to-auto and manual-to-manual updates
Expand Down Expand Up @@ -385,6 +393,52 @@ class FlintSpark(val spark: SparkSession) extends Logging {
}
}

/**
 * Check that moving an index from one set of options to another is a permitted update.
 *
 * The update is rejected when the auto_refresh flag is unchanged, when the resulting options
 * enable both auto_refresh and incremental_refresh, or when any option whose value differs from
 * the original is not in the allow-list for the resulting refresh mode.
 *
 * @param originalOptions
 *   options the index currently has
 * @param updatedOptions
 *   options the index would have after the update
 */
private def validateUpdateAllowed(
    originalOptions: FlintSparkIndexOptions,
    updatedOptions: FlintSparkIndexOptions): Unit = {
  // An update is only meaningful here if it flips the auto_refresh flag
  val autoAfter = updatedOptions.autoRefresh()
  if (autoAfter == originalOptions.autoRefresh()) {
    throw new IllegalArgumentException("auto_refresh option must be updated")
  }

  // Derive the refresh mode the index ends up in
  val incrementalAfter = updatedOptions.incrementalRefresh()
  if (autoAfter && incrementalAfter) {
    throw new IllegalArgumentException(
      "auto_refresh and incremental_refresh options cannot both be true")
  }
  val targetMode =
    if (autoAfter) AUTO
    else if (incrementalAfter) INCREMENTAL
    else FULL

  // Options that may be touched when switching to the target mode
  val permitted = targetMode match {
    case AUTO | INCREMENTAL =>
      Set(
        AUTO_REFRESH,
        INCREMENTAL_REFRESH,
        REFRESH_INTERVAL,
        CHECKPOINT_LOCATION,
        WATERMARK_DELAY)
    case FULL => Set(AUTO_REFRESH, INCREMENTAL_REFRESH)
  }
  val permittedNames = permitted.map(_.toString)

  // Names of options that are new or carry a different value than before
  val changedNames = updatedOptions.options.collect {
    case (key, value) if !originalOptions.options.get(key).contains(value) => key
  }
  if (changedNames.exists(name => !permittedNames.contains(name))) {
    throw new IllegalArgumentException(
      s"Altering index to ${targetMode} refresh only allows options: ${permitted}")
  }
}

private def updateIndexAutoToManual(index: FlintSparkIndex): Option[String] = {
val indexName = index.name
flintClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,19 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) {
def create(ignoreIfExists: Boolean = false): Unit = {
  // Materialize the index via buildIndex() and delegate creation to FlintSpark,
  // forwarding the ignoreIfExists flag unchanged.
  val index = buildIndex()
  flint.createIndex(index, ignoreIfExists)
}

def copyWithUpdate(index: FlintSparkIndex, options: FlintSparkIndexOptions): FlintSparkIndex = {
val updatedOptions = index.options.update(options)
/**
* Copy Flint index with updated options.
*
* @param index
* Flint index to copy
* @param updateOptions
* options to update
* @return
* updated Flint index
*/
def copyWithUpdate(index: FlintSparkIndex, updateOptions: FlintSparkIndexOptions): FlintSparkIndex = {
val originalOptions = index.options
val updatedOptions = originalOptions.copy(options = originalOptions.options ++ updateOptions.options)
val updatedMetadata = index
.metadata()
.copy(options = updatedOptions.options.mapValues(_.asInstanceOf[AnyRef]).asJava)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, INCREMENTAL_REFRESH, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, WATERMARK_DELAY}
import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode._

/**
* Flint Spark index configurable options.
Expand Down Expand Up @@ -119,20 +118,6 @@ case class FlintSparkIndexOptions(options: Map[String, String]) {
map.result()
}

/**
 * Produce new options by layering another set of options on top of this one. Keys present in
 * both take their value from `other`. The merged result is checked by validateUpdateAllowed
 * before it is returned.
 *
 * @param other
 *   options to apply on top of the current ones
 * @return
 *   the merged Flint Spark index options
 */
def update(other: FlintSparkIndexOptions): FlintSparkIndexOptions = {
  // Right-hand entries win on key collision
  val mergedEntries = options ++ other.options
  val merged = FlintSparkIndexOptions(mergedEntries)
  validateUpdateAllowed(other, merged)
  merged
}

// Look up the raw string value stored under the option's name, if any.
private def getOptionValue(name: OptionName): Option[String] = {
  val key = name.toString
  options.get(key)
}
Expand All @@ -142,47 +127,6 @@ case class FlintSparkIndexOptions(options: Map[String, String]) {
.map(opt => (parse(opt) \ key).extract[Map[String, String]])
.getOrElse(Map.empty)
}

/**
 * Check that applying the given options to this instance is a permitted update.
 *
 * The update is rejected when the auto_refresh flag stays the same, when the merged options
 * enable both auto_refresh and incremental_refresh, or when `other` names any option outside
 * the allow-list for the resulting refresh mode.
 *
 * @param other
 *   the options being applied
 * @param updatedOptions
 *   the options after merging `other` into this instance
 */
private def validateUpdateAllowed(
    other: FlintSparkIndexOptions,
    updatedOptions: FlintSparkIndexOptions): Unit = {
  // The update has to flip the auto_refresh flag
  val autoAfter = updatedOptions.autoRefresh()
  if (autoAfter == autoRefresh()) {
    throw new IllegalArgumentException("auto_refresh option must be updated")
  }

  // Resolve the refresh mode of the merged options; the conflicting pair is rejected first
  val targetMode = (autoAfter, updatedOptions.incrementalRefresh()) match {
    case (true, true) =>
      throw new IllegalArgumentException(
        "auto_refresh and incremental_refresh options cannot both be true")
    case (true, _) => AUTO
    case (_, true) => INCREMENTAL
    case _ => FULL
  }

  // Options that may be supplied when switching to the target mode
  val permitted =
    if (targetMode == FULL) {
      Set(AUTO_REFRESH, INCREMENTAL_REFRESH)
    } else {
      Set(
        AUTO_REFRESH,
        INCREMENTAL_REFRESH,
        REFRESH_INTERVAL,
        CHECKPOINT_LOCATION,
        WATERMARK_DELAY)
    }
  val permittedNames = permitted.map(_.toString)

  // Every key supplied in `other` must be on the allow-list
  if (other.options.keys.exists(key => !permittedNames.contains(key))) {
    throw new IllegalArgumentException(
      s"Altering index to ${targetMode} refresh only allows options: ${permitted}")
  }
}
}

object FlintSparkIndexOptions {
Expand Down
Loading

0 comments on commit a30a170

Please sign in to comment.