diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala index b312d7b60..594f99b02 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala @@ -126,7 +126,12 @@ class FlintSparkIndexMonitor( } else { logInfo(s"Index monitor for [$indexName] not found.") - // Streaming job may exit early. Try to find Flint index name in monitor list. + /* + * Streaming job exits early. Try to find Flint index name in monitor list. + * Assuming: 1) there are at most 1 entry in the list, otherwise index name + * must be given upon this method call; 2) this await API must be called for + * auto refresh index, otherwise index state will be updated mistakenly. + */ val name = FlintSparkIndexMonitor.indexMonitorTracker.keys.headOption if (name.isDefined) { logInfo(s"Found index name in index monitor task list: ${name.get}") diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala index f78b61357..f315dc836 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala @@ -88,11 +88,11 @@ case class JobOperator( } try { - // Wait for streaming job complete if no error and there is streaming job running + // Wait for streaming job complete if no error if (!exceptionThrown && streaming) { // Clean Spark shuffle data after each microBatch. spark.streams.addListener(new ShuffleCleaner(spark)) - // Await streaming job thread to finish before the main thread terminates + // Await index monitor before the main thread terminates new FlintSpark(spark).flintIndexMonitor.awaitMonitor() } else { logInfo(s"""