diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala
index 2f44a28f4..afa456cc7 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala
@@ -42,33 +42,7 @@ class FlintSparkIndexMonitor(
    */
   def startMonitor(indexName: String): Unit = {
     val task = FlintSparkIndexMonitor.executor.scheduleWithFixedDelay(
-      () => {
-        logInfo(s"Scheduler trigger index monitor task for $indexName")
-        try {
-          if (isStreamingJobActive(indexName)) {
-            logInfo("Streaming job is still active")
-            flintClient
-              .startTransaction(indexName, dataSourceName)
-              .initialLog(latest => latest.state == REFRESHING)
-              .finalLog(latest => latest) // timestamp will update automatically
-              .commit(_ => {})
-          } else {
-            logError("Streaming job is not active. Cancelling monitor task")
-            flintClient
-              .startTransaction(indexName, dataSourceName)
-              .initialLog(_ => true)
-              .finalLog(latest => latest.copy(state = FAILED))
-              .commit(_ => {})
-
-            stopMonitor(indexName)
-            logInfo("Index monitor task is cancelled")
-          }
-        } catch {
-          case e: Throwable =>
-            logError("Failed to update index log entry", e)
-            MetricsUtil.incrementCounter(MetricConstants.STREAMING_HEARTBEAT_FAILED_METRIC)
-        }
-      },
+      new FlintSparkIndexMonitorTask(indexName),
       15, // Delay to ensure final logging is complete first, otherwise version conflicts
       60, // TODO: make interval configurable
       TimeUnit.SECONDS)
@@ -92,8 +66,63 @@ class FlintSparkIndexMonitor(
     }
   }
 
+  private class FlintSparkIndexMonitorTask(indexName: String) extends Runnable {
+
+    /** Consecutive error counter, reset after every successful run */
+    private var errorCnt = 0
+
+    override def run(): Unit = {
+      logInfo(s"Scheduler triggered index monitor task for $indexName")
+      try {
+        if (isStreamingJobActive(indexName)) {
+          logInfo("Streaming job is still active")
+          flintClient
+            .startTransaction(indexName, dataSourceName)
+            .initialLog(latest => latest.state == REFRESHING)
+            .finalLog(latest => latest) // timestamp will update automatically
+            .commit(_ => {})
+        } else {
+          logError("Streaming job is not active. Cancelling monitor task")
+          flintClient
+            .startTransaction(indexName, dataSourceName)
+            .initialLog(_ => true)
+            .finalLog(latest => latest.copy(state = FAILED))
+            .commit(_ => {})
+
+          stopMonitor(indexName)
+          logInfo("Index monitor task is cancelled")
+        }
+
+        // Reset counter on success
+        errorCnt = 0
+      } catch {
+        case e: Throwable =>
+          errorCnt += 1
+          logError(s"Failed to update index log entry, consecutive errors: $errorCnt", e)
+          MetricsUtil.incrementCounter(MetricConstants.STREAMING_HEARTBEAT_FAILED_METRIC)
+
+          // Stop the streaming job and its monitor once the consecutive error limit is reached
+          if (errorCnt >= 10) {
+            logInfo(s"Terminating streaming job and index monitor for $indexName")
+            stopStreamingJob(indexName)
+            stopMonitor(indexName)
+            logInfo(s"Streaming job and index monitor terminated")
+          }
+      }
+    }
+  }
+
   private def isStreamingJobActive(indexName: String): Boolean =
     spark.streams.active.exists(_.name == indexName)
+
+  private def stopStreamingJob(indexName: String): Unit = {
+    val job = spark.streams.active.find(_.name == indexName)
+    if (job.isDefined) {
+      job.get.stop()
+    } else {
+      logWarning("Refreshing job not found")
+    }
+  }
 }
 
 object FlintSparkIndexMonitor extends Logging {
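For reference, here is a standalone sketch of the heartbeat pattern the new FlintSparkIndexMonitorTask implements: reset the counter on any successful run, count consecutive failures, and tear the periodic task down once a threshold is reached. Everything in it (object and class names, the threshold, println logging) is illustrative, not Flint code; only the structure mirrors the change above.

import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}

object SelfCancellingHeartbeatSketch {
  private val executor = Executors.newSingleThreadScheduledExecutor()
  @volatile private var handle: ScheduledFuture[_] = _

  private class HeartbeatTask(work: () => Unit, maxErrors: Int) extends Runnable {
    private var errorCnt = 0 // consecutive failures; touched only by the single scheduler thread

    override def run(): Unit =
      try {
        work()
        errorCnt = 0 // any successful run resets the streak, as in the change above
      } catch {
        case e: Throwable =>
          errorCnt += 1
          println(s"Heartbeat failed, consecutive errors: $errorCnt (${e.getMessage})")
          if (errorCnt >= maxErrors) {
            println("Error threshold reached, cancelling heartbeat")
            handle.cancel(false) // stop future runs; the current run finishes normally
            executor.shutdown()
          }
      }
  }

  def main(args: Array[String]): Unit = {
    handle = executor.scheduleWithFixedDelay(
      new HeartbeatTask(() => throw new RuntimeException("boom"), maxErrors = 3),
      1, 1, TimeUnit.SECONDS)
  }
}

Note that cancelling from inside run() is safe with scheduleWithFixedDelay: the next execution is only scheduled after the current one returns, so no further runs occur once cancel() has been called.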
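The integration test in the next hunk drives this termination path with a small `10 times { (_, _) => {} }` DSL. That helper is defined elsewhere in FlintSparkIndexMonitorITSuite and is not part of this diff; a plausible sketch of how such a suite-internal helper might look (the body is an assumption, only the call signature is taken from the test) is:

// Hypothetical reconstruction of the suite's `times` DSL, defined inside the
// test class. Assumption: each iteration waits for one monitor task run and
// passes the latest (jobStartTime, lastUpdateTime) pair to the callback.
private implicit class IntWithTimes(n: Int) {
  def times(f: (Long, Long) => Unit): Unit =
    (1 to n).foreach { _ =>
      val (jobStartTime, lastUpdateTime) = getLatestTimestamp
      waitForMonitorTaskRun()
      f(jobStartTime, lastUpdateTime)
    }
}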
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala
index d6028bcb0..bc9ba082b 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala
@@ -128,6 +128,20 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc
     }
   }
 
+  test("monitor task and streaming job should terminate if exceptions occur consistently") {
+    val task = FlintSparkIndexMonitor.indexMonitorTracker(testFlintIndex)
+
+    // Block write on metadata log index
+    setWriteBlockOnMetadataLogIndex(true)
+    waitForMonitorTaskRun()
+
+    // Both monitor task and streaming job should stop after 10 consecutive failures
+    10 times { (_, _) => {} }
+
+    task.isCancelled shouldBe true
+    spark.streams.active.exists(_.name == testFlintIndex) shouldBe false
+  }
+
   private def getLatestTimestamp: (Long, Long) = {
     val latest = latestLogEntry(testLatestId)
     (latest("jobStartTime").asInstanceOf[Long], latest("lastUpdateTime").asInstanceOf[Long])