diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala index 1295e3f1d..144b58404 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala @@ -159,18 +159,22 @@ class FlintSparkIndexMonitor( override def run(): Unit = { logInfo(s"Scheduler trigger index monitor task for $indexName") try { - if (isStreamingJobActive(indexName)) { - if (flintClient.exists(indexName)) { - logInfo("Streaming job is still active") + val isJobActive = isStreamingJobActive(indexName) + val indexExists = flintClient.exists(indexName) + + (isJobActive, indexExists) match { + case (true, true) => + logInfo("Streaming job is active and index exists") flintMetadataLogService.recordHeartbeat(indexName) - } else { - logWarning("Streaming job is active but data is deleted") + + case (true, false) => + logWarning("Streaming job is active but index is deleted") stopStreamingJobAndMonitor(indexName) - } - } else { - logError("Streaming job is not active. Cancelling monitor task") - stopMonitor(indexName) - logInfo("Index monitor task is cancelled") + + case (false, _) => + logError("Streaming job is not active. Cancelling monitor task") + stopMonitor(indexName) + logInfo("Index monitor task is cancelled") } errorCnt = 0 // Reset counter if no error } catch {