From d2df0e5e498c54a97daaa49bd2f95c2310a592c0 Mon Sep 17 00:00:00 2001
From: saLeox <282130830@qq.com>
Date: Wed, 2 Nov 2022 17:54:03 +0800
Subject: [PATCH] Enable customized and isolated Python environment for PySpark

---
 linkis-dist/package/db/linkis_dml.sql         |  4 ++
 .../spark/config/SparkConfiguration.scala     |  3 --
 .../spark/executor/SparkPythonExecutor.scala  | 49 ++++++++++---------
 3 files changed, 30 insertions(+), 26 deletions(-)

diff --git a/linkis-dist/package/db/linkis_dml.sql b/linkis-dist/package/db/linkis_dml.sql
index 7412acedad..0ae55fabf6 100644
--- a/linkis-dist/package/db/linkis_dml.sql
+++ b/linkis-dist/package/db/linkis_dml.sql
@@ -77,6 +77,10 @@ INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`,
 INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.tispark.tidb.port', NULL, NULL, '4000', 'None', NULL, '0', '0', '1', 'tidb设置', 'spark');
 INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.tispark.tidb.user', NULL, NULL, 'root', 'None', NULL, '0', '0', '1', 'tidb设置', 'spark');
 INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.python.version', '取值范围:python2,python3', 'python版本','python2', 'OFT', '[\"python3\",\"python2\"]', '0', '0', '1', 'spark引擎设置', 'spark');
+INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.yarn.dist.archives', 'Comma-separated list of archives to be extracted into the working directory of each executor.', NULL, NULL, 'None', NULL, '0', '0', '1', 'spark引擎设置', 'spark');
+INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.pyspark.python', 'Python binary executable to use for PySpark in both the driver and executors.', NULL, NULL, 'None', NULL, '0', '0', '1', 'spark引擎设置', 'spark');
+INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.pyspark.driver.python', 'Python binary executable to use for PySpark in the driver only.', NULL, NULL, 'None', NULL, '0', '0', '1', 'spark引擎设置', 'spark');
+
 -- hive
 INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.rm.instance', '范围:1-20,单位:个', 'hive引擎最大并发数', '10', 'NumInterval', '[1,20]', '0', '0', '1', '队列资源', 'hive');
 INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.engineconn.java.driver.memory', '取值范围:1-10,单位:G', 'hive引擎初始化内存大小','1g', 'Regex', '^([1-9]|10)(G|g)$', '0', '0', '1', 'hive引擎设置', 'hive');
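Note: the three keys registered above are meant to be used together. A minimal
sketch in spark-shell style Scala (the archive location, the "#pyenv" alias,
and both interpreter paths below are hypothetical stand-ins, not values
shipped by this patch):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // Ship a packed Python environment; YARN unpacks it as ./pyenv in each
      // container (hypothetical archive and alias).
      .set("spark.yarn.dist.archives", "hdfs:///tmp/envs/py39.tar.gz#pyenv")
      // Executor-side PySpark workers use the interpreter from the unpacked
      // archive.
      .set("spark.pyspark.python", "./pyenv/bin/python")
      // The driver runs on the EngineConn host, so it points at a local
      // interpreter of the same Python version.
      .set("spark.pyspark.driver.python", "/appcom/Install/anaconda3/bin/python")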
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
index 20cc9006aa..52fb02cd54 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
@@ -108,9 +108,6 @@ object SparkConfiguration extends Logging {
   val ENGINE_SHUTDOWN_LOGS =
     CommonVars("wds.linkis.spark.engineconn.fatal.log", "error writing class;OutOfMemoryError")
 
-  val PYSPARK_PYTHON3_PATH =
-    CommonVars[String]("pyspark.python3.path", "/appcom/Install/anaconda3/bin/python")
-
   val ENABLE_REPLACE_PACKAGE_NAME =
     CommonVars("wds.linkis.spark.engine.scala.replace_package_header.enable", true)
 
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala
index c1b1b96cf0..124f6d32f2 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala
@@ -145,18 +145,31 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
       .toString
       .toLowerCase()
     val sparkPythonVersion =
-      if (
-        StringUtils
-          .isNotBlank(userDefinePythonVersion) && userDefinePythonVersion.equals("python3")
-      ) {
-        SparkConfiguration.PYSPARK_PYTHON3_PATH.getValue
-      } else {
-        userDefinePythonVersion
-      }
-    val pythonExec = CommonVars("PYSPARK_DRIVER_PYTHON", sparkPythonVersion).getValue
+      if (StringUtils.isNotBlank(userDefinePythonVersion)) userDefinePythonVersion else "python"
+    val pySparkDriverPythonFromVersion =
+      if (new File(sparkPythonVersion).exists()) sparkPythonVersion else ""
+
+    // resolve the Python interpreter for the PySpark driver
+    val pySparkDriverPythonConf = "spark.pyspark.driver.python"
+    val userDefinePySparkDriverPython =
+      sc.getConf.getOption(pySparkDriverPythonConf).getOrElse(pySparkDriverPythonFromVersion)
+    val defaultPySparkDriverPython = CommonVars("PYSPARK_DRIVER_PYTHON", "").getValue
+    // precedence: spark.pyspark.driver.python > spark.python.version > PYSPARK_DRIVER_PYTHON
+    val pySparkDriverPython =
+      if (StringUtils.isNotBlank(userDefinePySparkDriverPython)) userDefinePySparkDriverPython
+      else defaultPySparkDriverPython
+    logger.info(s"PYSPARK_DRIVER_PYTHON => $pySparkDriverPython")
+
+    // resolve the Python interpreter for PySpark executors
+    val pySparkPythonConf = "spark.pyspark.python"
+    val userDefinePySparkPython = sc.getConf.getOption(pySparkPythonConf).getOrElse("")
+    val defaultPySparkPython = CommonVars("PYSPARK_PYTHON", "").getValue
+    val pySparkPython =
+      if (StringUtils.isNotBlank(userDefinePySparkPython)) userDefinePySparkPython
+      else defaultPySparkPython
+    logger.info(s"PYSPARK_PYTHON => $pySparkPython")
 
     val pythonScriptPath = CommonVars("python.script.path", "python/mix_pyspark.py").getValue
-
     val port: Int = EngineUtils.findAvailPort
     gatewayServer = gwBuilder.entryPoint(this).javaPort(port).build()
     gatewayServer.start()
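The hunk above replaces the removed PYSPARK_PYTHON3_PATH shortcut with an
explicit resolution chain for the driver interpreter. Restated as a
self-contained sketch (the three parameters are stand-ins for
sc.getConf.getOption("spark.pyspark.driver.python"), the spark.python.version
option, and the PYSPARK_DRIVER_PYTHON value read through CommonVars):

    import java.io.File
    import org.apache.commons.lang3.StringUtils

    // Precedence: spark.pyspark.driver.python > spark.python.version (only
    // when it names an existing file) > PYSPARK_DRIVER_PYTHON.
    def resolveDriverPython(
        driverPythonConf: Option[String],
        pythonVersion: String,
        driverPythonEnv: String
    ): String = {
      val fromVersion = if (new File(pythonVersion).exists()) pythonVersion else ""
      val userDefined = driverPythonConf.getOrElse(fromVersion)
      if (StringUtils.isNotBlank(userDefined)) userDefined else driverPythonEnv
    }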
@@ -168,6 +181,7 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
     )
 
     val pythonClasspath = new StringBuilder(pythonPath)
+    // collect extra spark files
     val files = sc.getConf.get("spark.files", "")
     logger.info(s"output spark files ${files}")
     if (StringUtils.isNotEmpty(files)) {
@@ -186,7 +200,7 @@
       .filter(_.endsWith(".zip"))
       .foreach(pythonClasspath ++= File.pathSeparator ++= _)
 
-    val cmd = CommandLine.parse(pythonExec)
+    val cmd = CommandLine.parse(pySparkDriverPython)
     cmd.addArgument(createFakeShell(pythonScriptPath).getAbsolutePath, false)
     cmd.addArgument(port.toString, false)
     cmd.addArgument(EngineUtils.sparkSubmitVersion().replaceAll("\\.", ""), false)
@@ -195,19 +209,8 @@
     cmd.addArgument(pyFiles, false)
 
     val builder = new ProcessBuilder(cmd.toStrings.toSeq.toList.asJava)
-
     val env = builder.environment()
-    if (StringUtils.isBlank(sc.getConf.get("spark.pyspark.python", ""))) {
-      logger.info("spark.pyspark.python is null")
-      if (userDefinePythonVersion.equals("python3")) {
-        logger.info(s"userDefinePythonVersion is $pythonExec will be set to PYSPARK_PYTHON")
-        env.put("PYSPARK_PYTHON", pythonExec)
-      }
-    } else {
-      val executorPython = sc.getConf.get("spark.pyspark.python")
-      logger.info(s"set PYSPARK_PYTHON spark.pyspark.python is $executorPython")
-      env.put("PYSPARK_PYTHON", executorPython)
-    }
+    if (StringUtils.isNotBlank(pySparkPython)) env.put("PYSPARK_PYTHON", pySparkPython)
     env.put("PYTHONPATH", pythonClasspath.toString())
     env.put("PYTHONUNBUFFERED", "YES")
     env.put("PYSPARK_GATEWAY_PORT", "" + port)
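The final hunk drops the python3-specific branching: PYSPARK_PYTHON is now
exported to the spawned driver-side Python process only when
spark.pyspark.python (or a PYSPARK_PYTHON already present in the environment)
resolved to a non-blank value, so an unset config no longer forces any
interpreter on the executors. A standalone sketch of that wiring, with a
hypothetical child command in place of the real mix_pyspark.py launch:

    import org.apache.commons.lang3.StringUtils

    object EnvWiringSketch {
      def main(args: Array[String]): Unit = {
        // Stand-in for the value resolved from spark.pyspark.python.
        val pySparkPython = "./pyenv/bin/python"
        val builder = new ProcessBuilder("printenv", "PYSPARK_PYTHON")
        val env = builder.environment()
        // Only export the variable when a value was actually resolved.
        if (StringUtils.isNotBlank(pySparkPython)) env.put("PYSPARK_PYTHON", pySparkPython)
        builder.inheritIO()
        builder.start().waitFor()
      }
    }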