diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala index 5c34ca71d..4af147939 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala @@ -86,10 +86,25 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc } } + test("job start time should not change until recover index") { + val (prevJobStartTime, _) = getLatestTimestamp + + // Stop streaming job and wait for monitor task stopped + spark.streams.active.find(_.name == testFlintIndex).get.stop() + waitForMonitorTaskRun() + + // Restart streaming job and monitor task + flint.recoverIndex(testFlintIndex) + waitForMonitorTaskRun() + + val (jobStartTime, _) = getLatestTimestamp + jobStartTime should be > prevJobStartTime + } + test("monitor task should terminate if streaming job inactive") { val task = FlintSparkIndexMonitor.indexMonitorTracker(testFlintIndex) - // Stop streaming job intentionally + // Stop streaming job and wait for monitor task stopped spark.streams.active.find(_.name == testFlintIndex).get.stop() waitForMonitorTaskRun()