scalafmtAll
Signed-off-by: Sean Kao <[email protected]>
seankao-az committed Mar 26, 2024
1 parent 27a5357 commit 66627bd
Showing 3 changed files with 44 additions and 48 deletions.
@@ -426,8 +426,8 @@ class FlintSpark(val spark: SparkSession) extends Logging {
     }

     // Get the changed option names
-    val updateOptionNames = updatedOptions.options.filterNot {
-      case (k, v) => originalOptions.options.get(k).contains(v)
+    val updateOptionNames = updatedOptions.options.filterNot { case (k, v) =>
+      originalOptions.options.get(k).contains(v)
     }.keys
     if (!updateOptionNames.forall(allowedOptionNames.map(_.toString).contains)) {
       throw new IllegalArgumentException(
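For readers scanning the diff: the block re-wrapped above only changes line breaks, not behavior. A minimal standalone sketch of what it computes, using made-up option values and a hypothetical allow-list in place of the repository's allowedOptionNames:

// Standalone sketch (not repository code): derive the set of option names whose
// values actually changed, then check them against an allow-list.
object OptionDiffSketch extends App {
  // Hypothetical maps standing in for originalOptions.options / updatedOptions.options.
  val originalOptions = Map("auto_refresh" -> "true", "watermark_delay" -> "1 Minute")
  val updatedOptions = Map("auto_refresh" -> "false", "watermark_delay" -> "1 Minute")

  // Keep only entries whose value differs from (or is missing in) the original map.
  val updateOptionNames = updatedOptions.filterNot { case (k, v) =>
    originalOptions.get(k).contains(v)
  }.keys

  // Hypothetical allow-list standing in for allowedOptionNames.
  val allowedOptionNames = Set("auto_refresh", "incremental_refresh")
  if (!updateOptionNames.forall(allowedOptionNames.contains)) {
    throw new IllegalArgumentException(s"Altering disallowed options: $updateOptionNames")
  }

  println(updateOptionNames) // Set(auto_refresh)
}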
@@ -71,9 +71,12 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) {
    * @return
    *   updated Flint index
    */
-  def copyWithUpdate(index: FlintSparkIndex, updateOptions: FlintSparkIndexOptions): FlintSparkIndex = {
+  def copyWithUpdate(
+      index: FlintSparkIndex,
+      updateOptions: FlintSparkIndexOptions): FlintSparkIndex = {
     val originalOptions = index.options
-    val updatedOptions = originalOptions.copy(options = originalOptions.options ++ updateOptions.options)
+    val updatedOptions =
+      originalOptions.copy(options = originalOptions.options ++ updateOptions.options)
     val updatedMetadata = index
       .metadata()
       .copy(options = updatedOptions.options.mapValues(_.asInstanceOf[AnyRef]).asJava)
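The copyWithUpdate change above is likewise formatting-only; the merge it wraps relies on Scala's Map ++ Map, where entries from the right-hand operand win. A small standalone sketch of that behavior, with option values invented for illustration:

object OptionMergeSketch extends App {
  // Stand-ins for originalOptions.options and updateOptions.options.
  val originalOptions = Map("auto_refresh" -> "false", "index_settings" -> "{}")
  val updateOptions = Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/")

  // ++ keeps all original entries and overrides duplicate keys with the update's values,
  // which is what copyWithUpdate relies on when building updatedOptions.
  val merged = originalOptions ++ updateOptions

  println(merged("auto_refresh"))        // true
  println(merged("index_settings"))      // {}
  println(merged("checkpoint_location")) // s3a://test/
}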
@@ -83,30 +83,18 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
     (
       "update index without changing auto_refresh option",
       Seq(
-        (
-          Map("auto_refresh" -> "true"),
-          Map("auto_refresh" -> "true")),
+        (Map("auto_refresh" -> "true"), Map("auto_refresh" -> "true")),
         (
           Map("auto_refresh" -> "true"),
           Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/")),
-        (
-          Map("auto_refresh" -> "true"),
-          Map("checkpoint_location" -> "s3a://test/")),
-        (
-          Map("auto_refresh" -> "true"),
-          Map("watermark_delay" -> "1 Minute")),
-        (
-          Map.empty[String, String],
-          Map("auto_refresh" -> "false")),
+        (Map("auto_refresh" -> "true"), Map("checkpoint_location" -> "s3a://test/")),
+        (Map("auto_refresh" -> "true"), Map("watermark_delay" -> "1 Minute")),
+        (Map.empty[String, String], Map("auto_refresh" -> "false")),
         (
           Map.empty[String, String],
           Map("auto_refresh" -> "false", "checkpoint_location" -> "s3a://test/")),
-        (
-          Map.empty[String, String],
-          Map("incremental_refresh" -> "true")),
-        (
-          Map.empty[String, String],
-          Map("checkpoint_location" -> "s3a://test/")))),
+        (Map.empty[String, String], Map("incremental_refresh" -> "true")),
+        (Map.empty[String, String], Map("checkpoint_location" -> "s3a://test/")))),
     (
       "convert to full refresh with disallowed options",
       Seq(
@@ -131,47 +119,45 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
     (
       "convert to auto refresh with disallowed options",
       Seq(
-        (
-          Map.empty[String, String],
-          Map("auto_refresh" -> "true", "output_mode" -> "complete")))),
+        (Map.empty[String, String], Map("auto_refresh" -> "true", "output_mode" -> "complete")))),
     (
       "convert to invalid refresh mode",
       Seq(
         (
           Map.empty[String, String],
           Map("auto_refresh" -> "true", "incremental_refresh" -> "true")),
-        (
-          Map("auto_refresh" -> "true"),
-          Map("incremental_refresh" -> "true")),
+        (Map("auto_refresh" -> "true"), Map("incremental_refresh" -> "true")),
         (
           Map("incremental_refresh" -> "true", "checkpoint_location" -> "s3a://test/"),
-          Map("auto_refresh" -> "true"))))
-  ).foreach { case (testName, testCases) =>
+          Map("auto_refresh" -> "true"))))).foreach { case (testName, testCases) =>
     test(s"should fail if $testName") {
       withTempDir { checkpointDir =>
         testCases.foreach { case (initialOptionsMap, updateOptionsMap) =>
           flint
             .skippingIndex()
             .onTable(testTable)
             .addPartitions("year", "month")
-            .options(FlintSparkIndexOptions(
-              initialOptionsMap
+            .options(
+              FlintSparkIndexOptions(initialOptionsMap
                 .get("checkpoint_location")
-                .map(_ => initialOptionsMap.updated("checkpoint_location", checkpointDir.getAbsolutePath))
-                .getOrElse(initialOptionsMap)
-            ))
+                .map(_ =>
+                  initialOptionsMap.updated("checkpoint_location", checkpointDir.getAbsolutePath))
+                .getOrElse(initialOptionsMap)))
             .create()
           flint.refreshIndex(testIndex)
           val index = flint.describeIndex(testIndex).get
           the[IllegalArgumentException] thrownBy {
             val updatedIndex = flint
               .skippingIndex()
-              .copyWithUpdate(index, FlintSparkIndexOptions(
-                updateOptionsMap
-                .get("checkpoint_location")
-                .map(_ => updateOptionsMap.updated("checkpoint_location", checkpointDir.getAbsolutePath))
-                .getOrElse(updateOptionsMap)
-              ))
+              .copyWithUpdate(
+                index,
+                FlintSparkIndexOptions(
+                  updateOptionsMap
+                    .get("checkpoint_location")
+                    .map(_ =>
+                      updateOptionsMap
+                        .updated("checkpoint_location", checkpointDir.getAbsolutePath))
+                    .getOrElse(updateOptionsMap)))
             flint.updateIndex(updatedIndex)
           }
           deleteTestIndex(testIndex)
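Both test hunks in this file thread the temporary checkpoint directory into the option maps with the same get / map / getOrElse idiom. A standalone sketch of just that substitution, with an invented path standing in for checkpointDir.getAbsolutePath:

object CheckpointSubstitutionSketch extends App {
  // Invented path standing in for checkpointDir.getAbsolutePath in the test.
  val checkpointDirPath = "/tmp/flint-test-checkpoint"

  // Replace the "s3a://test/" placeholder only when the test case declares a
  // checkpoint_location; otherwise pass the map through untouched.
  def withRealCheckpoint(options: Map[String, String]): Map[String, String] =
    options
      .get("checkpoint_location")
      .map(_ => options.updated("checkpoint_location", checkpointDirPath))
      .getOrElse(options)

  println(withRealCheckpoint(Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/")))
  // Map(auto_refresh -> true, checkpoint_location -> /tmp/flint-test-checkpoint)
  println(withRealCheckpoint(Map("auto_refresh" -> "false")))
  // Map(auto_refresh -> false) -- unchanged, no checkpoint_location key
}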
@@ -377,17 +363,24 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
         val indexInitial = flint.describeIndex(testIndex).get
         val updatedIndexObsolete = flint
           .skippingIndex()
-          .copyWithUpdate(indexInitial, FlintSparkIndexOptions(Map("auto_refresh" -> "true", "checkpoint_location" -> checkpointDir.getAbsolutePath)))
+          .copyWithUpdate(
+            indexInitial,
+            FlintSparkIndexOptions(
+              Map(
+                "auto_refresh" -> "true",
+                "checkpoint_location" -> checkpointDir.getAbsolutePath)))

         // This other update finishes first, converting to auto refresh
-        flint.updateIndex(flint
-          .skippingIndex()
-          .copyWithUpdate(indexInitial, FlintSparkIndexOptions(Map("auto_refresh" -> "true"))))
+        flint.updateIndex(
+          flint
+            .skippingIndex()
+            .copyWithUpdate(indexInitial, FlintSparkIndexOptions(Map("auto_refresh" -> "true"))))
         // Adding another update to convert to full refresh, so the obsolete update doesn't fail for option validation or state validation
         val indexUpdated = flint.describeIndex(testIndex).get
-        flint.updateIndex(flint
-          .skippingIndex()
-          .copyWithUpdate(indexUpdated, FlintSparkIndexOptions(Map("auto_refresh" -> "false"))))
+        flint.updateIndex(
+          flint
+            .skippingIndex()
+            .copyWithUpdate(indexUpdated, FlintSparkIndexOptions(Map("auto_refresh" -> "false"))))

         // This update trying to update an obsolete index should fail
         the[IllegalStateException] thrownBy
