From 30d08201b723e95c1c17c108d4f03933e08fe277 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 1 Aug 2024 16:04:27 -0700 Subject: [PATCH] Add more assertion in IT Signed-off-by: Chen Dai --- .../opensearch/flint/spark/FlintSparkIndexMonitor.scala | 7 ++++--- .../flint/spark/FlintSparkIndexMonitorITSuite.scala | 6 +++++- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala index 32e55613d..150596b0b 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala @@ -32,6 +32,8 @@ import org.apache.spark.sql.flint.newDaemonThreadPoolScheduledExecutor * * @param spark * Spark session + * @param flintClient + * Flint client * @param flintMetadataLogService * Flint metadata log service */ @@ -159,10 +161,9 @@ class FlintSparkIndexMonitor( try { if (isStreamingJobActive(indexName)) { logInfo("Streaming job is still active") + flintMetadataLogService.recordHeartbeat(indexName) - if (flintClient.exists(indexName)) { - flintMetadataLogService.recordHeartbeat(indexName) - } else { + if (!flintClient.exists(indexName)) { logInfo("Streaming job is active but data is deleted") stopStreamingJobAndMonitor(indexName) } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala index a16aac624..9e34354f8 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala @@ -162,6 +162,10 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc Thread.sleep(5000) task.isCancelled shouldBe true spark.streams.active.exists(_.name == testFlintIndex) shouldBe false + + // Assert index state is still refreshing + val latestLog = latestLogEntry(testLatestId) + latestLog should contain("state" -> "refreshing") } test("await monitor terminated without exception should stay refreshing state") { @@ -173,7 +177,7 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc // Await until streaming job terminated flint.flintIndexMonitor.awaitMonitor() - // Assert index state is active now + // Assert index state is still refreshing val latestLog = latestLogEntry(testLatestId) latestLog should contain("state" -> "refreshing") }