Skip to content

Commit

Permalink
Fix shutdown bug due to non-daemon thread in driver
Browse files Browse the repository at this point in the history
Similar to #175, this PR adds shutdown logic in FlintJob.

Tests:
* Verified in IT if terminateJVM is enabled, JVM would shut down.

Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo committed Mar 20, 2024
1 parent 029f843 commit 82bc158
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest {
JobOperator(spark, query, dataSourceName, resultIndex, true, streamingRunningCount)
job.envinromentProvider = new MockEnvironment(
Map("SERVERLESS_EMR_JOB_ID" -> jobRunId, "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID" -> appId))

job.terminateJVM = false
job.start()
}
futureResult.onComplete {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ trait FlintJobExecutor {
var threadPoolFactory: ThreadPoolFactory = new DefaultThreadPoolFactory()
var envinromentProvider: EnvironmentProvider = new RealEnvironment()
var enableHiveSupport: Boolean = true
// termiante JVM in the presence non-deamon thread before exiting
var terminateJVM = true

// The enabled setting, which can be applied only to the top-level mapping definition and to object fields,
val resultIndexMapping =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ object FlintREPL extends Logging with FlintJobExecutor {
val EARLY_TERMIANTION_CHECK_FREQUENCY = 60000L

@volatile var earlyExitFlag: Boolean = false
// termiante JVM in the presence non-deamon thread before exiting
var terminateJVM = true

def updateSessionIndex(flintCommand: FlintCommand, updater: OpenSearchUpdater): Unit = {
updater.update(flintCommand.statementId, FlintCommand.serialize(flintCommand))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import org.opensearch.flint.core.storage.OpenSearchUpdater
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.FlintJob.createSparkSession
import org.apache.spark.sql.FlintREPL.{executeQuery, logInfo, updateFlintInstanceBeforeShutdown}
import org.apache.spark.sql.FlintREPL.{executeQuery, logInfo, threadPoolFactory, updateFlintInstanceBeforeShutdown}
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.util.ThreadUtils

Expand Down Expand Up @@ -106,6 +106,18 @@ case class JobOperator(
case e: Exception => logError("Fail to close threadpool", e)
}
recordStreamingCompletionStatus(exceptionThrown)

// Check for non-daemon threads that may prevent the driver from shutting down.
// Non-daemon threads other than the main thread indicate that the driver is still processing tasks,
// which may be due to unresolved bugs in dependencies or threads not being properly shut down.
if (terminateJVM && threadPoolFactory.hasNonDaemonThreadsOtherThanMain) {
logInfo("A non-daemon thread in the driver is seen.")
// Exit the JVM to prevent resource leaks and potential emr-s job hung.
// A zero status code is used for a graceful shutdown without indicating an error.
// If exiting with non-zero status, emr-s job will fail.
// This is a part of the fault tolerance mechanism to handle such scenarios gracefully
System.exit(0)
}
}

def stop(): Unit = {
Expand Down

0 comments on commit 82bc158

Please sign in to comment.