test case for update options validation
Signed-off-by: Sean Kao <[email protected]>
seankao-az committed Mar 20, 2024
1 parent 52a34d6 commit 2101d34
Showing 4 changed files with 290 additions and 13 deletions.
docs/index.md (2 changes: 1 addition & 1 deletion)
@@ -373,7 +373,7 @@ User can provide the following options in `WITH` clause of create statement:
+ `refresh_interval`: a string as the time interval for incremental refresh, e.g. 1 minute, 10 seconds. This is only applicable when auto refresh is enabled. Please check `org.apache.spark.unsafe.types.CalendarInterval` for valid duration identifiers. By default, the next micro batch is generated as soon as the previous one completes processing.
+ `incremental_refresh`: default value is false. If set to true, the index is refreshed incrementally; otherwise, the entire index is fully refreshed. This is only applicable when auto refresh is disabled.
+ `checkpoint_location`: a string as the location path for the refresh job checkpoint (auto or incremental). The location has to be a path in an HDFS compatible file system and is only applicable when auto refresh is enabled. If unspecified, a temporary checkpoint directory is used, which may result in checkpoint data loss upon restart.
- + `watermark_delay`: a string as a time expression for how late data can come and still be processed, e.g. 1 minute, 10 seconds. This is required by incremental refresh on a materialized view if it has aggregation in the query.
+ + `watermark_delay`: a string as a time expression for how late data can come and still be processed, e.g. 1 minute, 10 seconds. This is required by auto and incremental refresh on a materialized view if it has aggregation in the query.
+ `output_mode`: a mode string that describes how data will be written to the streaming sink. If unspecified, the default append mode is applied.
+ `index_settings`: a JSON string as index settings for the OpenSearch index that will be created. Please follow the format in the OpenSearch documentation. If unspecified, default OpenSearch index settings are applied.
+ `extra_options`: a JSON string as extra options that can be passed to the Spark streaming source and sink API directly. Use the qualified source table name (because there could be multiple source tables) and "sink", e.g. '{"sink": "{key: val}", "table1": {key: val}}'
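
Editorial note, to make the option list above concrete: a hedged sketch of a create statement combining several of these options, runnable from a spark-shell with Flint loaded. The catalog, table, column, and checkpoint names are placeholders, not from this commit; the statement shape follows the Flint docs this hunk edits.

```scala
// Illustrative only: names and locations below are hypothetical.
spark.sql("""
  CREATE MATERIALIZED VIEW mycatalog.default.alb_logs_metrics
  AS SELECT window.start AS startTime, COUNT(*) AS requestCount
     FROM mycatalog.default.alb_logs
     GROUP BY TUMBLE(time, '10 minutes')
  WITH (
    auto_refresh = true,
    refresh_interval = '1 minute',
    watermark_delay = '10 seconds',
    checkpoint_location = 's3://bucket/checkpoints/'
  )
""")
```

Per the doc change above, `watermark_delay` is required here because the view aggregates and auto refresh is enabled.
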
FlintSparkMaterializedView.scala
@@ -96,7 +96,7 @@ case class FlintSparkMaterializedView(
  private def watermark(timeCol: Attribute, child: LogicalPlan) = {
    require(
      options.watermarkDelay().isDefined,
-     "watermark delay is required for incremental refresh with aggregation")
+     "watermark delay is required for auto refresh and incremental refresh with aggregation")

    val delay = options.watermarkDelay().get
    EventTimeWatermark(timeCol, IntervalUtils.fromIntervalString(delay), child)
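
Editorial note, not part of the diff: `EventTimeWatermark` is the logical-plan node behind Structured Streaming's `withWatermark` operator, which is what `watermark_delay` ultimately controls. A minimal, self-contained sketch of the same late-data tolerance using Spark's built-in rate source; all names here are illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, window}

object WatermarkDelaySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("watermark-delay-sketch")
      .master("local[*]")
      .getOrCreate()

    // The built-in rate source emits (timestamp, value) rows, handy for demos.
    val events = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "1")
      .load()

    // "1 minute" plays the role of watermark_delay: rows arriving more than
    // 1 minute behind the max observed event time may be dropped.
    val counts = events
      .withWatermark("timestamp", "1 minute")
      .groupBy(window(col("timestamp"), "10 seconds"))
      .count()

    // Append mode on a streaming aggregation requires the watermark above,
    // mirroring the require(...) check in the diff.
    counts.writeStream
      .outputMode("append")
      .format("console")
      .start()
      .awaitTermination()
  }
}
```
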
FlintSparkSqlAstBuilder.scala
@@ -12,6 +12,7 @@ import org.antlr.v4.runtime.tree.{ParseTree, RuleNode}
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndexFactory}
import org.opensearch.flint.spark.FlintSpark.UpdateMode._
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName._
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.MV_INDEX_TYPE
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode._
import org.opensearch.flint.spark.sql.covering.FlintSparkCoveringIndexAstBuilder
import org.opensearch.flint.spark.sql.index.FlintSparkIndexAstBuilder
@@ -92,7 +93,7 @@ object FlintSparkSqlAstBuilder {
      .getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))

    val oldOptions = oldIndex.options.options
-   validateOptions(oldOptions, updateOptions)
+   validateUpdateOptions(oldOptions, updateOptions, oldIndex.kind)

    val mergedOptions = oldOptions ++ updateOptions
    val newMetadata = oldIndex.metadata().copy(options = mergedOptions.mapValues(_.asInstanceOf[AnyRef]).asJava)
@@ -108,13 +109,17 @@

  /**
   * Validate update options.
+  * These rules are specific to updating an index, validating that the update is allowed.
+  * They don't check whether the resulting index options will be valid.
   *
   * @param oldOptions
   *   existing options
   * @param updateOptions
   *   options to update
+  * @param indexKind
+  *   index kind
   */
- private def validateOptions(oldOptions: Map[String, String], updateOptions: Map[String, String]): Unit = {
+ private def validateUpdateOptions(oldOptions: Map[String, String], updateOptions: Map[String, String], indexKind: String): Unit = {
    val mergedOptions = oldOptions ++ updateOptions
    val newAutoRefresh = mergedOptions.getOrElse(AUTO_REFRESH.toString, "false")
    val oldAutoRefresh = oldOptions.getOrElse(AUTO_REFRESH.toString, "false")
    if (newAutoRefresh == oldAutoRefresh) {
      throw new IllegalArgumentException("auto_refresh option must be updated")
    }

-   // validate allowed options depending on refresh mode
    val newIncrementalRefresh = mergedOptions.getOrElse(INCREMENTAL_REFRESH.toString, "false")
-   val (refreshMode, allowedOptions) = (newAutoRefresh, newIncrementalRefresh) match {
-     case ("true", "false") =>
-       (AUTO, Set(AUTO_REFRESH, INCREMENTAL_REFRESH, CHECKPOINT_LOCATION))
-     case ("false", "false") =>
-       (FULL, Set(AUTO_REFRESH, INCREMENTAL_REFRESH))
-     case ("false", "true") =>
-       (INCREMENTAL, Set(AUTO_REFRESH, INCREMENTAL_REFRESH, WATERMARK_DELAY, CHECKPOINT_LOCATION))
+   val refreshMode = (newAutoRefresh, newIncrementalRefresh) match {
+     case ("true", "false") => AUTO
+     case ("false", "false") => FULL
+     case ("false", "true") => INCREMENTAL
      case ("true", "true") =>
        throw new IllegalArgumentException("auto_refresh and incremental_refresh options cannot both be true")
    }

+   // validate allowed options depending on refresh mode
+   val allowedOptions = refreshMode match {
+     case FULL => Set(AUTO_REFRESH, INCREMENTAL_REFRESH)
+     case AUTO | INCREMENTAL =>
+       Set(AUTO_REFRESH, INCREMENTAL_REFRESH, REFRESH_INTERVAL, CHECKPOINT_LOCATION, WATERMARK_DELAY)
+   }
    if (!updateOptions.keys.forall(allowedOptions.map(_.toString).contains)) {
-     throw new IllegalArgumentException(s"Altering to ${refreshMode} refresh index only allows options: ${allowedOptions}")
+     throw new IllegalArgumentException(s"Altering ${indexKind} index to ${refreshMode} refresh only allows options: ${allowedOptions}")
    }
  }
}
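
Editorial note: the rules above reduce to a small decision table, summarized in this self-contained sketch. The names are illustrative, not the project's API; the real logic is the `validateUpdateOptions` method in the diff.

```scala
// Standalone sketch of the update validation rules; names are hypothetical.
object UpdateOptionRulesSketch {
  sealed trait RefreshMode
  case object Auto extends RefreshMode
  case object Full extends RefreshMode
  case object Incremental extends RefreshMode

  // Merged = existing options overridden by the requested updates,
  // mirroring `oldOptions ++ updateOptions` in the diff.
  def refreshMode(merged: Map[String, String]): RefreshMode = {
    val auto = merged.getOrElse("auto_refresh", "false")
    val incremental = merged.getOrElse("incremental_refresh", "false")
    (auto, incremental) match {
      case ("true", "false")  => Auto
      case ("false", "false") => Full
      case ("false", "true")  => Incremental
      case ("true", "true") =>
        throw new IllegalArgumentException(
          "auto_refresh and incremental_refresh options cannot both be true")
    }
  }

  // Options an ALTER statement may touch, per target refresh mode.
  def allowedOptions(mode: RefreshMode): Set[String] = mode match {
    case Full => Set("auto_refresh", "incremental_refresh")
    case Auto | Incremental =>
      Set("auto_refresh", "incremental_refresh", "refresh_interval",
        "checkpoint_location", "watermark_delay")
  }

  def main(args: Array[String]): Unit = {
    val merged = Map("auto_refresh" -> "false", "incremental_refresh" -> "true")
    val mode = refreshMode(merged)
    println(s"$mode refresh allows updating: ${allowedOptions(mode).mkString(", ")}")
  }
}
```
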
FlintSparkUpdateIndexITSuite.scala
@@ -88,6 +88,7 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
      .addPartitions("year", "month")
      .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
      .create()
+   flint.refreshIndex(testIndex)

    // auto_refresh remains true
    the[IllegalArgumentException] thrownBy
@@ -133,6 +134,274 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
      updateIndex(flint, testIndex, Map("checkpoint_location" -> "s3a://test/"))
  }

test("should succeed if convert to full refresh with allowed options") {
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
.create()
flint.refreshIndex(testIndex)

updateIndex(
flint,
testIndex,
Map(
"auto_refresh" -> "false",
"incremental_refresh" -> "false"))
val readNewIndex = flint.describeIndex(testIndex)
readNewIndex shouldBe defined

readNewIndex.get.options.autoRefresh() shouldBe false
readNewIndex.get.options.incrementalRefresh() shouldBe false
}

test("should fail if convert to full refresh with disallowed options") {
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
.create()
flint.refreshIndex(testIndex)

the[IllegalArgumentException] thrownBy
updateIndex(
flint,
testIndex,
Map(
"auto_refresh" -> "false",
"checkpoint_location" -> "s3a://test/"))
the[IllegalArgumentException] thrownBy
updateIndex(
flint,
testIndex,
Map(
"auto_refresh" -> "false",
"refresh_interval" -> "5 Minute"))
the[IllegalArgumentException] thrownBy
updateIndex(
flint,
testIndex,
Map(
"auto_refresh" -> "false",
"watermark_delay" -> "1 Minute"))
}

test("should succeed if convert to incremental refresh with refresh_interval") {
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
.create()
flint.refreshIndex(testIndex)

updateIndex(
flint,
testIndex,
Map(
"auto_refresh" -> "false",
"incremental_refresh" -> "true",
"refresh_interval" -> "1 Minute"))
val readNewIndex = flint.describeIndex(testIndex)
readNewIndex shouldBe defined

readNewIndex.get.options.autoRefresh() shouldBe false
readNewIndex.get.options.incrementalRefresh() shouldBe true
readNewIndex.get.options.refreshInterval() shouldBe Some("1 Minute")
}

test("should succeed if convert to incremental refresh with checkpoint_location") {
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
.create()
flint.refreshIndex(testIndex)

updateIndex(
flint,
testIndex,
Map(
"auto_refresh" -> "false",
"incremental_refresh" -> "true",
"checkpoint_location" -> "s3a://test/"))
val readNewIndex = flint.describeIndex(testIndex)
readNewIndex shouldBe defined

readNewIndex.get.options.autoRefresh() shouldBe false
readNewIndex.get.options.incrementalRefresh() shouldBe true
readNewIndex.get.options.checkpointLocation() shouldBe Some("s3a://test/")
}

test("should succeed if convert to incremental refresh with watermark_delay") {
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
.create()
flint.refreshIndex(testIndex)

updateIndex(
flint,
testIndex,
Map(
"auto_refresh" -> "false",
"incremental_refresh" -> "true",
"watermark_delay" -> "1 Minute"))
val readNewIndex = flint.describeIndex(testIndex)
readNewIndex shouldBe defined

readNewIndex.get.options.autoRefresh() shouldBe false
readNewIndex.get.options.incrementalRefresh() shouldBe true
readNewIndex.get.options.watermarkDelay() shouldBe Some("1 Minute")
}

test("should fail if convert to incremental refresh with disallowed options") {
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
.create()

the[IllegalArgumentException] thrownBy
updateIndex(
flint,
testIndex,
Map(
"auto_refresh" -> "false",
"incremental_refresh" -> "true",
"output_mode" -> "complete"))
}

test("should succeed if convert to auto refresh with refresh_interval") {
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.create()

updateIndex(
flint,
testIndex,
Map(
"auto_refresh" -> "true",
"refresh_interval" -> "5 Minute"))

val readNewIndex = flint.describeIndex(testIndex)
readNewIndex shouldBe defined

readNewIndex.get.options.autoRefresh() shouldBe true
readNewIndex.get.options.incrementalRefresh() shouldBe false
readNewIndex.get.options.refreshInterval() shouldBe Some("5 Minute")
}

test("should succeed if convert to auto refresh with checkpoint_location") {
withTempDir { checkpointDir =>
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.create()

updateIndex(
flint,
testIndex,
Map(
"auto_refresh" -> "true",
"checkpoint_location" -> checkpointDir.getAbsolutePath))

val readNewIndex = flint.describeIndex(testIndex)
readNewIndex shouldBe defined

readNewIndex.get.options.autoRefresh() shouldBe true
readNewIndex.get.options.incrementalRefresh() shouldBe false
readNewIndex.get.options.checkpointLocation() shouldBe Some(checkpointDir.getAbsolutePath)
}
}

test("should succeed if convert to auto refresh with watermark_delay") {
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.create()

updateIndex(
flint,
testIndex,
Map(
"auto_refresh" -> "true",
"watermark_delay" -> "5 Minute"))

val readNewIndex = flint.describeIndex(testIndex)
readNewIndex shouldBe defined

readNewIndex.get.options.autoRefresh() shouldBe true
readNewIndex.get.options.incrementalRefresh() shouldBe false
readNewIndex.get.options.watermarkDelay() shouldBe Some("5 Minute")
}

test("should fail if convert to auto refresh with disallowed options") {
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.create()

the[IllegalArgumentException] thrownBy
updateIndex(
flint,
testIndex,
Map(
"auto_refresh" -> "true",
"output_mode" -> "complete"))
}

test("should fail if convert to invalid refresh mode") {
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.create()

the[IllegalArgumentException] thrownBy
updateIndex(
flint,
testIndex,
Map(
"auto_refresh" -> "true",
"incremental_refresh" -> "true"))

deleteTestIndex(testIndex)

flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
.create()

the[IllegalArgumentException] thrownBy
updateIndex(flint, testIndex, Map("incremental_refresh" -> "true"))

deleteTestIndex(testIndex)

flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.options(FlintSparkIndexOptions(Map("incremental_refresh" -> "true")))
.create()

the[IllegalArgumentException] thrownBy
updateIndex(flint, testIndex, Map("auto_refresh" -> "true"))
}

test("update full refresh index to auto refresh should start job") {
// Create full refresh Flint index
flint
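
Editorial note for readers reproducing these cases by hand: the `updateIndex` helper in this suite exercises the same validation a user reaches through Flint's ALTER statement. A hedged sketch for a spark-shell session with Flint loaded; the ALTER syntax is assumed from the Flint SQL docs rather than shown in this commit, and the table name is a placeholder.

```scala
// Assumed Flint SQL syntax; table name is hypothetical. This mirrors the
// "convert to incremental refresh" tests above and should pass validation.
spark.sql("""
  ALTER SKIPPING INDEX ON spark_catalog.default.test_table
  WITH (
    auto_refresh = false,
    incremental_refresh = true,
    checkpoint_location = 's3a://test/'
  )
""")
```
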

