Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix index state stuck in refreshing when streaming job exits early #370

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ class FlintSparkIndexMonitor(
*/
def stopMonitor(indexName: String): Unit = {
logInfo(s"Cancelling scheduled task for index $indexName")
val task = FlintSparkIndexMonitor.indexMonitorTracker.remove(indexName)
// Hack: Don't remove because awaitMonitor API requires Flint index name.
val task = FlintSparkIndexMonitor.indexMonitorTracker.get(indexName)
if (task.isDefined) {
task.get.cancel(true)
} else {
Expand Down Expand Up @@ -119,26 +120,20 @@ class FlintSparkIndexMonitor(
logInfo(s"Streaming job $name terminated without exception")
} catch {
case e: Throwable =>
/**
* Transition the index state to FAILED upon encountering an exception. Retry in case
* conflicts with final transaction in scheduled task.
* ```
* TODO:
* 1) Determine the appropriate state code based on the type of exception encountered
* 2) Record and persist the error message of the root cause for further diagnostics.
* ```
*/
logError(s"Streaming job $name terminated with exception", e)
retry {
flintClient
.startTransaction(name, dataSourceName)
.initialLog(latest => latest.state == REFRESHING)
.finalLog(latest => latest.copy(state = FAILED))
.commit(_ => {})
}
logError(s"Streaming job $name terminated with exception: ${e.getMessage}")
retryUpdateIndexStateToFailed(name)
}
} else {
logInfo(s"Index monitor for [$indexName] not found")
logInfo(s"Index monitor for [$indexName] not found.")

// Streaming job may exit early. Try to find Flint index name in monitor list.
val name = FlintSparkIndexMonitor.indexMonitorTracker.keys.headOption
if (name.isDefined) {
logInfo(s"Found index name in index monitor task list: ${name.get}")
retryUpdateIndexStateToFailed(name.get)
} else {
logInfo(s"Index monitor task list is empty")
}
}
}

Expand Down Expand Up @@ -199,6 +194,26 @@ class FlintSparkIndexMonitor(
}
}

/**
* Transition the index state to FAILED upon encountering an exception. Retry in case conflicts
* with final transaction in scheduled task.
* ```
* TODO:
* 1) Determine the appropriate state code based on the type of exception encountered
* 2) Record and persist the error message of the root cause for further diagnostics.
* ```
*/
private def retryUpdateIndexStateToFailed(indexName: String): Unit = {
logInfo(s"Updating index state to failed for $indexName")
retry {
flintClient
.startTransaction(indexName, dataSourceName)
.initialLog(latest => latest.state == REFRESHING)
.finalLog(latest => latest.copy(state = FAILED))
.commit(_ => {})
}
}

private def retry(operation: => Unit): Unit = {
// Retry policy for 3 times every 1 second
val retryPolicy = RetryPolicy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,19 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc
latestLog should contain("state" -> "failed")
}

test(
"await monitor terminated with streaming job exit early should update index state to failed") {
// Terminate streaming job intentionally before await
spark.streams.active.find(_.name == testFlintIndex).get.stop()

// Await until streaming job terminated
flint.flintIndexMonitor.awaitMonitor()

// Assert index state is active now
val latestLog = latestLogEntry(testLatestId)
latestLog should contain("state" -> "failed")
}

private def getLatestTimestamp: (Long, Long) = {
val latest = latestLogEntry(testLatestId)
(latest("jobStartTime").asInstanceOf[Long], latest("lastUpdateTime").asInstanceOf[Long])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ case class JobOperator(

try {
// Wait for streaming job complete if no error and there is streaming job running
dai-chen marked this conversation as resolved.
Show resolved Hide resolved
if (!exceptionThrown && streaming && spark.streams.active.nonEmpty) {
if (!exceptionThrown && streaming) {
// Clean Spark shuffle data after each microBatch.
spark.streams.addListener(new ShuffleCleaner(spark))
// Await streaming job thread to finish before the main thread terminates
Expand Down
Loading