Skip to content

Commit

Permalink
test case for update options validation
Browse files Browse the repository at this point in the history
Signed-off-by: Sean Kao <[email protected]>
  • Loading branch information
seankao-az committed Mar 19, 2024
1 parent 52a34d6 commit 9399e69
Showing 1 changed file with 209 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
.addPartitions("year", "month")
.options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
.create()
flint.refreshIndex(testIndex)

// auto_refresh remains true
the[IllegalArgumentException] thrownBy
Expand Down Expand Up @@ -133,6 +134,214 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
updateIndex(flint, testIndex, Map("checkpoint_location" -> "s3a://test/"))
}

test("should succeed if convert to full refresh with allowed options") {
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
.create()
flint.refreshIndex(testIndex)

updateIndex(
flint,
testIndex,
Map(
"auto_refresh" -> "false",
"incremental_refresh" -> "false"))
val readNewIndex = flint.describeIndex(testIndex)
readNewIndex shouldBe defined

readNewIndex.get.options.autoRefresh() shouldBe false
readNewIndex.get.options.incrementalRefresh() shouldBe false
}

test("should fail if convert to full refresh with unallowed options") {
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
.create()
flint.refreshIndex(testIndex)

the[IllegalArgumentException] thrownBy
updateIndex(
flint,
testIndex,
Map(
"auto_refresh" -> "false",
"checkpoint_location" -> "s3a://test/"))
the[IllegalArgumentException] thrownBy
updateIndex(
flint,
testIndex,
Map(
"auto_refresh" -> "false",
"refresh_interval" -> "5 Minute"))
the[IllegalArgumentException] thrownBy
updateIndex(
flint,
testIndex,
Map(
"auto_refresh" -> "false",
"watermark_delay" -> "1 Minute"))
}

test("should succeed if convert to incremental refresh with checkpoint_location option") {
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
.create()
flint.refreshIndex(testIndex)

updateIndex(
flint,
testIndex,
Map(
"auto_refresh" -> "false",
"incremental_refresh" -> "true",
"checkpoint_location" -> "s3a://test/"))
val readNewIndex = flint.describeIndex(testIndex)
readNewIndex shouldBe defined

readNewIndex.get.options.autoRefresh() shouldBe false
readNewIndex.get.options.incrementalRefresh() shouldBe true
readNewIndex.get.options.checkpointLocation() shouldBe Some("s3a://test/")
}

test("should succeed if convert to incremental refresh with watermark_delay option") {
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
.create()
flint.refreshIndex(testIndex)

updateIndex(
flint,
testIndex,
Map(
"auto_refresh" -> "false",
"incremental_refresh" -> "true",
"watermark_delay" -> "1 Minute"))
val readNewIndex = flint.describeIndex(testIndex)
readNewIndex shouldBe defined

readNewIndex.get.options.autoRefresh() shouldBe false
readNewIndex.get.options.incrementalRefresh() shouldBe true
readNewIndex.get.options.watermarkDelay() shouldBe Some("1 Minute")
}

test("should fail if convert to incremental refresh with unallowed options") {
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
.create()

the[IllegalArgumentException] thrownBy
updateIndex(
flint,
testIndex,
Map(
"auto_refresh" -> "false",
"incremental_refresh" -> "true",
"refresh_interval" -> "5 Minute"))
}

test("should succeed if convert to auto refresh with checkpoint_location") {
withTempDir { checkpointDir =>
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.create()

updateIndex(
flint,
testIndex,
Map(
"auto_refresh" -> "true",
"checkpoint_location" -> checkpointDir.getAbsolutePath))

val readNewIndex = flint.describeIndex(testIndex)
readNewIndex shouldBe defined

readNewIndex.get.options.autoRefresh() shouldBe true
readNewIndex.get.options.incrementalRefresh() shouldBe false
readNewIndex.get.options.checkpointLocation() shouldBe Some(checkpointDir.getAbsolutePath)
}
}

test("should fail if convert to auto refresh with unallowed options") {
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.create()

the[IllegalArgumentException] thrownBy
updateIndex(
flint,
testIndex,
Map(
"auto_refresh" -> "true",
"refresh_interval" -> "5 Minute"))

the[IllegalArgumentException] thrownBy
updateIndex(
flint,
testIndex,
Map(
"auto_refresh" -> "true",
"watermark_delay" -> "1 Minute"))
}

test("should fail if convert to invalid refresh mode") {
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.create()

the[IllegalArgumentException] thrownBy
updateIndex(
flint,
testIndex,
Map(
"auto_refresh" -> "true",
"incremental_refresh" -> "true"))

deleteTestIndex(testIndex)

flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
.create()

the[IllegalArgumentException] thrownBy
updateIndex(flint, testIndex, Map("incremental_refresh" -> "true"))

deleteTestIndex(testIndex)

flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.options(FlintSparkIndexOptions(Map("incremental_refresh" -> "true")))
.create()

the[IllegalArgumentException] thrownBy
updateIndex(flint, testIndex, Map("auto_refresh" -> "true"))
}

test("update full refresh index to auto refresh should start job") {
// Create full refresh Flint index
flint
Expand Down

0 comments on commit 9399e69

Please sign in to comment.