Skip to content

Commit

Permalink
Enhance Logging for Streaming Job Cleanup and Reduce REPL Inactivity …
Browse files Browse the repository at this point in the history
…Timeout

- Added detailed logging to improve visibility during streaming job cleanup.
- Decreased REPL job inactivity timeout from 30 to 10 minutes.

Tested manually to ensure new logs are correctly displayed during streaming job cleanup.

Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo committed Nov 16, 2023
1 parent 0351f40 commit f0e0135
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package org.apache.spark.sql

import java.util.Locale
import java.util.concurrent.ThreadPoolExecutor

import scala.concurrent.{ExecutionContext, Future, TimeoutException}
import scala.concurrent.duration.{Duration, MINUTES}
import scala.util.control.NonFatal
Expand Down Expand Up @@ -83,15 +84,41 @@ object FlintJob extends Logging with FlintJobExecutor {
dataToWrite = Some(
getFailedData(spark, dataSource, error, "", query, "", startTime, currentTimeProvider))
} finally {
cleanUpResources(
spark,
exceptionThrown,
wait,
threadPool,
dataToWrite,
resultIndex,
osClient)
}
}

/**
 * Releases all resources held by a Flint job after query execution.
 *
 * Cleanup proceeds in three independent steps so that a failure in one does
 * not prevent the others from running:
 *   1. flush any pending result data to the OpenSearch result index;
 *   2. either wait for streaming queries to finish (streaming mode, no prior
 *      error) or stop the Spark session;
 *   3. always shut down the thread pool.
 *
 * @param spark the active SparkSession to wait on or stop
 * @param exceptionThrown whether the job body raised an exception; if so we
 *                        stop Spark immediately instead of awaiting streams
 * @param wait job mode flag; "streaming" (case-insensitive) means block until
 *             all streaming queries terminate
 * @param threadPool executor to shut down once Spark teardown is attempted
 * @param dataToWrite optional result DataFrame to persist before teardown
 * @param resultIndex OpenSearch index receiving the result data
 * @param osClient client used to write to the result index
 */
def cleanUpResources(
    spark: SparkSession,
    exceptionThrown: Boolean,
    wait: String,
    threadPool: ThreadPoolExecutor,
    dataToWrite: Option[DataFrame],
    resultIndex: String,
    osClient: OSClient): Unit = {
  try {
    dataToWrite.foreach(df => writeDataFrameToOpensearch(df, resultIndex, osClient))
  } catch {
    // NonFatal: let InterruptedException and fatal errors propagate instead
    // of being swallowed here.
    case NonFatal(e) => logError("fail to write to result index", e)
  }

  try {
    if (!exceptionThrown && wait.equalsIgnoreCase("streaming")) {
      // Streaming job succeeded so far: block until every child streaming
      // query terminates before the main thread exits.
      logInfo("streaming job succeeded; awaiting termination of streaming queries")
      spark.streams.awaitAnyTermination()
    } else {
      // Batch mode, or an error already occurred: stop the session now.
      logInfo(s"stopping Spark session (exceptionThrown=$exceptionThrown, wait=$wait)")
      spark.stop()
    }
  } catch {
    case NonFatal(e) => logError("fail to close spark session", e)
  } finally {
    // Always release the thread pool, even if Spark teardown failed.
    logInfo("shutting down thread pool")
    threadPool.shutdown()
  }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import org.apache.spark.util.ThreadUtils
object FlintREPL extends Logging with FlintJobExecutor {

private val HEARTBEAT_INTERVAL_MILLIS = 60000L
private val DEFAULT_INACTIVITY_LIMIT_MILLIS = 30 * 60 * 1000
private val DEFAULT_INACTIVITY_LIMIT_MILLIS = 10 * 60 * 1000
private val MAPPING_CHECK_TIMEOUT = Duration(1, MINUTES)
private val DEFAULT_QUERY_EXECUTION_TIMEOUT = Duration(10, MINUTES)
private val DEFAULT_QUERY_WAIT_TIMEOUT_MILLIS = 10 * 60 * 1000
Expand Down

0 comments on commit f0e0135

Please sign in to comment.