From 3d4f204337db8493956bc50deb9a8b68799bad10 Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Fri, 4 Aug 2023 10:47:27 +0800 Subject: [PATCH 01/12] spark support yarn cluster --- .../spark/factory/SparkEngineConnFactory.scala | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala index 5bf90c6bfe..38682a48ea 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala @@ -162,6 +162,15 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging // when spark version is greater than or equal to 1.5.0 if (master.contains("yarn")) sparkConf.set("spark.yarn.isPython", "true") + // Set deploy-mode with the optional values `cluster`and `client`, the default value `client` + val deployMode: String = SPARK_DEPLOY_MODE.getValue(options) + if ( + StringUtils + .isNotBlank(deployMode) && (deployMode.equals("cluster") || deployMode.equals("client")) + ) { + sparkConf.set("spark.submit.deployMode", deployMode) + } + val outputDir = createOutputDir(sparkConf) logger.info( From 5723be62e4b2db86e2a3ad545a34304b8433a606 Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Fri, 4 Aug 2023 14:30:32 +0800 Subject: [PATCH 02/12] spark support yarn cluster --- .../launch/SparkSubmitProcessEngineConnLaunchBuilder.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala index 8472cdfa91..49acd1beff 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala @@ -22,6 +22,7 @@ import org.apache.linkis.engineplugin.spark.config.SparkConfiguration.{ ENGINE_JAR, SPARK_APP_NAME, SPARK_DEFAULT_EXTERNAL_JARS_PATH, + SPARK_DEPLOY_MODE, SPARK_DRIVER_CLASSPATH, SPARK_DRIVER_EXTRA_JAVA_OPTIONS, SPARK_PYTHON_VERSION, @@ -115,8 +116,10 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLa memory } + val deployMode: String = getValueAndRemove(properties, SPARK_DEPLOY_MODE) + addOpt("--master", "yarn") - addOpt("--deploy-mode", "client") + addOpt("--deploy-mode", deployMode) addOpt("--name", appName) addProxyUser() From 0bd1796432f7f9be02b0349bc02bbda5dd3a273f Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Tue, 8 Aug 2023 11:36:56 +0800 Subject: [PATCH 03/12] spark support yarn cluster --- .../errorcode/SparkErrorCodeSummary.java | 4 ++ .../spark/config/SparkConfiguration.scala | 7 ++++ .../factory/SparkEngineConnFactory.scala | 41 +++++++++++-------- ...SubmitProcessEngineConnLaunchBuilder.scala | 28 ++++++++++++- 4 files changed, 60 insertions(+), 20 deletions(-) diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/errorcode/SparkErrorCodeSummary.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/errorcode/SparkErrorCodeSummary.java index 936e773e40..42f0b66e4d 100644 --- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/errorcode/SparkErrorCodeSummary.java +++ 
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/errorcode/SparkErrorCodeSummary.java @@ -66,6 +66,10 @@ public enum SparkErrorCodeSummary implements LinkisErrorCode { 43032, "The application start failed, since yarn applicationId is null."), NOT_SUPPORT_METHOD(43040, "Not support method for requestExpectedResource."), + + LINKIS_SPARK_YARN_CLUSTER_JARS_ERROR( + 43042, + "linkis.spark.yarn.cluster.jars parameters configuration errors(linkis.spark.yarn.cluster.jars 参数配置错误)."), ; /** (errorCode)错误码 */ diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala index ecc37597db..84897e172a 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala @@ -30,6 +30,10 @@ object SparkConfiguration extends Logging { val SPARK_HOME_ENV = "SPARK_HOME" val SPARK_CONF_DIR_ENV = "SPARK_CONF_DIR" + val SPARK_YARN_CLIENT = "client" + + val SPARK_YARN_CLUSTER = "cluster" + val PROCESS_MAX_THREADS = CommonVars[Int]("wds.linkis.process.threadpool.max", 100) val SPARK_SESSION_HOOK = CommonVars[String]("wds.linkis.engine.spark.session.hook", "") @@ -46,6 +50,9 @@ object SparkConfiguration extends Logging { val SPARK_DEPLOY_MODE = CommonVars[String]("spark.submit.deployMode", "client") + val SPARK_YARN_CLUSTER_JARS = + CommonVars[String]("linkis.spark.yarn.cluster.jars", "hdfs:///spark/cluster") + val SPARK_APP_NAME = CommonVars[String]("spark.app.name", "Linkis-EngineConn-Spark") val SPARK_APP_RESOURCE = CommonVars[String]("spark.app.resource", "") val SPARK_APP_CONF = CommonVars[String]("spark.extconf", "") diff --git 
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala index 38682a48ea..5afe696e71 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala @@ -144,33 +144,38 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging val master = sparkConf.getOption("spark.master").getOrElse(CommonVars("spark.master", "yarn").getValue) logger.info(s"------ Create new SparkContext {$master} -------") - val pysparkBasePath = SparkConfiguration.SPARK_HOME.getValue - val pysparkPath = new File(pysparkBasePath, "python" + File.separator + "lib") - var pythonLibUris = pysparkPath.listFiles().map(_.toURI.toString).filter(_.endsWith(".zip")) - if (pythonLibUris.length == 2) { - val sparkConfValue1 = Utils.tryQuietly(CommonVars("spark.yarn.dist.files", "").getValue) - val sparkConfValue2 = Utils.tryQuietly(sparkConf.get("spark.yarn.dist.files")) - if (StringUtils.isNotBlank(sparkConfValue2)) { - pythonLibUris = sparkConfValue2 +: pythonLibUris - } - if (StringUtils.isNotBlank(sparkConfValue1)) { - pythonLibUris = sparkConfValue1 +: pythonLibUris - } - sparkConf.set("spark.yarn.dist.files", pythonLibUris.mkString(",")) - } - // Distributes needed libraries to workers - // when spark version is greater than or equal to 1.5.0 - if (master.contains("yarn")) sparkConf.set("spark.yarn.isPython", "true") // Set deploy-mode with the optional values `cluster`and `client`, the default value `client` val deployMode: String = SPARK_DEPLOY_MODE.getValue(options) if ( StringUtils - .isNotBlank(deployMode) && (deployMode.equals("cluster") || deployMode.equals("client")) + 
.isNotBlank(deployMode) && (deployMode + .equals(SPARK_YARN_CLUSTER) || deployMode.equals(SPARK_YARN_CLIENT)) ) { sparkConf.set("spark.submit.deployMode", deployMode) } + // todo yarn cluster暂时不支持pyspark,后期对pyspark进行处理 + if (StringUtils.isNotBlank(deployMode) && deployMode.equals(SPARK_YARN_CLIENT)) { + val pysparkBasePath = SparkConfiguration.SPARK_HOME.getValue + val pysparkPath = new File(pysparkBasePath, "python" + File.separator + "lib") + var pythonLibUris = pysparkPath.listFiles().map(_.toURI.toString).filter(_.endsWith(".zip")) + if (pythonLibUris.length == 2) { + val sparkConfValue1 = Utils.tryQuietly(CommonVars("spark.yarn.dist.files", "").getValue) + val sparkConfValue2 = Utils.tryQuietly(sparkConf.get("spark.yarn.dist.files")) + if (StringUtils.isNotBlank(sparkConfValue2)) { + pythonLibUris = sparkConfValue2 +: pythonLibUris + } + if (StringUtils.isNotBlank(sparkConfValue1)) { + pythonLibUris = sparkConfValue1 +: pythonLibUris + } + sparkConf.set("spark.yarn.dist.files", pythonLibUris.mkString(",")) + } + } + // Distributes needed libraries to workers + // when spark version is greater than or equal to 1.5.0 + if (master.contains("yarn")) sparkConf.set("spark.yarn.isPython", "true") + val outputDir = createOutputDir(sparkConf) logger.info( diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala index 49acd1beff..82faf651b4 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala @@ -18,6 +18,7 @@ package org.apache.linkis.engineplugin.spark.launch import 
org.apache.linkis.common.conf.CommonVars +import org.apache.linkis.engineplugin.spark.config.SparkConfiguration import org.apache.linkis.engineplugin.spark.config.SparkConfiguration.{ ENGINE_JAR, SPARK_APP_NAME, @@ -26,9 +27,12 @@ import org.apache.linkis.engineplugin.spark.config.SparkConfiguration.{ SPARK_DRIVER_CLASSPATH, SPARK_DRIVER_EXTRA_JAVA_OPTIONS, SPARK_PYTHON_VERSION, - SPARK_SUBMIT_PATH + SPARK_SUBMIT_PATH, + SPARK_YARN_CLUSTER_JARS } import org.apache.linkis.engineplugin.spark.config.SparkResourceConfiguration._ +import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary +import org.apache.linkis.engineplugin.spark.exception.SparkEngineException import org.apache.linkis.hadoop.common.conf.HadoopConf import org.apache.linkis.manager.common.entity.resource.DriverAndYarnResource import org.apache.linkis.manager.engineplugin.common.conf.EnvConfiguration @@ -62,7 +66,7 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLa val executorMemory = getValueAndRemove(properties, LINKIS_SPARK_EXECUTOR_MEMORY) val numExecutors = getValueAndRemove(properties, LINKIS_SPARK_EXECUTOR_INSTANCES) - val files = getValueAndRemove(properties, "files", "").split(",").filter(isNotBlankPath) + var files = getValueAndRemove(properties, "files", "").split(",").filter(isNotBlankPath) val jars = new ArrayBuffer[String]() jars ++= getValueAndRemove(properties, "jars", "").split(",").filter(isNotBlankPath) jars ++= getValueAndRemove(properties, SPARK_DEFAULT_EXTERNAL_JARS_PATH) @@ -118,6 +122,26 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLa val deployMode: String = getValueAndRemove(properties, SPARK_DEPLOY_MODE) + // {pwd}/conf/linkis-engineconn.properties + if (deployMode.equals(SparkConfiguration.SPARK_YARN_CLUSTER)) { +// files ++ (s"${variable(PWD)}/conf/linkis-engineconn.properties") + files ++= Array(s"${variable(PWD)}/conf/linkis-engineconn.properties") + + var clusterJars: String = 
getValueAndRemove(properties, SPARK_YARN_CLUSTER_JARS) + + if (StringUtils.isNotBlank(clusterJars)) { + throw new SparkEngineException( + SparkErrorCodeSummary.LINKIS_SPARK_YARN_CLUSTER_JARS_ERROR.getErrorCode, + SparkErrorCodeSummary.LINKIS_SPARK_YARN_CLUSTER_JARS_ERROR.getErrorDesc + ) + } + + if (clusterJars.endsWith("/")) { + clusterJars = clusterJars.dropRight(1) + } + jars += s"$clusterJars/*" + } + addOpt("--master", "yarn") addOpt("--deploy-mode", deployMode) addOpt("--name", appName) From 8a70c34fc5c4483a7b7baa366fc3ce751e91735c Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Tue, 8 Aug 2023 15:11:17 +0800 Subject: [PATCH 04/12] spark support yarn cluster --- .../launch/SparkSubmitProcessEngineConnLaunchBuilder.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala index 82faf651b4..486a22a0de 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala @@ -129,7 +129,7 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLa var clusterJars: String = getValueAndRemove(properties, SPARK_YARN_CLUSTER_JARS) - if (StringUtils.isNotBlank(clusterJars)) { + if (StringUtils.isBlank(clusterJars)) { throw new SparkEngineException( SparkErrorCodeSummary.LINKIS_SPARK_YARN_CLUSTER_JARS_ERROR.getErrorCode, SparkErrorCodeSummary.LINKIS_SPARK_YARN_CLUSTER_JARS_ERROR.getErrorDesc From cf8740d7ce3f1eb24f5de0574111c4cb4c9fd9f8 Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: 
Tue, 8 Aug 2023 16:05:52 +0800 Subject: [PATCH 05/12] spark support yarn cluster --- ...kSubmitProcessEngineConnLaunchBuilder.scala | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala index 486a22a0de..d9096d1051 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala @@ -122,9 +122,10 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLa val deployMode: String = getValueAndRemove(properties, SPARK_DEPLOY_MODE) - // {pwd}/conf/linkis-engineconn.properties - if (deployMode.equals(SparkConfiguration.SPARK_YARN_CLUSTER)) { -// files ++ (s"${variable(PWD)}/conf/linkis-engineconn.properties") + val isYarnCluster: Boolean = + if (deployMode.equals(SparkConfiguration.SPARK_YARN_CLUSTER)) true else false + + if (isYarnCluster) { files ++= Array(s"${variable(PWD)}/conf/linkis-engineconn.properties") var clusterJars: String = getValueAndRemove(properties, SPARK_YARN_CLUSTER_JARS) @@ -164,7 +165,7 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLa addOpt("--num-executors", numExecutors.toString) addOpt("--queue", queue) - getConf(engineConnBuildRequest, gcLogDir, logDir).foreach { case (key, value) => + getConf(engineConnBuildRequest, gcLogDir, logDir, isYarnCluster).foreach { case (key, value) => addOpt("--conf", s"""$key="$value"""") } @@ -179,7 +180,8 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLa def getConf( 
engineConnBuildRequest: EngineConnBuildRequest, gcLogDir: String, - logDir: String + logDir: String, + isYarnCluster: Boolean ): ArrayBuffer[(String, String)] = { val driverJavaSet = new StringBuilder(" -server") if (StringUtils.isNotEmpty(EnvConfiguration.ENGINE_CONN_DEFAULT_JAVA_OPTS.getValue)) { @@ -195,7 +197,11 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLa .foreach(l => { driverJavaSet.append(" ").append(l) }) - driverJavaSet.append(" -Djava.io.tmpdir=" + variable(TEMP_DIRS)) + if (isYarnCluster) { + driverJavaSet.append(" -Djava.io.tmpdir=/tmp") + } else { + driverJavaSet.append(" -Djava.io.tmpdir=" + variable(TEMP_DIRS)) + } if (EnvConfiguration.ENGINE_CONN_DEBUG_ENABLE.getValue) { driverJavaSet.append( s" -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=${variable(RANDOM_PORT)}" From ec674646ca35b2398748cdf34621b0040cdb813d Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Thu, 10 Aug 2023 19:34:43 +0800 Subject: [PATCH 06/12] LinkisManagerApplication Remove useless code --- .../impl/DefaultEngineConnKillService.java | 7 ++ .../AbstractEngineConnLaunchService.scala | 12 +++- .../hook/CallbackEngineConnHook.scala | 4 +- .../service/EngineConnPidCallback.scala | 16 ++++- .../engine/DefaultEngineCreateService.java | 5 ++ .../DefaultEngineConnPidCallbackService.java | 16 ++++- .../label/constant/LabelKeyConstant.java | 2 + .../entity/engine/YarnClusterModeLabel.java | 70 +++++++++++++++++++ .../manager/label/utils/LabelUtil.scala | 7 +- .../manager/common/constant/AMConstant.java | 2 + .../executor/SparkEngineConnExecutor.scala | 14 +++- ...SubmitProcessEngineConnLaunchBuilder.scala | 4 +- 12 files changed, 148 insertions(+), 11 deletions(-) create mode 100644 linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/YarnClusterModeLabel.java diff --git 
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/java/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/java/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java index 440208cd62..a6a932a578 100644 --- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/java/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java +++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/java/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java @@ -94,6 +94,13 @@ public EngineStopResponse dealEngineConnStop(EngineStopRequest engineStopRequest killYarnAppIdOfOneEc(engineStopRequest); } + if (AMConstant.CLUSTER_PROCESS_MARK.equals(engineStopRequest.getIdentifierType()) + && engineStopRequest.getIdentifier() != null) { + List appIds = new ArrayList<>(); + appIds.add(engineStopRequest.getIdentifier()); + GovernanceUtils.killYarnJobApp(appIds); + } + if (!response.getStopStatus()) { EngineSuicideRequest request = new EngineSuicideRequest( diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala index f088f9fcdc..f7258f4e4b 100644 --- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala +++ 
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala @@ -38,9 +38,11 @@ import org.apache.linkis.manager.common.protocol.engine.{ EngineStopRequest } import org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnLaunchRequest +import org.apache.linkis.manager.label.entity.engine.YarnClusterModeLabel import org.apache.linkis.manager.label.utils.LabelUtil import org.apache.linkis.rpc.Sender +import org.apache.commons.lang3.StringUtils import org.apache.commons.lang3.exception.ExceptionUtils import scala.concurrent.ExecutionContextExecutorService @@ -146,11 +148,19 @@ abstract class AbstractEngineConnLaunchService extends EngineConnLaunchService w throw t } LoggerUtils.removeJobIdMDC() + val deployMode: String = + request.creationDesc.properties.getOrDefault("spark.submit.deployMode", "client") + val engineNode = new AMEngineNode() engineNode.setLabels(conn.getLabels) engineNode.setServiceInstance(conn.getServiceInstance) engineNode.setOwner(request.user) - engineNode.setMark(AMConstant.PROCESS_MARK) + if (StringUtils.isNotBlank(deployMode) && deployMode.equals("cluster")) { + engineNode.setMark(AMConstant.CLUSTER_PROCESS_MARK) + engineNode.getLabels.add(new YarnClusterModeLabel()) + } else { + engineNode.setMark(AMConstant.PROCESS_MARK) + } engineNode } diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala index 1f4c5cec73..63703dd5b9 100644 --- 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala @@ -23,7 +23,7 @@ import org.apache.linkis.common.utils.{Logging, Utils} import org.apache.linkis.engineconn.acessible.executor.entity.AccessibleExecutor import org.apache.linkis.engineconn.callback.service.{ EngineConnAfterStartCallback, - EngineConnPidCallback + EngineConnIdentifierCallback } import org.apache.linkis.engineconn.common.conf.EngineConnConf import org.apache.linkis.engineconn.common.creation.EngineCreationContext @@ -61,7 +61,7 @@ class CallbackEngineConnHook extends EngineConnHook with Logging { newMap.put("spring.mvc.servlet.path", ServerConfiguration.BDP_SERVER_RESTFUL_URI.getValue) DataWorkCloudApplication.main(DWCArgumentsParser.formatSpringOptions(newMap.toMap)) - val engineConnPidCallBack = new EngineConnPidCallback() + val engineConnPidCallBack = new EngineConnIdentifierCallback() Utils.tryAndError(engineConnPidCallBack.callback()) logger.info("<--------------------SpringBoot App init succeed-------------------->") } diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/service/EngineConnPidCallback.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/service/EngineConnPidCallback.scala index f0995c0b99..595f276704 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/service/EngineConnPidCallback.scala +++ 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/service/EngineConnPidCallback.scala @@ -18,18 +18,28 @@ package org.apache.linkis.engineconn.callback.service import org.apache.linkis.engineconn.core.EngineConnObject +import org.apache.linkis.engineconn.core.executor.ExecutorManager +import org.apache.linkis.engineconn.executor.entity.YarnExecutor import org.apache.linkis.governance.common.protocol.task.ResponseEngineConnPid +import org.apache.linkis.manager.label.utils.LabelUtil import org.apache.linkis.rpc.Sender import java.lang.management.ManagementFactory -class EngineConnPidCallback extends AbstractEngineConnStartUpCallback { +class EngineConnIdentifierCallback extends AbstractEngineConnStartUpCallback { override def callback(): Unit = { - val pid = ManagementFactory.getRuntimeMXBean.getName.split("@")(0) + var identifier = ManagementFactory.getRuntimeMXBean.getName.split("@")(0) val instance = Sender.getThisServiceInstance val context = EngineConnObject.getEngineCreationContext - callback(ResponseEngineConnPid(instance, pid, context.getTicketId)) + + val label = LabelUtil.getYarnClusterModeLabel(context.getLabels()) + if (null != label) { + identifier = ExecutorManager.getInstance.getReportExecutor match { + case cluster: YarnExecutor => cluster.getApplicationId + } + } + callback(ResponseEngineConnPid(instance, identifier, context.getTicketId)) } } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.java index e8c58e3823..914b31d349 100644 --- 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.java @@ -341,6 +341,11 @@ private boolean ensuresIdle(EngineNode engineNode, String resourceTicketId) { if (null == engineNodeInfo) { return false; } + + // if (engineNode.getMark().equals(AMConstant.CLUSTER_PROCESS_MARK)) { + // return resourceTicketId.equals( engineNodeInfo.getServiceInstance()); + // } + if (NodeStatus.isCompleted(engineNodeInfo.getNodeStatus())) { NodeMetrics metrics = nodeMetricManagerPersistence.getNodeMetrics(engineNodeInfo); Pair> errorInfo = getStartErrorInfo(metrics.getHeartBeatMsg()); diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java index 3d199fe29c..63975a0890 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java @@ -17,10 +17,14 @@ package org.apache.linkis.manager.am.service.impl; +import org.apache.linkis.common.ServiceInstance; import org.apache.linkis.governance.common.protocol.task.ResponseEngineConnPid; import org.apache.linkis.manager.am.manager.DefaultEngineNodeManager; import org.apache.linkis.manager.am.service.EngineConnPidCallbackService; +import 
org.apache.linkis.manager.am.service.engine.AbstractEngineService; +import org.apache.linkis.manager.common.constant.AMConstant; import org.apache.linkis.manager.common.entity.node.EngineNode; +import org.apache.linkis.manager.label.service.NodeLabelService; import org.apache.linkis.rpc.message.annotation.Receiver; import org.springframework.beans.factory.annotation.Autowired; @@ -30,12 +34,15 @@ import org.slf4j.LoggerFactory; @Service -public class DefaultEngineConnPidCallbackService implements EngineConnPidCallbackService { +public class DefaultEngineConnPidCallbackService extends AbstractEngineService + implements EngineConnPidCallbackService { private static final Logger logger = LoggerFactory.getLogger(DefaultEngineConnPidCallbackService.class); @Autowired private DefaultEngineNodeManager defaultEngineNodeManager; + @Autowired private NodeLabelService nodeLabelService; + @Receiver @Override public void dealPid(ResponseEngineConnPid protocol) { @@ -56,6 +63,13 @@ public void dealPid(ResponseEngineConnPid protocol) { } engineNode.setIdentifier(protocol.pid()); + + if (engineNode.getMark().equals(AMConstant.CLUSTER_PROCESS_MARK)) { + ServiceInstance serviceInstance = protocol.serviceInstance(); + engineNode.setServiceInstance(serviceInstance); + getEngineNodeManager().updateEngineNode(serviceInstance, engineNode); + nodeLabelService.updateLabelsToNode(serviceInstance, engineNode.getLabels()); + } defaultEngineNodeManager.updateEngine(engineNode); } } diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java index 362932083c..0288d9cc8f 100644 --- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java +++ 
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java @@ -64,5 +64,7 @@ public class LabelKeyConstant { public static final String FIXED_EC_KEY = "fixedEngineConn"; + public static final String YARN_CLUSTER_MODE_KEY = "yarnClusterMode"; + public static final String MANAGER_KEY = "manager"; } diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/YarnClusterModeLabel.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/YarnClusterModeLabel.java new file mode 100644 index 0000000000..03a4d5d3fc --- /dev/null +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/YarnClusterModeLabel.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.linkis.manager.label.entity.engine; + +import org.apache.linkis.manager.label.constant.LabelKeyConstant; +import org.apache.linkis.manager.label.entity.*; +import org.apache.linkis.manager.label.entity.annon.ValueSerialNum; +import org.apache.linkis.manager.label.exception.LabelErrorException; + +import org.apache.commons.lang3.StringUtils; + +import java.util.HashMap; + +import static org.apache.linkis.manager.label.errorcode.LabelCommonErrorCodeSummary.CHECK_LABEL_VALUE_EMPTY; +import static org.apache.linkis.manager.label.errorcode.LabelCommonErrorCodeSummary.LABEL_ERROR_CODE; + +public class YarnClusterModeLabel extends GenericLabel implements EngineNodeLabel, UserModifiable { + + public YarnClusterModeLabel() { + setLabelKey(LabelKeyConstant.YARN_CLUSTER_MODE_KEY); + } + + @ValueSerialNum(0) + public void setApplicationId(String applicationId) { + if (getValue() == null) { + setValue(new HashMap<>()); + } + getValue().put("applicationId", applicationId); + } + + public String getApplicationId() { + if (getValue() == null) { + return null; + } + return getValue().get("applicationId"); + } + + @Override + public Feature getFeature() { + return Feature.CORE; + } + + @Override + public void valueCheck(String stringValue) throws LabelErrorException { + if (!StringUtils.isBlank(stringValue)) { + if (stringValue.split(SerializableLabel.VALUE_SEPARATOR).length != 1) { + throw new LabelErrorException( + LABEL_ERROR_CODE.getErrorCode(), LABEL_ERROR_CODE.getErrorDesc()); + } + } else { + throw new LabelErrorException( + CHECK_LABEL_VALUE_EMPTY.getErrorCode(), CHECK_LABEL_VALUE_EMPTY.getErrorDesc()); + } + } +} diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala index b4a66d2f46..d50884cf12 100644 --- 
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala @@ -22,7 +22,8 @@ import org.apache.linkis.manager.label.entity.engine.{ CodeLanguageLabel, EngineConnModeLabel, EngineTypeLabel, - UserCreatorLabel + UserCreatorLabel, + YarnClusterModeLabel } import org.apache.linkis.manager.label.entity.entrance.{ BindEngineLabel, @@ -80,6 +81,10 @@ object LabelUtil { getLabelFromList[CodeLanguageLabel](labels) } + def getYarnClusterModeLabel(labels: util.List[Label[_]]): YarnClusterModeLabel = { + getLabelFromList[YarnClusterModeLabel](labels) + } + def getEngineConnModeLabel(labels: util.List[Label[_]]): EngineConnModeLabel = { getLabelFromList[EngineConnModeLabel](labels) } diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/constant/AMConstant.java b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/constant/AMConstant.java index 09d802a951..c803570353 100644 --- a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/constant/AMConstant.java +++ b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/constant/AMConstant.java @@ -29,6 +29,8 @@ public class AMConstant { public static final String PROCESS_MARK = "process"; + public static final String CLUSTER_PROCESS_MARK = "cluster_process"; + public static final String THREAD_MARK = "thread"; public static final String START_REASON = "start_reason"; diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala index 2264db61f7..19462d78df 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala @@ -25,7 +25,7 @@ import org.apache.linkis.engineconn.computation.executor.execute.{ } import org.apache.linkis.engineconn.computation.executor.utlis.ProgressUtils import org.apache.linkis.engineconn.core.exception.ExecutorHookFatalException -import org.apache.linkis.engineconn.executor.entity.ResourceFetchExecutor +import org.apache.linkis.engineconn.executor.entity.{ResourceFetchExecutor, YarnExecutor} import org.apache.linkis.engineplugin.spark.common.{Kind, SparkDataCalc} import org.apache.linkis.engineplugin.spark.cs.CSSparkHelper import org.apache.linkis.engineplugin.spark.extension.{ @@ -56,6 +56,7 @@ import scala.collection.mutable.ArrayBuffer abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long) extends ComputationExecutor with Logging + with YarnExecutor with ResourceFetchExecutor { private var initialized: Boolean = false @@ -70,9 +71,17 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long) private var thread: Thread = _ + private var applicationId: String = _ + + override def getApplicationId: String = applicationId + + override def getApplicationURL: String = "" + override def getYarnMode: String = "" + override def getQueue: String = "" + override def init(): Unit = { logger.info(s"Ready to change engine state!") -// setCodeParser() // todo check + // setCodeParser() // todo check super.init() } @@ -94,6 +103,7 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long) engineExecutorContext.appendStdout( LogUtils.generateInfo(s"yarn application id: ${sc.applicationId}") ) + 
this.applicationId = sc.applicationId // Pre-execution hook var executionHook: SparkPreExecutionHook = null Utils.tryCatch { diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala index d9096d1051..8900ce05f3 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala @@ -120,7 +120,8 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLa memory } - val deployMode: String = getValueAndRemove(properties, SPARK_DEPLOY_MODE) + // val deployMode: String = getValueAndRemove(properties, SPARK_DEPLOY_MODE) + val deployMode: String = SPARK_DEPLOY_MODE.getValue(properties) val isYarnCluster: Boolean = if (deployMode.equals(SparkConfiguration.SPARK_YARN_CLUSTER)) true else false @@ -219,6 +220,7 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLa val keyValue = iterator.next() if ( !SPARK_PYTHON_VERSION.key.equals(keyValue.getKey) && + !SPARK_DEPLOY_MODE.key.equals(keyValue.getKey) && keyValue.getKey.startsWith("spark.") && StringUtils.isNotBlank(keyValue.getValue) ) { From 160ab55b81a3cefef0fc6ac2d7eaad2fa977edae Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Fri, 11 Aug 2023 10:46:49 +0800 Subject: [PATCH 07/12] spark support yarn cluster --- .../am/manager/DefaultEngineNodeManager.java | 11 ++++++ .../manager/am/manager/EngineNodeManager.java | 2 + .../engine/DefaultEngineCreateService.java | 11 +++--- .../linkis/manager/dao/NodeManagerMapper.java | 2 + .../persistence/NodeManagerPersistence.java | 2 + 
.../impl/DefaultNodeManagerPersistence.java | 32 ++++++++++++++++ .../mapper/common/NodeManagerMapper.xml | 38 +++++++++++-------- 7 files changed, 77 insertions(+), 21 deletions(-) diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/DefaultEngineNodeManager.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/DefaultEngineNodeManager.java index b8b38eae30..5d3fc70998 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/DefaultEngineNodeManager.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/DefaultEngineNodeManager.java @@ -127,6 +127,17 @@ public EngineNode getEngineNodeInfoByDB(EngineNode engineNode) { return dbEngineNode; } + @Override + public EngineNode getEngineNodeInfoByTicketId(String ticketId) { + EngineNode dbEngineNode = nodeManagerPersistence.getEngineNode(ticketId); + if (null == dbEngineNode) { + throw new LinkisRetryException(AMConstant.ENGINE_ERROR_CODE, ticketId + " not exists in db"); + } + metricsConverter.fillMetricsToNode( + dbEngineNode, nodeMetricManagerPersistence.getNodeMetrics(dbEngineNode)); + return dbEngineNode; + } + @Override public void updateEngineStatus( ServiceInstance serviceInstance, NodeStatus fromState, NodeStatus toState) {} diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/EngineNodeManager.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/EngineNodeManager.java index 252d97c0bf..ce79d79c7e 100644 --- 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/EngineNodeManager.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/EngineNodeManager.java @@ -38,6 +38,8 @@ public interface EngineNodeManager { EngineNode getEngineNodeInfoByDB(EngineNode engineNode); + EngineNode getEngineNodeInfoByTicketId(String ticketId); + /** * Get detailed engine information from the persistence * diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.java index 914b31d349..e3fd01e2a1 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.java @@ -337,15 +337,16 @@ private List> fromEMGetEngineLabels(List> emLabels) { } private boolean ensuresIdle(EngineNode engineNode, String resourceTicketId) { - EngineNode engineNodeInfo = getEngineNodeManager().getEngineNodeInfoByDB(engineNode); + EngineNode engineNodeInfo; + if (engineNode.getMark().equals(AMConstant.CLUSTER_PROCESS_MARK)) { + engineNodeInfo = getEngineNodeManager().getEngineNodeInfoByTicketId(resourceTicketId); + } else { + engineNodeInfo = getEngineNodeManager().getEngineNodeInfoByDB(engineNode); + } if (null == engineNodeInfo) { return false; } - // if (engineNode.getMark().equals(AMConstant.CLUSTER_PROCESS_MARK)) { - // return resourceTicketId.equals( engineNodeInfo.getServiceInstance()); - // } - if 
(NodeStatus.isCompleted(engineNodeInfo.getNodeStatus())) { NodeMetrics metrics = nodeMetricManagerPersistence.getNodeMetrics(engineNodeInfo); Pair> errorInfo = getStartErrorInfo(metrics.getHeartBeatMsg()); diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/NodeManagerMapper.java b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/NodeManagerMapper.java index 4e9546944a..6f11c910e4 100644 --- a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/NodeManagerMapper.java +++ b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/NodeManagerMapper.java @@ -49,6 +49,8 @@ void updateNodeInstance( PersistenceNode getNodeInstance(@Param("instance") String instance); + PersistenceNode getNodeInstanceByTicketId(@Param("ticketId") String ticketId); + PersistenceNode getNodeInstanceById(@Param("id") int id); PersistenceNode getEMNodeInstanceByEngineNode(@Param("instance") String instance); diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/NodeManagerPersistence.java b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/NodeManagerPersistence.java index bb95c8cf7d..e9fcab7017 100644 --- a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/NodeManagerPersistence.java +++ b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/NodeManagerPersistence.java @@ -105,6 +105,8 @@ void updateEngineNode(ServiceInstance serviceInstance, Node node) */ EngineNode getEngineNode(ServiceInstance serviceInstance); + EngineNode 
getEngineNode(String ticketId); + /** * 通过Em的ServiceInstance 获取EM下面Engine的列表 * diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java index 4a1697333b..805f127ac8 100644 --- a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java +++ b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java @@ -274,6 +274,38 @@ public EngineNode getEngineNode(ServiceInstance serviceInstance) { return amEngineNode; } + @Override + public EngineNode getEngineNode(String ticketId) { + AMEngineNode amEngineNode = new AMEngineNode(); + // amEngineNode.setServiceInstance(serviceInstance); + PersistenceNode engineNode = nodeManagerMapper.getNodeInstanceByTicketId(ticketId); + if (null == engineNode) { + return null; + } + amEngineNode.setOwner(engineNode.getOwner()); + amEngineNode.setMark(engineNode.getMark()); + amEngineNode.setIdentifier(engineNode.getIdentifier()); + amEngineNode.setTicketId(engineNode.getTicketId()); + amEngineNode.setStartTime(engineNode.getCreateTime()); + // PersistenceNode emNode = + // + // nodeManagerMapper.getEMNodeInstanceByEngineNode(serviceInstance.getInstance()); + // if (emNode != null) { + // String emInstance = emNode.getInstance(); + // String emName = emNode.getName(); + // ServiceInstance emServiceInstance = new ServiceInstance(); + // emServiceInstance.setApplicationName(emName); + // emServiceInstance.setInstance(emInstance); + // AMEMNode amemNode = new AMEMNode(); + // amemNode.setMark(emNode.getMark()); + // amemNode.setOwner(emNode.getOwner()); + 
// amemNode.setServiceInstance(emServiceInstance); + // amemNode.setStartTime(emNode.getCreateTime()); + // amEngineNode.setEMNode(amemNode); + // } + return amEngineNode; + } + @Override public List getEngineNodeByEM(ServiceInstance serviceInstance) { // serviceinstance for a given EM(给定EM的 serviceinstance) diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/resources/mapper/common/NodeManagerMapper.xml b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/resources/mapper/common/NodeManagerMapper.xml index b470daead1..935ceb4f80 100644 --- a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/resources/mapper/common/NodeManagerMapper.xml +++ b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/resources/mapper/common/NodeManagerMapper.xml @@ -6,9 +6,9 @@ ~ The ASF licenses this file to You under the Apache License, Version 2.0 ~ (the "License"); you may not use this file except in compliance with ~ the License. You may obtain a copy of the License at - ~ + ~ ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ + ~ ~ Unless required by applicable law or agreed to in writing, software ~ distributed under the License is distributed on an "AS IS" BASIS, ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -34,9 +34,9 @@ INSERT INTO linkis_cg_manager_service_instance (instance, name, owner, mark, ticketId, update_time - , create_time, updator, creator) + , create_time, updator, creator) VALUES (#{instance}, #{name}, #{owner}, #{mark}, #{ticketId}, #{updateTime} - , #{createTime}, #{updator}, #{creator}) + , #{createTime}, #{updator}, #{creator}) @@ -119,9 +119,9 @@ + + @@ -150,17 +156,17 @@ SELECT * FROM linkis_cg_manager_service_instance WHERE instance IN ( - SELECT engine_instance - FROM linkis_cg_manager_engine_em - WHERE em_instance = #{instance} + SELECT engine_instance + FROM linkis_cg_manager_engine_em + WHERE em_instance = #{instance} ) From e676775177be109db91785ad5e1ad86cfeb615ee Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Fri, 11 Aug 2023 11:24:08 +0800 Subject: [PATCH 08/12] spark support yarn cluster --- .../am/manager/DefaultEngineNodeManager.java | 2 +- .../engine/DefaultEngineCreateService.java | 4 +++ .../persistence/NodeManagerPersistence.java | 2 +- .../impl/DefaultNodeManagerPersistence.java | 26 ++++++------------- 4 files changed, 14 insertions(+), 20 deletions(-) diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/DefaultEngineNodeManager.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/DefaultEngineNodeManager.java index 5d3fc70998..14d548ef77 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/DefaultEngineNodeManager.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/DefaultEngineNodeManager.java @@ -129,7 +129,7 @@ public EngineNode getEngineNodeInfoByDB(EngineNode engineNode) { @Override public EngineNode getEngineNodeInfoByTicketId(String ticketId) { - EngineNode dbEngineNode = 
nodeManagerPersistence.getEngineNode(ticketId); + EngineNode dbEngineNode = nodeManagerPersistence.getEngineNodeByTicketId(ticketId); if (null == dbEngineNode) { throw new LinkisRetryException(AMConstant.ENGINE_ERROR_CODE, ticketId + " not exists in db"); } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.java index e3fd01e2a1..6f35edc3a9 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.java @@ -347,6 +347,10 @@ private boolean ensuresIdle(EngineNode engineNode, String resourceTicketId) { return false; } + if (engineNodeInfo.getServiceInstance() != null) { + engineNode.setServiceInstance(engineNodeInfo.getServiceInstance()); + } + if (NodeStatus.isCompleted(engineNodeInfo.getNodeStatus())) { NodeMetrics metrics = nodeMetricManagerPersistence.getNodeMetrics(engineNodeInfo); Pair> errorInfo = getStartErrorInfo(metrics.getHeartBeatMsg()); diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/NodeManagerPersistence.java b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/NodeManagerPersistence.java index e9fcab7017..b83c82fd30 100644 --- a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/NodeManagerPersistence.java +++ 
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/NodeManagerPersistence.java @@ -105,7 +105,7 @@ void updateEngineNode(ServiceInstance serviceInstance, Node node) */ EngineNode getEngineNode(ServiceInstance serviceInstance); - EngineNode getEngineNode(String ticketId); + EngineNode getEngineNodeByTicketId(String ticketId); /** * 通过Em的ServiceInstance 获取EM下面Engine的列表 diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java index 805f127ac8..14db7252fa 100644 --- a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java +++ b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java @@ -275,34 +275,24 @@ public EngineNode getEngineNode(ServiceInstance serviceInstance) { } @Override - public EngineNode getEngineNode(String ticketId) { + public EngineNode getEngineNodeByTicketId(String ticketId) { AMEngineNode amEngineNode = new AMEngineNode(); - // amEngineNode.setServiceInstance(serviceInstance); PersistenceNode engineNode = nodeManagerMapper.getNodeInstanceByTicketId(ticketId); + if (null == engineNode) { return null; } + + ServiceInstance serviceInstance = new ServiceInstance(); + serviceInstance.setInstance(engineNode.getInstance()); + serviceInstance.setApplicationName(engineNode.getName()); + amEngineNode.setServiceInstance(serviceInstance); + amEngineNode.setOwner(engineNode.getOwner()); amEngineNode.setMark(engineNode.getMark()); amEngineNode.setIdentifier(engineNode.getIdentifier()); 
amEngineNode.setTicketId(engineNode.getTicketId()); amEngineNode.setStartTime(engineNode.getCreateTime()); - // PersistenceNode emNode = - // - // nodeManagerMapper.getEMNodeInstanceByEngineNode(serviceInstance.getInstance()); - // if (emNode != null) { - // String emInstance = emNode.getInstance(); - // String emName = emNode.getName(); - // ServiceInstance emServiceInstance = new ServiceInstance(); - // emServiceInstance.setApplicationName(emName); - // emServiceInstance.setInstance(emInstance); - // AMEMNode amemNode = new AMEMNode(); - // amemNode.setMark(emNode.getMark()); - // amemNode.setOwner(emNode.getOwner()); - // amemNode.setServiceInstance(emServiceInstance); - // amemNode.setStartTime(emNode.getCreateTime()); - // amEngineNode.setEMNode(amemNode); - // } return amEngineNode; } From bfa6fba001c05995259d4cf7ad75b5bcb741c1cf Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Fri, 11 Aug 2023 17:58:34 +0800 Subject: [PATCH 09/12] spark support yarn cluster --- docs/configuration/spark.md | 3 + .../AbstractEngineConnLaunchService.scala | 7 +- .../hook/CallbackEngineConnHook.scala | 4 +- .../service/EngineConnPidCallback.scala | 5 +- .../DefaultEngineConnPidCallbackService.java | 4 +- .../label/service/NodeLabelService.java | 3 + .../service/impl/DefaultNodeLabelService.java | 78 +++++++++++++++++++ .../label/constant/LabelKeyConstant.java | 2 +- .../label/constant/LabelValueConstant.java | 2 + ....java => EngingeConnRuntimeModeLabel.java} | 15 ++-- .../manager/label/utils/LabelUtil.scala | 8 +- .../executor/SparkEngineConnExecutor.scala | 3 +- .../factory/SparkEngineConnFactory.scala | 19 ++--- ...SubmitProcessEngineConnLaunchBuilder.scala | 6 +- 14 files changed, 127 insertions(+), 32 deletions(-) rename linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/{YarnClusterModeLabel.java => EngingeConnRuntimeModeLabel.java} (84%) diff --git 
a/docs/configuration/spark.md b/docs/configuration/spark.md index f0a4723c7b..aac8c65d7f 100644 --- a/docs/configuration/spark.md +++ b/docs/configuration/spark.md @@ -3,6 +3,7 @@ | Module Name (Service Name) | Parameter Name | Default Value | Description |Used| | -------- | -------- | ----- |----- | ----- | +|spark|linkis.spark.yarn.cluster.jars|hdfs:///spark/cluster|spark.yarn.cluster.jars| |spark|linkis.spark.etl.support.hudi|false|spark.etl.support.hudi| |spark|linkis.bgservice.store.prefix|hdfs:///tmp/bdp-ide/|bgservice.store.prefix| |spark|linkis.bgservice.store.suffix| |bgservice.store.suffix| @@ -27,6 +28,8 @@ |spark|wds.linkis.spark.engineconn.fatal.log|error writing class;OutOfMemoryError|spark.engineconn.fatal.log| |spark|wds.linkis.spark.engine.scala.replace_package_header.enable| true |spark.engine.scala.replace_package_header.enable| +Use spark yarn cluster mode, need to upload the dependence of the spark to 'linkis.spark.yarn.cluster.jar'(the default value is 'hdfs:///spark/cluster') +spark dependencies include jars and configuration files,For example: '/appcom/Install/linkis/lib/linkis-engineconn-plugins/spark/dist/3.2.1/lib/*.jar','/appcom/Install/linkis/conf/*'' The spark-excel package may cause class conflicts,need to download separately,put it in spark lib wget https://repo1.maven.org/maven2/com/crealytics/spark-excel-2.12.17-3.2.2_2.12/3.2.2_0.18.1/spark-excel-2.12.17-3.2.2_2.12-3.2.2_0.18.1.jar diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala index f7258f4e4b..e4fd2fb8e2 100644 --- 
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala +++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala @@ -38,7 +38,8 @@ import org.apache.linkis.manager.common.protocol.engine.{ EngineStopRequest } import org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnLaunchRequest -import org.apache.linkis.manager.label.entity.engine.YarnClusterModeLabel +import org.apache.linkis.manager.label.constant.LabelValueConstant +import org.apache.linkis.manager.label.entity.engine.EngingeConnRuntimeModeLabel import org.apache.linkis.manager.label.utils.LabelUtil import org.apache.linkis.rpc.Sender @@ -157,7 +158,9 @@ abstract class AbstractEngineConnLaunchService extends EngineConnLaunchService w engineNode.setOwner(request.user) if (StringUtils.isNotBlank(deployMode) && deployMode.equals("cluster")) { engineNode.setMark(AMConstant.CLUSTER_PROCESS_MARK) - engineNode.getLabels.add(new YarnClusterModeLabel()) + val engingeConnRuntimeModeLabel = new EngingeConnRuntimeModeLabel() + engingeConnRuntimeModeLabel.setModeValue(LabelValueConstant.YARN_CLUSTER_VALUE) + engineNode.getLabels.add(engingeConnRuntimeModeLabel) } else { engineNode.setMark(AMConstant.PROCESS_MARK) } diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala index 63703dd5b9..adcbb1a695 100644 --- 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala @@ -61,8 +61,8 @@ class CallbackEngineConnHook extends EngineConnHook with Logging { newMap.put("spring.mvc.servlet.path", ServerConfiguration.BDP_SERVER_RESTFUL_URI.getValue) DataWorkCloudApplication.main(DWCArgumentsParser.formatSpringOptions(newMap.toMap)) - val engineConnPidCallBack = new EngineConnIdentifierCallback() - Utils.tryAndError(engineConnPidCallBack.callback()) + val engineConnIdentifierCallback = new EngineConnIdentifierCallback() + Utils.tryAndError(engineConnIdentifierCallback.callback()) logger.info("<--------------------SpringBoot App init succeed-------------------->") } diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/service/EngineConnPidCallback.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/service/EngineConnPidCallback.scala index 595f276704..71f71f1999 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/service/EngineConnPidCallback.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/service/EngineConnPidCallback.scala @@ -21,6 +21,7 @@ import org.apache.linkis.engineconn.core.EngineConnObject import org.apache.linkis.engineconn.core.executor.ExecutorManager import org.apache.linkis.engineconn.executor.entity.YarnExecutor import 
org.apache.linkis.governance.common.protocol.task.ResponseEngineConnPid +import org.apache.linkis.manager.label.constant.LabelValueConstant import org.apache.linkis.manager.label.utils.LabelUtil import org.apache.linkis.rpc.Sender @@ -33,8 +34,8 @@ class EngineConnIdentifierCallback extends AbstractEngineConnStartUpCallback { val instance = Sender.getThisServiceInstance val context = EngineConnObject.getEngineCreationContext - val label = LabelUtil.getYarnClusterModeLabel(context.getLabels()) - if (null != label) { + val label = LabelUtil.getEngingeConnRuntimeModeLabel(context.getLabels()) + if (null != label && label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) { identifier = ExecutorManager.getInstance.getReportExecutor match { case cluster: YarnExecutor => cluster.getApplicationId } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java index 63975a0890..385cf8bfd7 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java @@ -67,8 +67,8 @@ public void dealPid(ResponseEngineConnPid protocol) { if (engineNode.getMark().equals(AMConstant.CLUSTER_PROCESS_MARK)) { ServiceInstance serviceInstance = protocol.serviceInstance(); engineNode.setServiceInstance(serviceInstance); - getEngineNodeManager().updateEngineNode(serviceInstance, engineNode); - nodeLabelService.updateLabelsToNode(serviceInstance, engineNode.getLabels()); + 
nodeLabelService.labelsFromInstanceToNewInstance( + engineNode.getServiceInstance(), serviceInstance); } defaultEngineNodeManager.updateEngine(engineNode); } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/NodeLabelService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/NodeLabelService.java index a5bfcab1cf..4dc1976c33 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/NodeLabelService.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/NodeLabelService.java @@ -47,6 +47,9 @@ public interface NodeLabelService { void updateLabelsToNode(ServiceInstance instance, List> labels); + void labelsFromInstanceToNewInstance( + ServiceInstance oldServiceInstance, ServiceInstance newServiceInstance); + /** * Remove the labels related by node instance * diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.java index 4da2bebc65..0192620442 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.java @@ -196,6 +196,84 @@ public void updateLabelsToNode(ServiceInstance instance, List> labels) } } + @Override + public void labelsFromInstanceToNewInstance( + ServiceInstance oldServiceInstance, 
ServiceInstance newServiceInstance) { + List labels = + labelManagerPersistence.getLabelByServiceInstance(newServiceInstance); + List newKeyList = labels.stream().map(Label::getLabelKey).collect(Collectors.toList()); + List nodeLabels = + labelManagerPersistence.getLabelByServiceInstance(oldServiceInstance); + + List oldKeyList = + nodeLabels.stream().map(InheritableLabel::getLabelKey).collect(Collectors.toList()); + List willBeDelete = new ArrayList<>(oldKeyList); + willBeDelete.removeAll(newKeyList); + + List willBeAdd = new ArrayList<>(newKeyList); + willBeAdd.removeAll(oldKeyList); + + List willBeUpdate = new ArrayList<>(oldKeyList); + willBeUpdate.removeAll(willBeDelete); + + Set modifiableKeyList = LabelUtils.listAllUserModifiableLabel(); + if (!willBeDelete.isEmpty()) { + nodeLabels.forEach( + nodeLabel -> { + if (modifiableKeyList.contains(nodeLabel.getLabelKey()) + && willBeDelete.contains(nodeLabel.getLabelKey())) { + List labelIds = new ArrayList<>(); + labelIds.add(nodeLabel.getId()); + labelManagerPersistence.removeNodeLabels(oldServiceInstance, labelIds); + } + }); + } + + /** + * update step: 1.delete relations of old labels 2.add new relation between new labels and + * instance + */ + if (willBeUpdate != null && !willBeUpdate.isEmpty()) { + labels.forEach( + label -> { + if (modifiableKeyList.contains(label.getLabelKey()) + && willBeUpdate.contains(label.getLabelKey())) { + nodeLabels.stream() + .filter(nodeLabel -> nodeLabel.getLabelKey().equals(label.getLabelKey())) + .forEach( + oldLabel -> { + PersistenceLabel persistenceLabel = + LabelManagerUtils.convertPersistenceLabel(label); + List labelIds = new ArrayList<>(); + labelIds.add(oldLabel.getId()); + labelManagerPersistence.removeNodeLabels(oldServiceInstance, labelIds); + int newLabelId = tryToAddLabel(persistenceLabel); + labelIds.remove((Integer) oldLabel.getId()); + labelIds.add(newLabelId); + labelManagerPersistence.addLabelToNode(oldServiceInstance, labelIds); + }); + } + }); + } + if 
(willBeAdd != null && !willBeAdd.isEmpty()) { + labels.stream() + .filter(label -> willBeAdd.contains(label.getLabelKey())) + .forEach( + label -> { + if (modifiableKeyList.contains(label.getLabelKey())) { + PersistenceLabel persistenceLabel = + LabelManagerUtils.convertPersistenceLabel(label); + int labelId = tryToAddLabel(persistenceLabel); + if (labelId > 0) { + List labelIds = new ArrayList<>(); + labelIds.add(labelId); + labelManagerPersistence.addLabelToNode(oldServiceInstance, labelIds); + } + } + }); + } + } + /** * Remove the labels related by node instance * diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java index 0288d9cc8f..8021b35851 100644 --- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java @@ -64,7 +64,7 @@ public class LabelKeyConstant { public static final String FIXED_EC_KEY = "fixedEngineConn"; - public static final String YARN_CLUSTER_MODE_KEY = "yarnClusterMode"; + public static final String ENGINGE_CONN_RUNTIME_MODE_KEY = "engingeConnRuntimeMode"; public static final String MANAGER_KEY = "manager"; } diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelValueConstant.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelValueConstant.java index cc62921c81..ce679ec4eb 100644 --- 
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelValueConstant.java +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelValueConstant.java @@ -20,4 +20,6 @@ public class LabelValueConstant { public static final String OFFLINE_VALUE = "offline"; + + public static final String YARN_CLUSTER_VALUE = "yarn-cluster"; } diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/YarnClusterModeLabel.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngingeConnRuntimeModeLabel.java similarity index 84% rename from linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/YarnClusterModeLabel.java rename to linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngingeConnRuntimeModeLabel.java index 03a4d5d3fc..7460f5589d 100644 --- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/YarnClusterModeLabel.java +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngingeConnRuntimeModeLabel.java @@ -29,25 +29,26 @@ import static org.apache.linkis.manager.label.errorcode.LabelCommonErrorCodeSummary.CHECK_LABEL_VALUE_EMPTY; import static org.apache.linkis.manager.label.errorcode.LabelCommonErrorCodeSummary.LABEL_ERROR_CODE; -public class YarnClusterModeLabel extends GenericLabel implements EngineNodeLabel, UserModifiable { +public class EngingeConnRuntimeModeLabel extends GenericLabel + implements EngineNodeLabel, UserModifiable { - public YarnClusterModeLabel() { - 
setLabelKey(LabelKeyConstant.YARN_CLUSTER_MODE_KEY); + public EngingeConnRuntimeModeLabel() { + setLabelKey(LabelKeyConstant.ENGINGE_CONN_RUNTIME_MODE_KEY); } @ValueSerialNum(0) - public void setApplicationId(String applicationId) { + public void setModeValue(String modeValue) { if (getValue() == null) { setValue(new HashMap<>()); } - getValue().put("applicationId", applicationId); + getValue().put("modeValue", modeValue); } - public String getApplicationId() { + public String getModeValue() { if (getValue() == null) { return null; } - return getValue().get("applicationId"); + return getValue().get("modeValue"); } @Override diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala index d50884cf12..3965a5ea11 100644 --- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala @@ -22,8 +22,8 @@ import org.apache.linkis.manager.label.entity.engine.{ CodeLanguageLabel, EngineConnModeLabel, EngineTypeLabel, - UserCreatorLabel, - YarnClusterModeLabel + EngingeConnRuntimeModeLabel, + UserCreatorLabel } import org.apache.linkis.manager.label.entity.entrance.{ BindEngineLabel, @@ -81,8 +81,8 @@ object LabelUtil { getLabelFromList[CodeLanguageLabel](labels) } - def getYarnClusterModeLabel(labels: util.List[Label[_]]): YarnClusterModeLabel = { - getLabelFromList[YarnClusterModeLabel](labels) + def getEngingeConnRuntimeModeLabel(labels: util.List[Label[_]]): EngingeConnRuntimeModeLabel = { + getLabelFromList[EngingeConnRuntimeModeLabel](labels) } def getEngineConnModeLabel(labels: util.List[Label[_]]): EngineConnModeLabel = { diff --git 
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala index 19462d78df..8d97e81525 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala @@ -71,7 +71,7 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long) private var thread: Thread = _ - private var applicationId: String = _ + private var applicationId: String = sc.applicationId override def getApplicationId: String = applicationId @@ -103,7 +103,6 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long) engineExecutorContext.appendStdout( LogUtils.generateInfo(s"yarn application id: ${sc.applicationId}") ) - this.applicationId = sc.applicationId // Pre-execution hook var executionHook: SparkPreExecutionHook = null Utils.tryCatch { diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala index 5afe696e71..bc18e2badf 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala @@ -39,8 +39,10 @@ import org.apache.linkis.manager.engineplugin.common.creation.{ } import org.apache.linkis.manager.engineplugin.common.launch.process.Environment import org.apache.linkis.manager.engineplugin.common.launch.process.Environment.variable +import 
org.apache.linkis.manager.label.constant.LabelValueConstant import org.apache.linkis.manager.label.entity.engine.EngineType import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType +import org.apache.linkis.manager.label.utils.LabelUtil import org.apache.linkis.server.JMap import org.apache.commons.lang3.StringUtils @@ -145,18 +147,17 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging sparkConf.getOption("spark.master").getOrElse(CommonVars("spark.master", "yarn").getValue) logger.info(s"------ Create new SparkContext {$master} -------") - // Set deploy-mode with the optional values `cluster`and `client`, the default value `client` - val deployMode: String = SPARK_DEPLOY_MODE.getValue(options) - if ( - StringUtils - .isNotBlank(deployMode) && (deployMode - .equals(SPARK_YARN_CLUSTER) || deployMode.equals(SPARK_YARN_CLIENT)) - ) { - sparkConf.set("spark.submit.deployMode", deployMode) + val label = LabelUtil.getEngingeConnRuntimeModeLabel(engineCreationContext.getLabels()) + val isYarnClusterMode: Boolean = + if (null != label && label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) true + else false + + if (isYarnClusterMode) { + sparkConf.set("spark.submit.deployMode", "cluster") } // todo yarn cluster暂时不支持pyspark,后期对pyspark进行处理 - if (StringUtils.isNotBlank(deployMode) && deployMode.equals(SPARK_YARN_CLIENT)) { + if (!isYarnClusterMode) { val pysparkBasePath = SparkConfiguration.SPARK_HOME.getValue val pysparkPath = new File(pysparkBasePath, "python" + File.separator + "lib") var pythonLibUris = pysparkPath.listFiles().map(_.toURI.toString).filter(_.endsWith(".zip")) diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala index 8900ce05f3..bce21165a5 100644 --- 
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala @@ -66,7 +66,11 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLa val executorMemory = getValueAndRemove(properties, LINKIS_SPARK_EXECUTOR_MEMORY) val numExecutors = getValueAndRemove(properties, LINKIS_SPARK_EXECUTOR_INSTANCES) - var files = getValueAndRemove(properties, "files", "").split(",").filter(isNotBlankPath) + var files: ArrayBuffer[String] = getValueAndRemove(properties, "files", "") + .split(",") + .filter(isNotBlankPath) + .toBuffer + .asInstanceOf[ArrayBuffer[String]] val jars = new ArrayBuffer[String]() jars ++= getValueAndRemove(properties, "jars", "").split(",").filter(isNotBlankPath) jars ++= getValueAndRemove(properties, SPARK_DEFAULT_EXTERNAL_JARS_PATH) From 852131becd6e817d8f3ae44a0466390c47236f77 Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Sat, 12 Aug 2023 15:40:58 +0800 Subject: [PATCH 10/12] spark support yarn cluster --- docs/configuration/spark.md | 2 +- .../AbstractEngineConnLaunchService.scala | 12 +++--- .../DefaultEngineConnPidCallbackService.java | 1 + .../service/impl/DefaultNodeLabelService.java | 43 +++++-------------- .../label/constant/LabelValueConstant.java | 2 +- ...SubmitProcessEngineConnLaunchBuilder.scala | 23 +++++----- 6 files changed, 32 insertions(+), 51 deletions(-) diff --git a/docs/configuration/spark.md b/docs/configuration/spark.md index aac8c65d7f..f40c76b43d 100644 --- a/docs/configuration/spark.md +++ b/docs/configuration/spark.md @@ -28,7 +28,7 @@ |spark|wds.linkis.spark.engineconn.fatal.log|error writing class;OutOfMemoryError|spark.engineconn.fatal.log| |spark|wds.linkis.spark.engine.scala.replace_package_header.enable| true 
|spark.engine.scala.replace_package_header.enable| -Use spark yarn cluster mode, need to upload the dependence of the spark to 'linkis.spark.yarn.cluster.jar'(the default value is 'hdfs:///spark/cluster') +Use spark yarn cluster mode, need to set label "engingeConnRuntimeMode": "yarnCluster", and need to upload the dependence of the spark to 'linkis.spark.yarn.cluster.jar'(the default value is 'hdfs:///spark/cluster') spark dependencies include jars and configuration files,For example: '/appcom/Install/linkis/lib/linkis-engineconn-plugins/spark/dist/3.2.1/lib/*.jar','/appcom/Install/linkis/conf/*'' The spark-excel package may cause class conflicts,need to download separately,put it in spark lib diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala index e4fd2fb8e2..27598bf19d 100644 --- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala +++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala @@ -39,11 +39,9 @@ import org.apache.linkis.manager.common.protocol.engine.{ } import org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnLaunchRequest import org.apache.linkis.manager.label.constant.LabelValueConstant -import org.apache.linkis.manager.label.entity.engine.EngingeConnRuntimeModeLabel import org.apache.linkis.manager.label.utils.LabelUtil import org.apache.linkis.rpc.Sender -import org.apache.commons.lang3.StringUtils import
org.apache.commons.lang3.exception.ExceptionUtils import scala.concurrent.ExecutionContextExecutorService @@ -152,15 +150,17 @@ abstract class AbstractEngineConnLaunchService extends EngineConnLaunchService w val deployMode: String = request.creationDesc.properties.getOrDefault("spark.submit.deployMode", "client") + val label = LabelUtil.getEngingeConnRuntimeModeLabel(request.labels) + val isYarnClusterMode: Boolean = + if (null != label && label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) true + else false + val engineNode = new AMEngineNode() engineNode.setLabels(conn.getLabels) engineNode.setServiceInstance(conn.getServiceInstance) engineNode.setOwner(request.user) - if (StringUtils.isNotBlank(deployMode) && deployMode.equals("cluster")) { + if (isYarnClusterMode) { engineNode.setMark(AMConstant.CLUSTER_PROCESS_MARK) - val engingeConnRuntimeModeLabel = new EngingeConnRuntimeModeLabel() - engingeConnRuntimeModeLabel.setModeValue(LabelValueConstant.YARN_CLUSTER_VALUE) - engineNode.getLabels.add(engingeConnRuntimeModeLabel) } else { engineNode.setMark(AMConstant.PROCESS_MARK) } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java index 385cf8bfd7..4acfb70f91 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java @@ -67,6 +67,7 @@ public void dealPid(ResponseEngineConnPid protocol) { if (engineNode.getMark().equals(AMConstant.CLUSTER_PROCESS_MARK)) { 
ServiceInstance serviceInstance = protocol.serviceInstance(); engineNode.setServiceInstance(serviceInstance); + getEngineNodeManager().updateEngineNode(serviceInstance, engineNode); nodeLabelService.labelsFromInstanceToNewInstance( engineNode.getServiceInstance(), serviceInstance); } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.java index 0192620442..08d49b7750 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.java @@ -207,16 +207,14 @@ public void labelsFromInstanceToNewInstance( List oldKeyList = nodeLabels.stream().map(InheritableLabel::getLabelKey).collect(Collectors.toList()); - List willBeDelete = new ArrayList<>(oldKeyList); - willBeDelete.removeAll(newKeyList); - - List willBeAdd = new ArrayList<>(newKeyList); - willBeAdd.removeAll(oldKeyList); - List willBeUpdate = new ArrayList<>(oldKeyList); - willBeUpdate.removeAll(willBeDelete); + // Gets duplicate labels based on the intersection + List willBeDelete = new ArrayList<>(oldKeyList); + willBeDelete.retainAll(newKeyList); Set modifiableKeyList = LabelUtils.listAllUserModifiableLabel(); + + // Delete duplicate labels if (!willBeDelete.isEmpty()) { nodeLabels.forEach( nodeLabel -> { @@ -229,32 +227,11 @@ public void labelsFromInstanceToNewInstance( }); } - /** - * update step: 1.delete relations of old labels 2.add new relation between new labels and - * instance - */ - if (willBeUpdate != null && !willBeUpdate.isEmpty()) { - labels.forEach( - label 
-> { - if (modifiableKeyList.contains(label.getLabelKey()) - && willBeUpdate.contains(label.getLabelKey())) { - nodeLabels.stream() - .filter(nodeLabel -> nodeLabel.getLabelKey().equals(label.getLabelKey())) - .forEach( - oldLabel -> { - PersistenceLabel persistenceLabel = - LabelManagerUtils.convertPersistenceLabel(label); - List labelIds = new ArrayList<>(); - labelIds.add(oldLabel.getId()); - labelManagerPersistence.removeNodeLabels(oldServiceInstance, labelIds); - int newLabelId = tryToAddLabel(persistenceLabel); - labelIds.remove((Integer) oldLabel.getId()); - labelIds.add(newLabelId); - labelManagerPersistence.addLabelToNode(oldServiceInstance, labelIds); - }); - } - }); - } + // Get non-duplicate labels + List willBeAdd = new ArrayList<>(oldKeyList); + willBeAdd.removeAll(willBeDelete); + + // Associate the newServiceInstance with the old labels if (willBeAdd != null && !willBeAdd.isEmpty()) { labels.stream() .filter(label -> willBeAdd.contains(label.getLabelKey())) diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelValueConstant.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelValueConstant.java index ce679ec4eb..35c0d06e2e 100644 --- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelValueConstant.java +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelValueConstant.java @@ -21,5 +21,5 @@ public class LabelValueConstant { public static final String OFFLINE_VALUE = "offline"; - public static final String YARN_CLUSTER_VALUE = "yarn-cluster"; + public static final String YARN_CLUSTER_VALUE = "yarnCluster"; } diff --git 
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala index bce21165a5..a27895ecf4 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala @@ -18,7 +18,6 @@ package org.apache.linkis.engineplugin.spark.launch import org.apache.linkis.common.conf.CommonVars -import org.apache.linkis.engineplugin.spark.config.SparkConfiguration import org.apache.linkis.engineplugin.spark.config.SparkConfiguration.{ ENGINE_JAR, SPARK_APP_NAME, @@ -39,7 +38,9 @@ import org.apache.linkis.manager.engineplugin.common.conf.EnvConfiguration import org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnBuildRequest import org.apache.linkis.manager.engineplugin.common.launch.process.Environment._ import org.apache.linkis.manager.engineplugin.common.launch.process.JavaProcessEngineConnLaunchBuilder +import org.apache.linkis.manager.label.constant.LabelValueConstant import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel +import org.apache.linkis.manager.label.utils.LabelUtil import org.apache.linkis.protocol.UserWithCreator import org.apache.commons.lang3.StringUtils @@ -66,7 +67,7 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLa val executorMemory = getValueAndRemove(properties, LINKIS_SPARK_EXECUTOR_MEMORY) val numExecutors = getValueAndRemove(properties, LINKIS_SPARK_EXECUTOR_INSTANCES) - var files: ArrayBuffer[String] = getValueAndRemove(properties, "files", "") + val files: ArrayBuffer[String] = getValueAndRemove(properties, "files", "") .split(",") 
.filter(isNotBlankPath) .toBuffer @@ -124,13 +125,14 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLa memory } - // val deployMode: String = getValueAndRemove(properties, SPARK_DEPLOY_MODE) val deployMode: String = SPARK_DEPLOY_MODE.getValue(properties) - val isYarnCluster: Boolean = - if (deployMode.equals(SparkConfiguration.SPARK_YARN_CLUSTER)) true else false + val label = LabelUtil.getEngingeConnRuntimeModeLabel(engineConnBuildRequest.labels) + val isYarnClusterMode: Boolean = + if (null != label && label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) true + else false - if (isYarnCluster) { + if (isYarnClusterMode) { files ++= Array(s"${variable(PWD)}/conf/linkis-engineconn.properties") var clusterJars: String = getValueAndRemove(properties, SPARK_YARN_CLUSTER_JARS) @@ -170,8 +172,9 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLa addOpt("--num-executors", numExecutors.toString) addOpt("--queue", queue) - getConf(engineConnBuildRequest, gcLogDir, logDir, isYarnCluster).foreach { case (key, value) => - addOpt("--conf", s"""$key="$value"""") + getConf(engineConnBuildRequest, gcLogDir, logDir, isYarnClusterMode).foreach { + case (key, value) => + addOpt("--conf", s"""$key="$value"""") } addOpt("--class", className) @@ -186,7 +189,7 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLa engineConnBuildRequest: EngineConnBuildRequest, gcLogDir: String, logDir: String, - isYarnCluster: Boolean + isYarnClusterMode: Boolean ): ArrayBuffer[(String, String)] = { val driverJavaSet = new StringBuilder(" -server") if (StringUtils.isNotEmpty(EnvConfiguration.ENGINE_CONN_DEFAULT_JAVA_OPTS.getValue)) { @@ -202,7 +205,7 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLa .foreach(l => { driverJavaSet.append(" ").append(l) }) - if (isYarnCluster) { + if (isYarnClusterMode) { driverJavaSet.append(" -Djava.io.tmpdir=/tmp") } else { 
driverJavaSet.append(" -Djava.io.tmpdir=" + variable(TEMP_DIRS)) From dd06fc1c46e9a3e4bd28c2c914587c66899731e2 Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Mon, 14 Aug 2023 10:29:29 +0800 Subject: [PATCH 11/12] spark support yarn cluster --- .../service/impl/DefaultNodeLabelService.java | 51 +++++++------------ 1 file changed, 17 insertions(+), 34 deletions(-) diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.java index 08d49b7750..8529b6d20e 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.java @@ -208,47 +208,30 @@ public void labelsFromInstanceToNewInstance( List oldKeyList = nodeLabels.stream().map(InheritableLabel::getLabelKey).collect(Collectors.toList()); - // Gets duplicate labels based on the intersection - List willBeDelete = new ArrayList<>(oldKeyList); - willBeDelete.retainAll(newKeyList); - - Set modifiableKeyList = LabelUtils.listAllUserModifiableLabel(); + List willBeAdd = new ArrayList<>(oldKeyList); + willBeAdd.removeAll(newKeyList); - // Delete duplicate labels - if (!willBeDelete.isEmpty()) { + // Assign the old association to the newServiceInstance + if (!CollectionUtils.isEmpty(willBeAdd)) { nodeLabels.forEach( nodeLabel -> { - if (modifiableKeyList.contains(nodeLabel.getLabelKey()) - && willBeDelete.contains(nodeLabel.getLabelKey())) { - List labelIds = new ArrayList<>(); - labelIds.add(nodeLabel.getId()); - 
labelManagerPersistence.removeNodeLabels(oldServiceInstance, labelIds); + if (willBeAdd.contains(nodeLabel.getLabelKey())) { + PersistenceLabel persistenceLabel = + LabelManagerUtils.convertPersistenceLabel(nodeLabel); + int labelId = tryToAddLabel(persistenceLabel); + if (labelId > 0) { + List labelIds = new ArrayList<>(); + labelIds.add(labelId); + labelManagerPersistence.addLabelToNode(newServiceInstance, labelIds); + } } }); } - // Get non-duplicate labels - List willBeAdd = new ArrayList<>(oldKeyList); - willBeAdd.removeAll(willBeDelete); - - // Associate the newServiceInstance with the old labels - if (willBeAdd != null && !willBeAdd.isEmpty()) { - labels.stream() - .filter(label -> willBeAdd.contains(label.getLabelKey())) - .forEach( - label -> { - if (modifiableKeyList.contains(label.getLabelKey())) { - PersistenceLabel persistenceLabel = - LabelManagerUtils.convertPersistenceLabel(label); - int labelId = tryToAddLabel(persistenceLabel); - if (labelId > 0) { - List labelIds = new ArrayList<>(); - labelIds.add(labelId); - labelManagerPersistence.addLabelToNode(oldServiceInstance, labelIds); - } - } - }); - } + // Delete an old association + List oldLabelId = + nodeLabels.stream().map(PersistenceLabel::getId).collect(Collectors.toList()); + labelManagerPersistence.removeNodeLabels(oldServiceInstance, oldLabelId); } /** From 28e57300f7a3cadf771b79c40360f3196dca14f6 Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Mon, 14 Aug 2023 17:24:14 +0800 Subject: [PATCH 12/12] spark support yarn cluster --- .../server/service/impl/AbstractEngineConnLaunchService.scala | 2 -- .../launch/SparkSubmitProcessEngineConnLaunchBuilder.scala | 4 +++- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala 
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala index 27598bf19d..390822df0d 100644 --- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala +++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala @@ -147,8 +147,6 @@ abstract class AbstractEngineConnLaunchService extends EngineConnLaunchService w throw t } LoggerUtils.removeJobIdMDC() - val deployMode: String = - request.creationDesc.properties.getOrDefault("spark.submit.deployMode", "client") val label = LabelUtil.getEngingeConnRuntimeModeLabel(request.labels) val isYarnClusterMode: Boolean = diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala index a27895ecf4..2487ede907 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala @@ -18,6 +18,7 @@ package org.apache.linkis.engineplugin.spark.launch import org.apache.linkis.common.conf.CommonVars +import org.apache.linkis.engineplugin.spark.config.SparkConfiguration import org.apache.linkis.engineplugin.spark.config.SparkConfiguration.{ ENGINE_JAR, SPARK_APP_NAME, @@ -125,7 +126,7 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLa memory } - val deployMode: String = 
SPARK_DEPLOY_MODE.getValue(properties) + var deployMode: String = SparkConfiguration.SPARK_YARN_CLIENT val label = LabelUtil.getEngingeConnRuntimeModeLabel(engineConnBuildRequest.labels) val isYarnClusterMode: Boolean = @@ -133,6 +134,7 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLa else false if (isYarnClusterMode) { + deployMode = SparkConfiguration.SPARK_YARN_CLUSTER files ++= Array(s"${variable(PWD)}/conf/linkis-engineconn.properties") var clusterJars: String = getValueAndRemove(properties, SPARK_YARN_CLUSTER_JARS)