scalafmtAll
Signed-off-by: Sean Kao <[email protected]>
seankao-az committed Mar 20, 2024
1 parent 2101d34 commit 0762842
Showing 5 changed files with 59 additions and 102 deletions.
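The diff below is mechanical reformatting only. Judging by the commit title, it was produced by the sbt-scalafmt `scalafmtAll` task, which reformats the main and test sources of every project in the build. As a rough sketch (the repository's actual formatter settings are not part of this commit), a `.scalafmt.conf` that yields the roughly 100-column wrapping seen below could look like:

    # Hypothetical .scalafmt.conf -- illustrative only, not taken from this repo
    version = 3.7.17          # scalafmt version pinned for reproducible formatting
    runner.dialect = scala212
    maxColumn = 100           # long one-line signatures below get wrapped at this limit

With such a config in place, running `sbt scalafmtAll` rewrites every non-conforming file, which is why each change in this commit is whitespace and line wrapping.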
@@ -88,15 +88,20 @@ object FlintSparkSqlAstBuilder {
    * @param updateOptions
    *   options to update
    */
-  def updateIndex(flint: FlintSpark, indexName: String, updateOptions: Map[String, String]): Option[String] = {
-    val oldIndex = flint.describeIndex(indexName)
+  def updateIndex(
+      flint: FlintSpark,
+      indexName: String,
+      updateOptions: Map[String, String]): Option[String] = {
+    val oldIndex = flint
+      .describeIndex(indexName)
       .getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
 
     val oldOptions = oldIndex.options.options
     validateUpdateOptions(oldOptions, updateOptions, oldIndex.kind)
 
     val mergedOptions = oldOptions ++ updateOptions
-    val newMetadata = oldIndex.metadata().copy(options = mergedOptions.mapValues(_.asInstanceOf[AnyRef]).asJava)
+    val newMetadata =
+      oldIndex.metadata().copy(options = mergedOptions.mapValues(_.asInstanceOf[AnyRef]).asJava)
     val newIndex = FlintSparkIndexFactory.create(newMetadata)
 
     val updateMode = newIndex.options.autoRefresh() match {
@@ -108,9 +113,8 @@ object FlintSparkSqlAstBuilder {
   }
 
   /**
-   * Validate update options.
-   * These are rules specific for updating index, validating the update is allowed.
-   * It doesn't check whether the resulting index options will be valid.
+   * Validate update options. These are rules specific for updating index, validating the update
+   * is allowed. It doesn't check whether the resulting index options will be valid.
    *
    * @param oldOptions
    *   existing options
@@ -119,7 +123,10 @@
    * @param indexKind
    *   index kind
    */
-  private def validateUpdateOptions(oldOptions: Map[String, String], updateOptions: Map[String, String], indexKind: String): Unit = {
+  private def validateUpdateOptions(
+      oldOptions: Map[String, String],
+      updateOptions: Map[String, String],
+      indexKind: String): Unit = {
     val mergedOptions = oldOptions ++ updateOptions
     val newAutoRefresh = mergedOptions.getOrElse(AUTO_REFRESH.toString, "false")
     val oldAutoRefresh = oldOptions.getOrElse(AUTO_REFRESH.toString, "false")
@@ -135,17 +142,24 @@
       case ("false", "false") => FULL
       case ("false", "true") => INCREMENTAL
       case ("true", "true") =>
-        throw new IllegalArgumentException("auto_refresh and incremental_refresh options cannot both be true")
+        throw new IllegalArgumentException(
+          "auto_refresh and incremental_refresh options cannot both be true")
     }
 
     // validate allowed options depending on refresh mode
     val allowedOptions = refreshMode match {
       case FULL => Set(AUTO_REFRESH, INCREMENTAL_REFRESH)
       case AUTO | INCREMENTAL =>
-        Set(AUTO_REFRESH, INCREMENTAL_REFRESH, REFRESH_INTERVAL, CHECKPOINT_LOCATION, WATERMARK_DELAY)
+        Set(
+          AUTO_REFRESH,
+          INCREMENTAL_REFRESH,
+          REFRESH_INTERVAL,
+          CHECKPOINT_LOCATION,
+          WATERMARK_DELAY)
     }
     if (!updateOptions.keys.forall(allowedOptions.map(_.toString).contains)) {
-      throw new IllegalArgumentException(s"Altering ${indexKind} index to ${refreshMode} refresh only allows options: ${allowedOptions}")
+      throw new IllegalArgumentException(
+        s"Altering ${indexKind} index to ${refreshMode} refresh only allows options: ${allowedOptions}")
     }
   }
 }
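
Not part of the commit, but as a reading aid: validateUpdateOptions above boils down to a small decision table over the merged auto_refresh and incremental_refresh flags. A minimal, self-contained Scala sketch of that table (the AUTO case sits in a collapsed portion of the diff; the string results here are stand-ins, not the project's RefreshMode enum):

    // Hypothetical sketch mirroring the match in validateUpdateOptions;
    // the real code returns FlintSpark RefreshMode values, not strings.
    def refreshMode(autoRefresh: String, incrementalRefresh: String): String =
      (autoRefresh, incrementalRefresh) match {
        case ("true", "false")  => "auto"        // streaming (auto) refresh
        case ("false", "false") => "full"        // manual full refresh
        case ("false", "true")  => "incremental" // manual incremental refresh
        case ("true", "true") =>
          throw new IllegalArgumentException(
            "auto_refresh and incremental_refresh options cannot both be true")
      }

Under FULL mode only the two flags themselves may be updated; AUTO and INCREMENTAL additionally allow refresh_interval, checkpoint_location, and watermark_delay, as the allowedOptions match above encodes.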
 
@@ -308,8 +308,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
     flint.describeIndex(testFlintIndex) shouldBe defined
     flint.queryIndex(testFlintIndex).count() shouldBe 0
 
-    sql(
-      s"""
+    sql(s"""
          | ALTER INDEX $testIndex ON $testTable
          | WITH (auto_refresh = true)
          | """.stripMargin)
 
@@ -296,8 +296,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
     flint.describeIndex(testFlintIndex) shouldBe defined
     flint.queryIndex(testFlintIndex).count() shouldBe 0
 
-    sql(
-      s"""
+    sql(s"""
          | ALTER MATERIALIZED VIEW $testMvName
          | WITH (
          | auto_refresh = true,
 
@@ -355,8 +355,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
   }
 
   test("update full refresh skipping index to auto refresh") {
-    sql(
-      s"""
+    sql(s"""
         | CREATE SKIPPING INDEX ON $testTable
         | (
         | year PARTITION,
@@ -368,8 +367,7 @@
     flint.describeIndex(testIndex) shouldBe defined
     flint.queryIndex(testIndex).count() shouldBe 0
 
-    sql(
-      s"""
+    sql(s"""
         | ALTER SKIPPING INDEX ON $testTable
         | WITH (auto_refresh = true)
         | """.stripMargin)
@@ -382,8 +380,7 @@
 
   test("update incremental refresh skipping index to auto refresh") {
     withTempDir { checkpointDir =>
-      sql(
-        s"""
+      sql(s"""
          | CREATE SKIPPING INDEX ON $testTable
          | ( year PARTITION )
          | WITH (
@@ -397,15 +394,13 @@
       flint.queryIndex(testIndex).count() shouldBe 2
 
       // New data will be refreshed after updating index to auto refresh
-      sql(
-        s"""
+      sql(s"""
          | INSERT INTO $testTable
          | PARTITION (year=2023, month=5)
          | VALUES ('Hello', 50, 'Vancouver')
          |""".stripMargin)
 
-      sql(
-        s"""
+      sql(s"""
          | ALTER SKIPPING INDEX ON $testTable
          | WITH (
          | auto_refresh = true,
@@ -421,8 +416,7 @@
   }
 
   test("update auto refresh skipping index to full refresh") {
-    sql(
-      s"""
+    sql(s"""
         | CREATE SKIPPING INDEX ON $testTable
         | ( year PARTITION )
        | WITH (auto_refresh = true)
@@ -435,17 +429,15 @@
     flint.describeIndex(testIndex) shouldBe defined
     flint.queryIndex(testIndex).count() shouldBe 2
 
-    sql(
-      s"""
+    sql(s"""
        | ALTER SKIPPING INDEX ON $testTable
        | WITH (auto_refresh = false)
        | """.stripMargin)
 
     spark.streams.active.find(_.name == testIndex) shouldBe empty
 
     // New data won't be refreshed until refresh statement triggered
-    sql(
-      s"""
+    sql(s"""
        | INSERT INTO $testTable
        | PARTITION (year=2023, month=5)
        | VALUES ('Hello', 50, 'Vancouver')
@@ -458,8 +450,7 @@
 
   test("update auto refresh skipping index to incremental refresh") {
     withTempDir { checkpointDir =>
-      sql(
-        s"""
+      sql(s"""
          | CREATE SKIPPING INDEX ON $testTable
          | ( year PARTITION )
          | WITH (
@@ -475,8 +466,7 @@
       flint.describeIndex(testIndex) shouldBe defined
       flint.queryIndex(testIndex).count() shouldBe 2
 
-      sql(
-        s"""
+      sql(s"""
          | ALTER SKIPPING INDEX ON $testTable
          | WITH (
          | auto_refresh = false,
@@ -487,8 +477,7 @@
       spark.streams.active.find(_.name == testIndex) shouldBe empty
 
       // New data won't be refreshed until refresh statement triggered
-      sql(
-        s"""
+      sql(s"""
          | INSERT INTO $testTable
          | PARTITION (year=2023, month=5)
          | VALUES ('Hello', 50, 'Vancouver')
