diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala
index d3f3ff0ee..b312d7b60 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala
@@ -80,7 +80,8 @@ class FlintSparkIndexMonitor(
    */
   def stopMonitor(indexName: String): Unit = {
     logInfo(s"Cancelling scheduled task for index $indexName")
-    val task = FlintSparkIndexMonitor.indexMonitorTracker.remove(indexName)
+    // Hack: Don't remove because awaitMonitor API requires Flint index name.
+    val task = FlintSparkIndexMonitor.indexMonitorTracker.get(indexName)
     if (task.isDefined) {
       task.get.cancel(true)
     } else {
@@ -119,26 +120,20 @@ class FlintSparkIndexMonitor(
           logInfo(s"Streaming job $name terminated without exception")
         } catch {
           case e: Throwable =>
-            /**
-             * Transition the index state to FAILED upon encountering an exception. Retry in case
-             * conflicts with final transaction in scheduled task.
-             * ```
-             * TODO:
-             *   1) Determine the appropriate state code based on the type of exception encountered
-             *   2) Record and persist the error message of the root cause for further diagnostics.
-             * ```
-             */
-            logError(s"Streaming job $name terminated with exception", e)
-            retry {
-              flintClient
-                .startTransaction(name, dataSourceName)
-                .initialLog(latest => latest.state == REFRESHING)
-                .finalLog(latest => latest.copy(state = FAILED))
-                .commit(_ => {})
-            }
+            logError(s"Streaming job $name terminated with exception: ${e.getMessage}")
+            retryUpdateIndexStateToFailed(name)
       }
     } else {
-      logInfo(s"Index monitor for [$indexName] not found")
+      logInfo(s"Index monitor for [$indexName] not found.")
+
+      // Streaming job may exit early. Try to find Flint index name in monitor list.
+      val name = FlintSparkIndexMonitor.indexMonitorTracker.keys.headOption
+      if (name.isDefined) {
+        logInfo(s"Found index name in index monitor task list: ${name.get}")
+        retryUpdateIndexStateToFailed(name.get)
+      } else {
+        logInfo(s"Index monitor task list is empty")
+      }
     }
   }

@@ -199,6 +194,26 @@ class FlintSparkIndexMonitor(
     }
   }

+  /**
+   * Transition the index state to FAILED upon encountering an exception. Retry in case conflicts
+   * with final transaction in scheduled task.
+   * ```
+   * TODO:
+   *   1) Determine the appropriate state code based on the type of exception encountered
+   *   2) Record and persist the error message of the root cause for further diagnostics.
+   * ```
+   */
+  private def retryUpdateIndexStateToFailed(indexName: String): Unit = {
+    logInfo(s"Updating index state to failed for $indexName")
+    retry {
+      flintClient
+        .startTransaction(indexName, dataSourceName)
+        .initialLog(latest => latest.state == REFRESHING)
+        .finalLog(latest => latest.copy(state = FAILED))
+        .commit(_ => {})
+    }
+  }
+
   private def retry(operation: => Unit): Unit = {
     // Retry policy for 3 times every 1 second
     val retryPolicy = RetryPolicy
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala
index 2627ed964..951e99d97 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala
@@ -188,6 +188,35 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc
     latestLog should contain("state" -> "failed")
   }

+  test(
+    "await monitor terminated with streaming job exit early should update index state to failed") {
+    new Thread(() => {
+      Thread.sleep(3000L)
+
+      // Set Flint index readonly to simulate streaming job exception
+      val settings = Map("index.blocks.write" -> true)
+      val request = new UpdateSettingsRequest(testFlintIndex).settings(settings.asJava)
+      openSearchClient.indices().putSettings(request, RequestOptions.DEFAULT)
+
+      // Trigger a new micro batch execution
+      sql(s"""
+           | INSERT INTO $testTable
+           | PARTITION (year=2023, month=6)
+           | VALUES ('Test', 35, 'Vancouver')
+           | """.stripMargin)
+    }).start()
+
+    // Terminate streaming job intentionally before await
+    spark.streams.active.find(_.name == testFlintIndex).get.stop()
+
+    // Await until streaming job terminated
+    flint.flintIndexMonitor.awaitMonitor()
+
+    // Assert index state is failed now
+    val latestLog = latestLogEntry(testLatestId)
+    latestLog should contain("state" -> "failed")
+  }
+
   private def getLatestTimestamp: (Long, Long) = {
     val latest = latestLogEntry(testLatestId)
     (latest("jobStartTime").asInstanceOf[Long], latest("lastUpdateTime").asInstanceOf[Long])
diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala
index 3582bcf09..f78b61357 100644
--- a/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala
+++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala
@@ -89,7 +89,7 @@ case class JobOperator(

     try {
       // Wait for streaming job complete if no error and there is streaming job running
-      if (!exceptionThrown && streaming && spark.streams.active.nonEmpty) {
+      if (!exceptionThrown && streaming) {
         // Clean Spark shuffle data after each microBatch.
         spark.streams.addListener(new ShuffleCleaner(spark))
         // Await streaming job thread to finish before the main thread terminates
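The "Hack" comment in the first hunk is the crux of the fix: `stopMonitor` switches from `remove` to `get` so the tracker entry outlives the cancelled task, and `awaitMonitor` can then recover the Flint index name even after the streaming job has exited early. A toy model of that contract (standalone, all names hypothetical, not the Flint API):

```scala
import scala.collection.concurrent.TrieMap

// Toy model of the tracker contract the fix relies on (names hypothetical).
object TrackerContractSketch {
  // indexName -> whether its monitor task is still running
  private val indexMonitorTracker = TrieMap[String, Boolean]()

  def startMonitor(indexName: String): Unit =
    indexMonitorTracker.put(indexName, true)

  // Cancel the task but keep the entry, mirroring get() instead of remove().
  def stopMonitor(indexName: String): Unit =
    indexMonitorTracker.get(indexName) match {
      case Some(_) => indexMonitorTracker.update(indexName, false)
      case None    => println(s"Task not found for index $indexName")
    }

  // With no active streaming job, the index name is still recoverable.
  def awaitMonitor(): Option[String] =
    indexMonitorTracker.keys.headOption

  def main(args: Array[String]): Unit = {
    startMonitor("flint_index_1")
    stopMonitor("flint_index_1")
    // Entry survived stopMonitor, so the FAILED state can still be attributed.
    assert(awaitMonitor().contains("flint_index_1"))
  }
}
```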
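Note: the last hunk in `FlintSparkIndexMonitor.scala` cuts off just as `retry` begins building its `RetryPolicy`, so the policy details are not visible in this diff. Below is a minimal sketch of a "retry 3 times, every 1 second" helper matching that comment, assuming the `RetryPolicy` name refers to the Failsafe library (`dev.failsafe`); the actual builder options in the file may differ:

```scala
import java.time.Duration

import dev.failsafe.{Failsafe, RetryPolicy}

object RetrySketch {

  // Hypothetical sketch, not the actual implementation: retry a block up to
  // 3 times with a fixed 1 second delay between attempts.
  def retry(operation: => Unit): Unit = {
    val retryPolicy = RetryPolicy
      .builder[Unit]()
      .handle(classOf[Throwable]) // e.g. a version conflict with the scheduled task's own transaction
      .withDelay(Duration.ofSeconds(1))
      .withMaxRetries(3)
      .build()

    // `with` is a reserved word in Scala, hence the backticks
    Failsafe.`with`(retryPolicy).run(() => operation)
  }
}
```

Retrying here matters because the scheduled monitor task may commit its own final transaction concurrently, so the first attempt to flip the state to FAILED can lose the optimistic-concurrency race and needs another try.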