Resolve Forever Waiting Issue in FlintJob for Streaming Jobs
This commit addresses the issue detailed in opensearch-project#116. Previously, whether a streaming job started successfully or failed with syntax/semantic errors during creation, the main thread always waited for the streaming job's termination. As a result, the main thread could hang forever, waiting on a streaming job that had never actually started.

With this change, spark.streams.awaitAnyTermination() is invoked only when the streaming query executes without throwing an exception.
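
The guard pattern, as a minimal sketch (runQuery, wait, and the logging call are placeholders here, not the actual FlintJob members):

var exceptionThrown = true
try {
  runQuery()               // may throw, e.g. on syntax or semantic errors
  exceptionThrown = false  // only reached if the query started cleanly
} catch {
  case e: Exception =>
    logError("query failed", e)
} finally {
  if (!exceptionThrown && wait.equalsIgnoreCase("streaming")) {
    // safe to block: a streaming query is actually running
    spark.streams.awaitAnyTermination()
  } else {
    // failed or non-streaming jobs release the session instead
    spark.stop()
  }
}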

Testing Performed:
* Replicated the original issue to confirm its presence.
* Applied the fix and verified that the main thread no longer waits indefinitely.

Signed-off-by: Kaituo Li <[email protected]>
kaituo committed Nov 14, 2023
1 parent 6a2f62b commit 96835bc
Showing 1 changed file with 5 additions and 2 deletions.
@@ -58,6 +58,7 @@ object FlintJob extends Logging with FlintJobExecutor {
     // osClient needs spark session to be created first to get FlintOptions initialized.
     // Otherwise, we will have connection exception from EMR-S to OS.
     val osClient = new OSClient(FlintSparkConf().flintOptions())
+    var exceptionThrown = true
     try {
       val futureMappingCheck = Future {
         checkAndCreateIndex(osClient, resultIndex)
@@ -70,6 +71,7 @@ object FlintJob extends Logging with FlintJobExecutor {
           case Left(error) =>
             getFailedData(spark, dataSource, error, "", query, "", startTime, currentTimeProvider)
         })
+      exceptionThrown = false
     } catch {
       case e: TimeoutException =>
         val error = s"Getting the mapping of index $resultIndex timed out"
@@ -82,8 +84,9 @@
         getFailedData(spark, dataSource, error, "", query, "", startTime, currentTimeProvider))
     } finally {
       dataToWrite.foreach(df => writeDataFrameToOpensearch(df, resultIndex, osClient))
-      // Stop SparkSession if it is not streaming job
-      if (wait.equalsIgnoreCase("streaming")) {
+      // Stop SparkSession if streaming job succeeds
+      if (!exceptionThrown && wait.equalsIgnoreCase("streaming")) {
+        // wait for any child threads to finish before the main thread terminates
         spark.streams.awaitAnyTermination()
       } else {
         spark.stop()
