Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Jun 7, 2024
1 parent 5c6a51e commit 2ac641a
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,12 @@ class FlintSparkIndexMonitor(
} else {
logInfo(s"Index monitor for [$indexName] not found.")

// Streaming job may exit early. Try to find Flint index name in monitor list.
/*
* Streaming job exits early. Try to find Flint index name in monitor list.
* Assuming: 1) there are at most 1 entry in the list, otherwise index name
* must be given upon this method call; 2) this await API must be called for
* auto refresh index, otherwise index state will be updated mistakenly.
*/
val name = FlintSparkIndexMonitor.indexMonitorTracker.keys.headOption
if (name.isDefined) {
logInfo(s"Found index name in index monitor task list: ${name.get}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,11 @@ case class JobOperator(
}

try {
// Wait for streaming job complete if no error and there is streaming job running
// Wait for streaming job complete if no error
if (!exceptionThrown && streaming) {
// Clean Spark shuffle data after each microBatch.
spark.streams.addListener(new ShuffleCleaner(spark))
// Await streaming job thread to finish before the main thread terminates
// Await index monitor before the main thread terminates
new FlintSpark(spark).flintIndexMonitor.awaitMonitor()
} else {
logInfo(s"""
Expand Down

0 comments on commit 2ac641a

Please sign in to comment.