diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala index 9492cc6d9..cb1f5c1ca 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala @@ -58,6 +58,7 @@ object FlintJob extends Logging with FlintJobExecutor { // osClient needs spark session to be created first to get FlintOptions initialized. // Otherwise, we will have connection exception from EMR-S to OS. val osClient = new OSClient(FlintSparkConf().flintOptions()) + var exceptionThrown = true try { val futureMappingCheck = Future { checkAndCreateIndex(osClient, resultIndex) @@ -70,6 +71,7 @@ object FlintJob extends Logging with FlintJobExecutor { case Left(error) => getFailedData(spark, dataSource, error, "", query, "", startTime, currentTimeProvider) }) + exceptionThrown = false } catch { case e: TimeoutException => val error = s"Getting the mapping of index $resultIndex timed out" @@ -82,8 +84,9 @@ object FlintJob extends Logging with FlintJobExecutor { getFailedData(spark, dataSource, error, "", query, "", startTime, currentTimeProvider)) } finally { dataToWrite.foreach(df => writeDataFrameToOpensearch(df, resultIndex, osClient)) - // Stop SparkSession if it is not streaming job - if (wait.equalsIgnoreCase("streaming")) { + // Stop SparkSession if streaming job succeeds + if (!exceptionThrown && wait.equalsIgnoreCase("streaming")) { + // wait if any child thread to finish before the main thread terminates spark.streams.awaitAnyTermination() } else { spark.stop()