Skip to content

Commit

Permalink
Python supports killing subprocesses
Browse files Browse the repository at this point in the history
  • Loading branch information
peacewong committed Oct 16, 2023
1 parent 922df46 commit 247616a
Showing 1 changed file with 10 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.linkis.engineplugin.spark.exception.ExecuteError
import org.apache.linkis.engineplugin.spark.imexport.CsvRelation
import org.apache.linkis.engineplugin.spark.utils.EngineUtils
import org.apache.linkis.governance.common.paser.PythonCodeParser
import org.apache.linkis.governance.common.utils.GovernanceUtils
import org.apache.linkis.scheduler.executer.{ExecuteResponse, SuccessExecuteResponse}
import org.apache.linkis.storage.resultset.ResultSetWriterFactory

Expand Down Expand Up @@ -120,16 +121,15 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
}
IOUtils.closeQuietly(lineOutputStream)
Utils.tryAndErrorMsg {
process.destroy()
process = null
Thread.sleep(1000 * 2L)
// process.destroy will kills the subprocess,not need to force kill with -9,
// kill -9 may cause resources not to be released
// invoke kill process method to kill all tree process
pid.foreach(p => {
logger.info(s"Try to kill pyspark process with: [kill -15 ${p}]")
Utils.exec(Array("kill", "-15", p), 3000L)
GovernanceUtils.killProcess(String.valueOf(p), s"kill pyspark process,pid: $pid", false)
})

if (pid.isEmpty) {
process.destroy()
process = null
}
}("process close failed")
}
logger.info(s"To delete python executor")
Expand Down Expand Up @@ -253,9 +253,7 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
// close
Utils.tryFinally({
if (promise != null && !promise.isCompleted) {
promise.failure(
new ExecuteError(PYSPARK_STOPPED.getErrorCode, PYSPARK_STOPPED.getErrorDesc)
)
promise.failure(ExecuteError(PYSPARK_STOPPED.getErrorCode, PYSPARK_STOPPED.getErrorDesc))
}
}) {
close
Expand Down Expand Up @@ -289,11 +287,9 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
if (process == null) {
Utils.tryThrow(initGateway) { t =>
{
val errMsg =
s"initialize python executor failed, please ask administrator for help! errMsg: ${t.getMessage}"
logger.error(errMsg, t)
logger.error("initialize python executor failed, please ask administrator for help!", t)
Utils.tryAndWarn(close)
throw new IllegalStateException(errMsg, t)
throw t
}
}
}
Expand Down

0 comments on commit 247616a

Please sign in to comment.