Handle streaming job exit early case
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Jun 6, 2024
1 parent c5ad7e7 commit fbcd538
Showing 3 changed files with 64 additions and 20 deletions.
FlintSparkIndexMonitor.scala
@@ -80,7 +80,8 @@ class FlintSparkIndexMonitor(
    */
   def stopMonitor(indexName: String): Unit = {
     logInfo(s"Cancelling scheduled task for index $indexName")
-    val task = FlintSparkIndexMonitor.indexMonitorTracker.remove(indexName)
+    // Hack: Don't remove because awaitMonitor API requires Flint index name.
+    val task = FlintSparkIndexMonitor.indexMonitorTracker.get(indexName)
     if (task.isDefined) {
       task.get.cancel(true)
     } else {
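
Both hunks in this file revolve around `FlintSparkIndexMonitor.indexMonitorTracker`, whose declaration sits outside the diff. Below is a minimal sketch of its assumed shape, inferred from the Option-returning `get`/`remove` calls and the `keys.headOption` usage in this file; the concrete collection type is an assumption:

```scala
import java.util.concurrent.ScheduledFuture

import scala.collection.concurrent.{Map => ConcurrentMap, TrieMap}

object FlintSparkIndexMonitor {
  // Flint index name -> handle of its scheduled monitor task; a concurrent
  // Scala map matches the Option-based get/remove calls in the diff above.
  val indexMonitorTracker: ConcurrentMap[String, ScheduledFuture[_]] =
    new TrieMap[String, ScheduledFuture[_]]()
}
```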
@@ -119,26 +120,20 @@ class FlintSparkIndexMonitor(
logInfo(s"Streaming job $name terminated without exception")
} catch {
case e: Throwable =>
/**
* Transition the index state to FAILED upon encountering an exception. Retry in case
* conflicts with final transaction in scheduled task.
* ```
* TODO:
* 1) Determine the appropriate state code based on the type of exception encountered
* 2) Record and persist the error message of the root cause for further diagnostics.
* ```
*/
logError(s"Streaming job $name terminated with exception", e)
retry {
flintClient
.startTransaction(name, dataSourceName)
.initialLog(latest => latest.state == REFRESHING)
.finalLog(latest => latest.copy(state = FAILED))
.commit(_ => {})
}
logError(s"Streaming job $name terminated with exception: ${e.getMessage}")
retryUpdateIndexStateToFailed(name)
}
} else {
logInfo(s"Index monitor for [$indexName] not found")
logInfo(s"Index monitor for [$indexName] not found.")

// Streaming job may exit early. Try to find Flint index name in monitor list.
val name = FlintSparkIndexMonitor.indexMonitorTracker.keys.headOption
if (name.isDefined) {
logInfo(s"Found index name in index monitor task list: ${name.get}")
retryUpdateIndexStateToFailed(name.get)
} else {
logInfo(s"Index monitor task list is empty")
}
}
}

Expand Down Expand Up @@ -199,6 +194,26 @@ class FlintSparkIndexMonitor(
     }
   }

+  /**
+   * Transition the index state to FAILED upon encountering an exception. Retry in case conflicts
+   * with final transaction in scheduled task.
+   * ```
+   * TODO:
+   * 1) Determine the appropriate state code based on the type of exception encountered
+   * 2) Record and persist the error message of the root cause for further diagnostics.
+   * ```
+   */
+  private def retryUpdateIndexStateToFailed(indexName: String): Unit = {
+    logInfo(s"Updating index state to failed for $indexName")
+    retry {
+      flintClient
+        .startTransaction(indexName, dataSourceName)
+        .initialLog(latest => latest.state == REFRESHING)
+        .finalLog(latest => latest.copy(state = FAILED))
+        .commit(_ => {})
+    }
+  }
+
   private def retry(operation: => Unit): Unit = {
     // Retry policy for 3 times every 1 second
     val retryPolicy = RetryPolicy
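
The `retry` helper above is truncated at `val retryPolicy = RetryPolicy`. Here is a minimal sketch of how it plausibly continues, assuming the Failsafe library (`dev.failsafe`) implied by the builder call; the handled exception type is an assumption, while the delay and retry count follow the "3 times every 1 second" comment:

```scala
import java.time.Duration

import dev.failsafe.{Failsafe, RetryPolicy}

object RetrySketch {
  def retry(operation: => Unit): Unit = {
    // Retry policy for 3 times every 1 second, per the comment in the diff
    val retryPolicy = RetryPolicy
      .builder[Unit]()
      .handle(classOf[Throwable]) // assumption: the real code may narrow this
      .withDelay(Duration.ofSeconds(1))
      .withMaxRetries(3)
      .build()

    // Backticks because `with` is a Scala keyword; Failsafe retries the
    // by-name operation on failure according to the policy above.
    Failsafe.`with`(retryPolicy).run(() => operation)
  }
}
```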
FlintSparkIndexMonitorITSuite.scala
@@ -188,6 +188,35 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matchers
     latestLog should contain("state" -> "failed")
   }

+  test(
+    "await monitor terminated with streaming job exit early should update index state to failed") {
+    new Thread(() => {
+      Thread.sleep(3000L)
+
+      // Set Flint index readonly to simulate streaming job exception
+      val settings = Map("index.blocks.write" -> true)
+      val request = new UpdateSettingsRequest(testFlintIndex).settings(settings.asJava)
+      openSearchClient.indices().putSettings(request, RequestOptions.DEFAULT)
+
+      // Trigger a new micro batch execution
+      sql(s"""
+           | INSERT INTO $testTable
+           | PARTITION (year=2023, month=6)
+           | VALUES ('Test', 35, 'Vancouver')
+           | """.stripMargin)
+    }).start()
+
+    // Terminate streaming job intentionally before await
+    spark.streams.active.find(_.name == testFlintIndex).get.stop()
+
+    // Await until streaming job terminated
+    flint.flintIndexMonitor.awaitMonitor()
+
+    // Assert index state is failed now
+    val latestLog = latestLogEntry(testLatestId)
+    latestLog should contain("state" -> "failed")
+  }
+
   private def getLatestTimestamp: (Long, Long) = {
     val latest = latestLogEntry(testLatestId)
     (latest("jobStartTime").asInstanceOf[Long], latest("lastUpdateTime").asInstanceOf[Long])
JobOperator.scala
@@ -89,7 +89,7 @@ case class JobOperator(

   try {
     // Wait for streaming job complete if no error and there is streaming job running
-    if (!exceptionThrown && streaming && spark.streams.active.nonEmpty) {
+    if (!exceptionThrown && streaming) {
       // Clean Spark shuffle data after each microBatch.
       spark.streams.addListener(new ShuffleCleaner(spark))
       // Await streaming job thread to finish before the main thread terminates
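
Why the `spark.streams.active.nonEmpty` guard is dropped: when the streaming job exits early (the case this commit handles), the active-query list is already empty by the time this check runs, so the old condition skipped the wait entirely and the Flint index was stranded in the REFRESHING state. With the guard removed, the wait path always runs for streaming jobs, letting the monitor transition the index to FAILED. A hedged sketch of the resulting control flow, reusing the `flint.flintIndexMonitor.awaitMonitor()` call shown in the IT suite; the exact wiring inside `JobOperator` is an assumption:

```scala
// Sketch under assumptions: `spark`, `flint`, `exceptionThrown`, and
// `streaming` come from the surrounding JobOperator context.
if (!exceptionThrown && streaming) {
  // Clean Spark shuffle data after each micro-batch
  spark.streams.addListener(new ShuffleCleaner(spark))
  // Block until the streaming job terminates; even if it already exited,
  // the monitor falls back to its tracked index name and marks it FAILED
  flint.flintIndexMonitor.awaitMonitor()
}
```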
