From 2e808dec5aa749bfed65eb651270c9049d0c294c Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Mon, 4 Sep 2023 15:30:11 +0800 Subject: [PATCH 1/3] Spark once task supports engingeConnRuntimeMode label --- .../factory/SparkEngineConnFactory.scala | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala index bc18e2badf..c882eaf93a 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala @@ -40,6 +40,7 @@ import org.apache.linkis.manager.engineplugin.common.creation.{ import org.apache.linkis.manager.engineplugin.common.launch.process.Environment import org.apache.linkis.manager.engineplugin.common.launch.process.Environment.variable import org.apache.linkis.manager.label.constant.LabelValueConstant +import org.apache.linkis.manager.label.entity.Label import org.apache.linkis.manager.label.entity.engine.EngineType import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType import org.apache.linkis.manager.label.utils.LabelUtil @@ -86,12 +87,22 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging val hadoopConfDir = EnvConfiguration.HADOOP_CONF_DIR.getValue(options) val sparkHome = SPARK_HOME.getValue(options) val sparkConfDir = SPARK_CONF_DIR.getValue(options) - val sparkConfig: SparkConfig = getSparkConfig(options) + val sparkConfig: SparkConfig = + getSparkConfig(options, isYarnClusterMode(engineCreationContext.getLabels())) val context = new EnvironmentContext(sparkConfig, hadoopConfDir, sparkConfDir, sparkHome, null) context } - def getSparkConfig(options: util.Map[String, String]): SparkConfig = { + def isYarnClusterMode(labels: util.List[Label[_]]): Boolean = { + val label = LabelUtil.getEngingeConnRuntimeModeLabel(labels) + val isYarnClusterMode: Boolean = { + if (null != label && label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) true + else false + } + isYarnClusterMode + } + + def getSparkConfig(options: util.Map[String, String], isYarnClusterMode: Boolean): SparkConfig = { logger.info("options: " + JsonUtils.jackson.writeValueAsString(options)) val sparkConfig: SparkConfig = new SparkConfig() sparkConfig.setJavaHome(variable(Environment.JAVA_HOME)) @@ -112,7 +123,14 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging sparkConfig.setK8sLanguageType(SPARK_K8S_LANGUAGE_TYPE.getValue(options)) sparkConfig.setK8sImagePullPolicy(SPARK_K8S_IMAGE_PULL_POLICY.getValue(options)) } - sparkConfig.setDeployMode(SPARK_DEPLOY_MODE.getValue(options)) + + if (master.startsWith("yarn")) { + if (isYarnClusterMode) { + sparkConfig.setDeployMode(SparkConfiguration.SPARK_YARN_CLUSTER) + } else { + sparkConfig.setDeployMode(SparkConfiguration.SPARK_YARN_CLIENT) + } + } sparkConfig.setAppResource(SPARK_APP_RESOURCE.getValue(options)) sparkConfig.setAppName(SPARK_APP_NAME.getValue(options)) sparkConfig.setJars(SPARK_EXTRA_JARS.getValue(options)) // todo From fd34629dc77ead85c5ca174d8678678d8cc75eeb Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Tue, 5 Sep 2023 10:33:20 +0800 Subject: [PATCH 2/3] isYarnClusterMode extracts to LabelUtil --- .../linkis/manager/label/utils/LabelUtil.scala | 10 ++++++++++ .../spark/factory/SparkEngineConnFactory.scala | 18 ++---------------- 2 files changed, 12 insertions(+), 16 deletions(-) diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala index 3965a5ea11..986f130686 100644 --- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala @@ -17,6 +17,7 @@ package org.apache.linkis.manager.label.utils +import org.apache.linkis.manager.label.constant.LabelValueConstant import org.apache.linkis.manager.label.entity.Label import org.apache.linkis.manager.label.entity.engine.{ CodeLanguageLabel, @@ -135,4 +136,13 @@ object LabelUtil { null.asInstanceOf[A] } + def isYarnClusterMode(labels: util.List[Label[_]]): Boolean = { + val label = LabelUtil.getEngingeConnRuntimeModeLabel(labels) + val isYarnClusterMode: Boolean = { + if (null != label && label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) true + else false + } + isYarnClusterMode + } + } diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala index c882eaf93a..0a5820ca0c 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala @@ -39,8 +39,6 @@ import org.apache.linkis.manager.engineplugin.common.creation.{ } import org.apache.linkis.manager.engineplugin.common.launch.process.Environment import org.apache.linkis.manager.engineplugin.common.launch.process.Environment.variable -import org.apache.linkis.manager.label.constant.LabelValueConstant -import org.apache.linkis.manager.label.entity.Label import org.apache.linkis.manager.label.entity.engine.EngineType import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType import org.apache.linkis.manager.label.utils.LabelUtil @@ -88,20 +86,11 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging val sparkHome = SPARK_HOME.getValue(options) val sparkConfDir = SPARK_CONF_DIR.getValue(options) val sparkConfig: SparkConfig = - getSparkConfig(options, isYarnClusterMode(engineCreationContext.getLabels())) + getSparkConfig(options, LabelUtil.isYarnClusterMode(engineCreationContext.getLabels())) val context = new EnvironmentContext(sparkConfig, hadoopConfDir, sparkConfDir, sparkHome, null) context } - def isYarnClusterMode(labels: util.List[Label[_]]): Boolean = { - val label = LabelUtil.getEngingeConnRuntimeModeLabel(labels) - val isYarnClusterMode: Boolean = { - if (null != label && label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) true - else false - } - isYarnClusterMode - } - def getSparkConfig(options: util.Map[String, String], isYarnClusterMode: Boolean): SparkConfig = { logger.info("options: " + JsonUtils.jackson.writeValueAsString(options)) val sparkConfig: SparkConfig = new SparkConfig() @@ -165,10 +154,7 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging sparkConf.getOption("spark.master").getOrElse(CommonVars("spark.master", "yarn").getValue) logger.info(s"------ Create new SparkContext {$master} -------") - val label = LabelUtil.getEngingeConnRuntimeModeLabel(engineCreationContext.getLabels()) - val isYarnClusterMode: Boolean = - if (null != label && label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) true - else false + val isYarnClusterMode: Boolean = LabelUtil.isYarnClusterMode(engineCreationContext.getLabels()) if (isYarnClusterMode) { sparkConf.set("spark.submit.deployMode", "cluster") From 1771c5ad38e1d6b946c5e1de2afe19f72f7610b0 Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Fri, 15 Sep 2023 13:23:53 +0800 Subject: [PATCH 3/3] Modify SparkEngineConnFactory --- .../engineplugin/spark/factory/SparkEngineConnFactory.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala index 0a5820ca0c..5c14502c8a 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala @@ -154,7 +154,7 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging sparkConf.getOption("spark.master").getOrElse(CommonVars("spark.master", "yarn").getValue) logger.info(s"------ Create new SparkContext {$master} -------") - val isYarnClusterMode: Boolean = LabelUtil.isYarnClusterMode(engineCreationContext.getLabels()) + val isYarnClusterMode = LabelUtil.isYarnClusterMode(engineCreationContext.getLabels()) if (isYarnClusterMode) { sparkConf.set("spark.submit.deployMode", "cluster")