
Commit

Add more IT
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Sep 4, 2024
1 parent 20a6935 commit d5acdc8
Showing 5 changed files with 95 additions and 9 deletions.
@@ -271,15 +271,17 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
       .transientLog(latest => latest.copy(state = VACUUMING))
       .finalLog(_ => NO_LOG_ENTRY)
       .commit(_ => {
-        val options = flintIndexMetadataService.getIndexMetadata(indexName).options
+        val options = flintIndexMetadataService.getIndexMetadata(indexName).options.asScala
         flintClient.deleteIndex(indexName)
         flintIndexMetadataService.deleteIndexMetadata(indexName)

-        Option(options.get(CHECKPOINT_LOCATION.toString))
-          .foreach { checkpointDir =>
-            new FlintSparkCheckpoint(spark, checkpointDir.asInstanceOf[String])
-              .delete()
-          }
+        // Remove checkpoint folder if defined
+        val checkpoint = options
+          .get(CHECKPOINT_LOCATION.toString)
+          .map(path => new FlintSparkCheckpoint(spark, path.asInstanceOf[String]))
+        if (checkpoint.isDefined) {
+          checkpoint.get.delete()
+        }
         true
       })
     } else {
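
The only behavioral subtlety in this hunk is the map-lookup change: before, options was a java.util.Map, whose get returns null for a missing key, hence the explicit Option(...) wrapper; after .asScala, get already returns an Option. A minimal standalone sketch of the two lookup styles (the map contents are illustrative, not Flint's actual metadata):

    import scala.jdk.CollectionConverters._  // scala.collection.JavaConverters._ on Scala 2.12

    object LookupSketch extends App {
      val javaOptions = new java.util.HashMap[String, AnyRef]()
      javaOptions.put("checkpoint_location", "s3://bucket/flint/checkpoint")

      // Old style: java.util.Map#get returns null when the key is absent, so wrap it
      val before: Option[AnyRef] = Option(javaOptions.get("checkpoint_location"))

      // New style: asScala yields a Scala Map whose get already returns an Option
      val after: Option[AnyRef] = javaOptions.asScala.get("checkpoint_location")

      assert(before == after)  // both Some("s3://bucket/flint/checkpoint")
    }
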
@@ -468,6 +468,34 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
     flint.describeIndex(testFlintIndex) shouldBe empty
   }

+  test("vacuum covering index with checkpoint") {
+    withTempDir { checkpointDir =>
+      flint
+        .coveringIndex()
+        .name(testIndex)
+        .onTable(testTable)
+        .addIndexColumns("name", "age")
+        .options(
+          FlintSparkIndexOptions(
+            Map(
+              "auto_refresh" -> "true",
+              "checkpoint_location" -> checkpointDir.getAbsolutePath)),
+          testFlintIndex)
+        .create()
+      flint.refreshIndex(testFlintIndex)
+
+      val job = spark.streams.active.find(_.name == testFlintIndex)
+      awaitStreamingComplete(job.get.id.toString)
+      flint.deleteIndex(testFlintIndex)
+
+      // Checkpoint folder should be removed after vacuum
+      checkpointDir.exists() shouldBe true
+      sql(s"VACUUM INDEX $testIndex ON $testTable")
+      flint.describeIndex(testFlintIndex) shouldBe empty
+      checkpointDir.exists() shouldBe false
+    }
+  }
+
   private def awaitRefreshComplete(query: String): Unit = {
     sql(query)

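
All four new tests follow the same shape: create the index with auto_refresh plus checkpoint_location, refresh, wait for the streaming job, logically delete, then vacuum and assert the checkpoint folder is gone. awaitStreamingComplete is a FlintSparkSuite helper that this diff does not show; a plausible minimal equivalent in terms of Spark's public streaming API (the helper's signature and behavior here are assumptions):

    import org.apache.spark.sql.SparkSession

    // Assumed stand-in for FlintSparkSuite.awaitStreamingComplete: look up the
    // streaming query by id and block until all available input is processed.
    def awaitStreamingComplete(spark: SparkSession, queryId: String): Unit = {
      val query = spark.streams.get(queryId)  // StreamingQueryManager.get(id)
      require(query != null, s"Streaming job $queryId is not active")
      query.processAllAvailable()  // blocks until pending data is committed
    }
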
@@ -408,5 +408,33 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
     flint.describeIndex(testFlintIndex) shouldBe empty
   }

+  test("vacuum materialized view with checkpoint") {
+    withTempDir { checkpointDir =>
+      flint
+        .materializedView()
+        .name(testMvName)
+        .query(testQuery)
+        .options(
+          FlintSparkIndexOptions(
+            Map(
+              "auto_refresh" -> "true",
+              "checkpoint_location" -> checkpointDir.getAbsolutePath,
+              "watermark_delay" -> "1 Second")),
+          testFlintIndex)
+        .create()
+      flint.refreshIndex(testFlintIndex)
+
+      val job = spark.streams.active.find(_.name == testFlintIndex)
+      awaitStreamingComplete(job.get.id.toString)
+      flint.deleteIndex(testFlintIndex)
+
+      // Checkpoint folder should be removed after vacuum
+      checkpointDir.exists() shouldBe true
+      sql(s"VACUUM MATERIALIZED VIEW $testMvName")
+      flint.describeIndex(testFlintIndex) shouldBe empty
+      checkpointDir.exists() shouldBe false
+    }
+  }
+
   private def timestamp(ts: String): Timestamp = Timestamp.valueOf(ts)
 }
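
The materialized view variant additionally sets watermark_delay, since its query runs as a windowed streaming aggregation. In plain Structured Streaming terms, a one-second watermark bounds how late an event may arrive before its window is finalized; a generic sketch of what that option maps to (the rate source and window width are illustrative, not what Flint generates):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.window

    val spark = SparkSession.builder().master("local[*]").appName("watermark-sketch").getOrCreate()
    import spark.implicits._

    // The rate source emits (timestamp, value) rows; the watermark mirrors
    // what watermark_delay = "1 Second" means for the MV's streaming query.
    val counts = spark.readStream
      .format("rate")
      .load()
      .withWatermark("timestamp", "1 second")
      .groupBy(window($"timestamp", "10 seconds"))
      .count()
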
@@ -917,7 +917,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
     }
   }

-  test("should remove checkpoint folder when vacuum") {
+  test("vacuum skipping index with checkpoint") {
     withTempDir { checkpointDir =>
       flint
         .skippingIndex()
@@ -934,10 +934,11 @@

       val job = spark.streams.active.find(_.name == testIndex)
       awaitStreamingComplete(job.get.id.toString)
-
       flint.deleteIndex(testIndex)
-      flint.vacuumIndex(testIndex)

       // Checkpoint folder should be removed after vacuum
+      checkpointDir.exists() shouldBe true
+      flint.vacuumIndex(testIndex)
+      flint.describeIndex(testIndex) shouldBe None
       checkpointDir.exists() shouldBe false
     }
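
This suite pins down the ordering: the checkpoint folder must survive the logical delete and disappear only on vacuum, matching the FlintSpark change above. FlintSparkCheckpoint.delete() itself is not part of this diff; a sketch of what removing a checkpoint folder amounts to on a Hadoop-compatible filesystem (the method and parameter names here are assumptions):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Assumed sketch: resolve the filesystem behind the checkpoint URI
    // (file://, hdfs://, s3://, ...) and delete the folder recursively.
    def deleteCheckpointDir(checkpointLocation: String, hadoopConf: Configuration): Boolean = {
      val dir = new Path(checkpointLocation)
      val fs: FileSystem = dir.getFileSystem(hadoopConf)
      !fs.exists(dir) || fs.delete(dir, true)  // true = recursive
    }
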
@@ -580,6 +580,33 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite with ExplainSuit
     flint.describeIndex(testIndex) shouldBe empty
   }

+  test("vacuum skipping index with checkpoint") {
+    withTempDir { checkpointDir =>
+      flint
+        .skippingIndex()
+        .onTable(testTable)
+        .addPartitions("year")
+        .options(
+          FlintSparkIndexOptions(
+            Map(
+              "auto_refresh" -> "true",
+              "checkpoint_location" -> checkpointDir.getAbsolutePath)),
+          testIndex)
+        .create()
+      flint.refreshIndex(testIndex)
+
+      val job = spark.streams.active.find(_.name == testIndex)
+      awaitStreamingComplete(job.get.id.toString)
+      flint.deleteIndex(testIndex)
+
+      // Checkpoint folder should be removed after vacuum
+      checkpointDir.exists() shouldBe true
+      sql(s"VACUUM SKIPPING INDEX ON $testTable")
+      flint.describeIndex(testIndex) shouldBe empty
+      checkpointDir.exists() shouldBe false
+    }
+  }
+
   test("analyze skipping index with for supported data types") {
     val result = sql(s"ANALYZE SKIPPING INDEX ON $testTable")

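
Taken together, the suites cover the programmatic flint.vacuumIndex path plus all three SQL spellings of vacuum. For reference, the statements exercised above are:

    sql(s"VACUUM INDEX $testIndex ON $testTable")   // covering index
    sql(s"VACUUM MATERIALIZED VIEW $testMvName")    // materialized view
    sql(s"VACUUM SKIPPING INDEX ON $testTable")     // skipping index
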
