Resolve Forever Waiting Issue in FlintJob for Streaming Jobs (#156)
This commit addresses the issue detailed in #116. Previously, the main thread waited indefinitely for the streaming job's termination regardless of whether the job had started successfully or had failed during creation with a syntax or semantic error. As a result, the main thread could hang forever waiting on a streaming job that never started.
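
For context, the hang is easy to reproduce outside FlintJob: spark.streams.awaitAnyTermination() blocks until some streaming query has terminated, so calling it when no query was ever registered blocks forever. A minimal sketch of the failure mode (illustrative only; the malformed query and app name are not from the commit):

    import org.apache.spark.sql.SparkSession

    object HangRepro {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("hang-repro")
          .master("local[2]")
          .getOrCreate()

        try {
          // A syntax error surfaces here, before any StreamingQuery is registered.
          spark.sql("SELEC 1")
        } catch {
          case e: Exception => println(s"query never started: ${e.getMessage}")
        }

        // spark.streams.active is empty, yet this still blocks until *some*
        // query terminates -- that is, forever.
        spark.streams.awaitAnyTermination()
      }
    }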

With this change, spark.streams.awaitAnyTermination() is invoked only when the streaming query starts without throwing an exception.
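
The guard follows a common pessimistic-flag pattern: initialize a flag to true, clear it as the last statement of the try block, and consult it in finally. A minimal sketch of that pattern, assuming a local isStreamingJob stand-in for the wait.equalsIgnoreCase("streaming") check (names other than awaitAnyTermination are illustrative):

    import org.apache.spark.sql.SparkSession

    object AwaitGuardSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("await-guard-sketch")
          .master("local[2]")
          .getOrCreate()
        val isStreamingJob = true // stands in for wait.equalsIgnoreCase("streaming")

        var exceptionThrown = true // pessimistic default: assume failure
        try {
          // ... build and start the streaming query here ...
          exceptionThrown = false // reached only if nothing above threw
        } catch {
          case e: Exception =>
            // record the failure result; no rethrow, so `finally` still runs
            println(s"recording failure result: ${e.getMessage}")
        } finally {
          if (!exceptionThrown && isStreamingJob) {
            // A query actually started, so it is safe to block on it.
            spark.streams.awaitAnyTermination()
          } else {
            // Failed or non-streaming jobs release the main thread immediately.
            spark.stop()
          }
        }
      }
    }

Clearing the flag as the last statement of the try block (rather than catching and rethrowing) keeps the happy path and the error path symmetric: any exception raised earlier leaves the flag true, so finally falls through to spark.stop().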

Testing Performed:
* Replicated the original issue to confirm its presence.
* Applied the fix and verified that the issue of indefinite waiting has been successfully resolved.

Signed-off-by: Kaituo Li <[email protected]>
kaituo authored Nov 15, 2023
1 parent 6a2f62b commit 24252d1
Showing 1 changed file with 5 additions and 2 deletions.
@@ -58,6 +58,7 @@ object FlintJob extends Logging with FlintJobExecutor {
     // osClient needs spark session to be created first to get FlintOptions initialized.
     // Otherwise, we will have connection exception from EMR-S to OS.
     val osClient = new OSClient(FlintSparkConf().flintOptions())
+    var exceptionThrown = true
     try {
       val futureMappingCheck = Future {
         checkAndCreateIndex(osClient, resultIndex)
@@ -70,6 +71,7 @@ object FlintJob extends Logging with FlintJobExecutor {
           case Left(error) =>
             getFailedData(spark, dataSource, error, "", query, "", startTime, currentTimeProvider)
         })
+      exceptionThrown = false
     } catch {
       case e: TimeoutException =>
         val error = s"Getting the mapping of index $resultIndex timed out"
@@ -82,8 +84,9 @@ object FlintJob extends Logging with FlintJobExecutor {
         getFailedData(spark, dataSource, error, "", query, "", startTime, currentTimeProvider))
     } finally {
       dataToWrite.foreach(df => writeDataFrameToOpensearch(df, resultIndex, osClient))
-      // Stop SparkSession if it is not streaming job
-      if (wait.equalsIgnoreCase("streaming")) {
+      // Stop SparkSession if streaming job succeeds
+      if (!exceptionThrown && wait.equalsIgnoreCase("streaming")) {
+        // wait if any child thread to finish before the main thread terminates
         spark.streams.awaitAnyTermination()
       } else {
         spark.stop()
