Skip to content

Commit

Permalink
Add more assertion in IT
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Aug 1, 2024
1 parent 2c3c7dc commit 30d0820
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import org.apache.spark.sql.flint.newDaemonThreadPoolScheduledExecutor
*
* @param spark
* Spark session
* @param flintClient
* Flint client
* @param flintMetadataLogService
* Flint metadata log service
*/
Expand Down Expand Up @@ -159,10 +161,9 @@ class FlintSparkIndexMonitor(
try {
if (isStreamingJobActive(indexName)) {
logInfo("Streaming job is still active")
flintMetadataLogService.recordHeartbeat(indexName)

if (flintClient.exists(indexName)) {
flintMetadataLogService.recordHeartbeat(indexName)
} else {
if (!flintClient.exists(indexName)) {
logInfo("Streaming job is active but data is deleted")
stopStreamingJobAndMonitor(indexName)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc
Thread.sleep(5000)
task.isCancelled shouldBe true
spark.streams.active.exists(_.name == testFlintIndex) shouldBe false

// Assert index state is still refreshing
val latestLog = latestLogEntry(testLatestId)
latestLog should contain("state" -> "refreshing")
}

test("await monitor terminated without exception should stay refreshing state") {
Expand All @@ -173,7 +177,7 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc
// Await until streaming job terminated
flint.flintIndexMonitor.awaitMonitor()

// Assert index state is active now
// Assert index state is still refreshing
val latestLog = latestLogEntry(testLatestId)
latestLog should contain("state" -> "refreshing")
}
Expand Down

0 comments on commit 30d0820

Please sign in to comment.