diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala index 32e55613d..150596b0b 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala @@ -32,6 +32,8 @@ import org.apache.spark.sql.flint.newDaemonThreadPoolScheduledExecutor * * @param spark * Spark session + * @param flintClient + * Flint client * @param flintMetadataLogService * Flint metadata log service */ @@ -159,10 +161,9 @@ class FlintSparkIndexMonitor( try { if (isStreamingJobActive(indexName)) { logInfo("Streaming job is still active") + flintMetadataLogService.recordHeartbeat(indexName) - if (flintClient.exists(indexName)) { - flintMetadataLogService.recordHeartbeat(indexName) - } else { + if (!flintClient.exists(indexName)) { logInfo("Streaming job is active but data is deleted") stopStreamingJobAndMonitor(indexName) } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala index a16aac624..9e34354f8 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala @@ -162,6 +162,10 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc Thread.sleep(5000) task.isCancelled shouldBe true spark.streams.active.exists(_.name == testFlintIndex) shouldBe false + + // Assert index state is still refreshing + val latestLog = latestLogEntry(testLatestId) + latestLog should contain("state" -> "refreshing") } test("await monitor terminated without exception should stay refreshing state") { @@ -173,7 +177,7 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc // Await until streaming job terminated flint.flintIndexMonitor.awaitMonitor() - // Assert index state is active now + // Assert index state is still refreshing val latestLog = latestLogEntry(testLatestId) latestLog should contain("state" -> "refreshing") }