From 8f149e1800a57be2d18fb8e0592191bc3aa8ba69 Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Mon, 18 Sep 2023 15:09:04 +0800 Subject: [PATCH] Spark once task supports engingeConnRuntimeMode label (#4896) * Spark once task supports engingeConnRuntimeMode label * isYarnClusterMode extracts to LabelUtil * Modify SparkEngineConnFactory --- .../manager/label/utils/LabelUtil.scala | 10 ++++++++++ .../factory/SparkEngineConnFactory.scala | 20 +++++++++++-------- 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala index 3965a5ea11..986f130686 100644 --- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala @@ -17,6 +17,7 @@ package org.apache.linkis.manager.label.utils +import org.apache.linkis.manager.label.constant.LabelValueConstant import org.apache.linkis.manager.label.entity.Label import org.apache.linkis.manager.label.entity.engine.{ CodeLanguageLabel, @@ -135,4 +136,13 @@ object LabelUtil { null.asInstanceOf[A] } + def isYarnClusterMode(labels: util.List[Label[_]]): Boolean = { + val label = LabelUtil.getEngingeConnRuntimeModeLabel(labels) + val isYarnClusterMode: Boolean = { + if (null != label && label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) true + else false + } + isYarnClusterMode + } + } diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala index e8f2cd22d3..fbd38bcc68 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala @@ -39,7 +39,6 @@ import org.apache.linkis.manager.engineplugin.common.creation.{ } import org.apache.linkis.manager.engineplugin.common.launch.process.Environment import org.apache.linkis.manager.engineplugin.common.launch.process.Environment.variable -import org.apache.linkis.manager.label.constant.LabelValueConstant import org.apache.linkis.manager.label.entity.engine.EngineType import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType import org.apache.linkis.manager.label.utils.LabelUtil @@ -86,12 +85,13 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging val hadoopConfDir = EnvConfiguration.HADOOP_CONF_DIR.getValue(options) val sparkHome = SPARK_HOME.getValue(options) val sparkConfDir = SPARK_CONF_DIR.getValue(options) - val sparkConfig: SparkConfig = getSparkConfig(options) + val sparkConfig: SparkConfig = + getSparkConfig(options, LabelUtil.isYarnClusterMode(engineCreationContext.getLabels())) val context = new EnvironmentContext(sparkConfig, hadoopConfDir, sparkConfDir, sparkHome, null) context } - def getSparkConfig(options: util.Map[String, String]): SparkConfig = { + def getSparkConfig(options: util.Map[String, String], isYarnClusterMode: Boolean): SparkConfig = { logger.info("options: " + JsonUtils.jackson.writeValueAsString(options)) val sparkConfig: SparkConfig = new SparkConfig() sparkConfig.setJavaHome(variable(Environment.JAVA_HOME)) @@ -114,7 +114,14 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging sparkConfig.setK8sDriverRequestCores(SPARK_K8S_DRIVER_REQUEST_CORES.getValue(options)) sparkConfig.setK8sExecutorRequestCores(SPARK_K8S_EXECUTOR_REQUEST_CORES.getValue(options)) } - sparkConfig.setDeployMode(SPARK_DEPLOY_MODE.getValue(options)) + + if (master.startsWith("yarn")) { + if (isYarnClusterMode) { + sparkConfig.setDeployMode(SparkConfiguration.SPARK_YARN_CLUSTER) + } else { + sparkConfig.setDeployMode(SparkConfiguration.SPARK_YARN_CLIENT) + } + } sparkConfig.setAppResource(SPARK_APP_RESOURCE.getValue(options)) sparkConfig.setAppName(SPARK_APP_NAME.getValue(options)) sparkConfig.setJars(SPARK_EXTRA_JARS.getValue(options)) // todo @@ -149,10 +156,7 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging sparkConf.getOption("spark.master").getOrElse(CommonVars("spark.master", "yarn").getValue) logger.info(s"------ Create new SparkContext {$master} -------") - val label = LabelUtil.getEngingeConnRuntimeModeLabel(engineCreationContext.getLabels()) - val isYarnClusterMode: Boolean = - if (null != label && label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) true - else false + val isYarnClusterMode = LabelUtil.isYarnClusterMode(engineCreationContext.getLabels()) if (isYarnClusterMode) { sparkConf.set("spark.submit.deployMode", "cluster")