Add error counter and terminate logic in index monitor
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed May 17, 2024
1 parent 300fedf commit 4838949
Showing 2 changed files with 70 additions and 27 deletions.
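
At a glance, the commit swaps the monitor's inline lambda for a named Runnable that counts consecutive heartbeat failures, resets the count on any success, and tears down both the streaming job and the monitor itself once the count reaches a limit. A minimal, self-contained sketch of that pattern follows, with hypothetical names throughout (FailureCountingTask, heartbeat, shutdown); it is not the commit's code:

import java.util.concurrent.{Executors, TimeUnit}

// Sketch only: `heartbeat` stands in for Flint's transaction-based log
// update and `shutdown` for the streaming-job/monitor teardown.
class FailureCountingTask(heartbeat: () => Unit, shutdown: () => Unit, maxErrors: Int = 10)
    extends Runnable {

  private var errorCnt = 0 // counts *consecutive* failures only

  override def run(): Unit =
    try {
      heartbeat()
      errorCnt = 0 // one success clears the whole failure streak
    } catch {
      case _: Throwable =>
        errorCnt += 1
        if (errorCnt >= maxErrors) shutdown() // give up after N failures in a row
    }
}

The key design choice, mirrored in the diff below, is that the counter tracks consecutive failures rather than a lifetime total: a transient OpenSearch hiccup that recovers never accumulates toward termination.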
@@ -42,33 +42,7 @@ class FlintSparkIndexMonitor(
    */
   def startMonitor(indexName: String): Unit = {
     val task = FlintSparkIndexMonitor.executor.scheduleWithFixedDelay(
-      () => {
-        logInfo(s"Scheduler trigger index monitor task for $indexName")
-        try {
-          if (isStreamingJobActive(indexName)) {
-            logInfo("Streaming job is still active")
-            flintClient
-              .startTransaction(indexName, dataSourceName)
-              .initialLog(latest => latest.state == REFRESHING)
-              .finalLog(latest => latest) // timestamp will update automatically
-              .commit(_ => {})
-          } else {
-            logError("Streaming job is not active. Cancelling monitor task")
-            flintClient
-              .startTransaction(indexName, dataSourceName)
-              .initialLog(_ => true)
-              .finalLog(latest => latest.copy(state = FAILED))
-              .commit(_ => {})
-
-            stopMonitor(indexName)
-            logInfo("Index monitor task is cancelled")
-          }
-        } catch {
-          case e: Throwable =>
-            logError("Failed to update index log entry", e)
-            MetricsUtil.incrementCounter(MetricConstants.STREAMING_HEARTBEAT_FAILED_METRIC)
-        }
-      },
+      new FlintSparkIndexMonitorTask(indexName),
       15, // Delay to ensure final logging is complete first, otherwise version conflicts
       60, // TODO: make interval configurable
       TimeUnit.SECONDS)
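
Worth noting about the schedule this hunk keeps: scheduleWithFixedDelay measures the 60-second interval from the completion of one run to the start of the next, so a slow transaction commit delays, but can never overlap, the following heartbeat. A tiny standalone illustration of that semantic (sketch, not Flint code):

import java.util.concurrent.{Executors, TimeUnit}

val executor = Executors.newScheduledThreadPool(1)

// A run that takes 5s pushes the next start to roughly 65s after this one
// began: 5s of work plus the fixed 60s delay measured from completion.
executor.scheduleWithFixedDelay(
  () => Thread.sleep(5000),
  15, // initial delay, as in the commit: let final logging finish first
  60, // delay between runs; the commit leaves a TODO to make this configurable
  TimeUnit.SECONDS)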
@@ -92,8 +66,63 @@ class FlintSparkIndexMonitor(
     }
   }
 
+  private class FlintSparkIndexMonitorTask(indexName: String) extends Runnable {
+
+    /** Error counter */
+    private var errorCnt = 0
+
+    override def run(): Unit = {
+      logInfo(s"Scheduler trigger index monitor task for $indexName")
+      try {
+        if (isStreamingJobActive(indexName)) {
+          logInfo("Streaming job is still active")
+          flintClient
+            .startTransaction(indexName, dataSourceName)
+            .initialLog(latest => latest.state == REFRESHING)
+            .finalLog(latest => latest) // timestamp will update automatically
+            .commit(_ => {})
+        } else {
+          logError("Streaming job is not active. Cancelling monitor task")
+          flintClient
+            .startTransaction(indexName, dataSourceName)
+            .initialLog(_ => true)
+            .finalLog(latest => latest.copy(state = FAILED))
+            .commit(_ => {})
+
+          stopMonitor(indexName)
+          logInfo("Index monitor task is cancelled")
+        }
+
+        // Reset counter if success
+        errorCnt = 0
+      } catch {
+        case e: Throwable =>
+          errorCnt += 1
+          logError(s"Failed to update index log entry, consecutive errors: $errorCnt", e)
+          MetricsUtil.incrementCounter(MetricConstants.STREAMING_HEARTBEAT_FAILED_METRIC)
+
+          // Stop streaming job and its monitor if max retry limit reached
+          if (errorCnt >= 10) {
+            logInfo(s"Terminating streaming job and index monitor for $indexName")
+            stopStreamingJob(indexName)
+            stopMonitor(indexName)
+            logInfo(s"Streaming job and index monitor terminated")
+          }
+      }
+    }
+  }
+
   private def isStreamingJobActive(indexName: String): Boolean =
     spark.streams.active.exists(_.name == indexName)
+
+  private def stopStreamingJob(indexName: String): Unit = {
+    val job = spark.streams.active.find(_.name == indexName)
+    if (job.isDefined) {
+      job.get.stop()
+    } else {
+      logWarning("Refreshing job not found")
+    }
+  }
 }
 
 object FlintSparkIndexMonitor extends Logging {
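
One small style note on the new stopStreamingJob helper above: it branches on isDefined/get, which works but is usually written as a pattern match in Scala. An equivalent sketch, not what the commit ships:

private def stopStreamingJob(indexName: String): Unit =
  spark.streams.active.find(_.name == indexName) match {
    case Some(job) => job.stop() // stops the Structured Streaming query
    case None      => logWarning("Refreshing job not found")
  }

Either form relies on the convention, also used by isStreamingJobActive, that the streaming query is registered under the Flint index name.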
@@ -128,6 +128,20 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc
     }
   }
 
+  test("monitor task and streaming job should terminate if exception occurred consistently") {
+    val task = FlintSparkIndexMonitor.indexMonitorTracker(testFlintIndex)
+
+    // Block write on metadata log index
+    setWriteBlockOnMetadataLogIndex(true)
+    waitForMonitorTaskRun()
+
+    // Both monitor task and streaming job should stop after 10 consecutive failures
+    10 times { (_, _) => {} }
+
+    task.isCancelled shouldBe true
+    spark.streams.active.exists(_.name == testFlintIndex) shouldBe false
+  }
+
   private def getLatestTimestamp: (Long, Long) = {
     val latest = latestLogEntry(testLatestId)
     (latest("jobStartTime").asInstanceOf[Long], latest("lastUpdateTime").asInstanceOf[Long])
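
Two helpers in this test live outside the hunk: the `10 times { (_, _) => {} }` loop appears to be a suite-level helper that idles through enough monitor runs for the failure counter to hit its limit, and setWriteBlockOnMetadataLogIndex flips an OpenSearch write block so that every heartbeat commit fails. The latter is not shown in this diff; a plausible implementation with the OpenSearch high-level REST client might look like the following (a sketch under that assumption; openSearchClient and testMetaLogIndex are hypothetical names):

import scala.jdk.CollectionConverters._

import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.opensearch.client.RequestOptions

// Setting `index.blocks.write` makes every update to the metadata log fail
// with a cluster block error, which the monitor counts as a heartbeat failure.
private def setWriteBlockOnMetadataLogIndex(enabled: Boolean): Unit = {
  val request = new UpdateSettingsRequest(testMetaLogIndex)
    .settings(Map[String, AnyRef]("index.blocks.write" -> Boolean.box(enabled)).asJava)
  openSearchClient.indices().putSettings(request, RequestOptions.DEFAULT)
}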
