From a94c87653f6ee4c0303876fb1349dbd24e0d614a Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Mon, 14 Aug 2023 19:11:01 +0800 Subject: [PATCH] spark support yarn cluster (#4850) * spark support yarn cluster * spark support yarn cluster * spark support yarn cluster * spark support yarn cluster * spark support yarn cluster * LinkisManagerApplication Remove useless code * spark support yarn cluster * spark support yarn cluster * spark support yarn cluster * spark support yarn cluster * spark support yarn cluster * spark support yarn cluster --- docs/configuration/spark.md | 3 + .../impl/DefaultEngineConnKillService.java | 7 ++ .../AbstractEngineConnLaunchService.scala | 13 +++- .../hook/CallbackEngineConnHook.scala | 6 +- .../service/EngineConnPidCallback.scala | 17 ++++- .../am/manager/DefaultEngineNodeManager.java | 11 +++ .../manager/am/manager/EngineNodeManager.java | 2 + .../engine/DefaultEngineCreateService.java | 12 +++- .../DefaultEngineConnPidCallbackService.java | 17 ++++- .../label/service/NodeLabelService.java | 3 + .../service/impl/DefaultNodeLabelService.java | 38 ++++++++++ .../label/constant/LabelKeyConstant.java | 2 + .../label/constant/LabelValueConstant.java | 2 + .../engine/EngingeConnRuntimeModeLabel.java | 71 +++++++++++++++++++ .../manager/label/utils/LabelUtil.scala | 5 ++ .../manager/common/constant/AMConstant.java | 2 + .../linkis/manager/dao/NodeManagerMapper.java | 2 + .../persistence/NodeManagerPersistence.java | 2 + .../impl/DefaultNodeManagerPersistence.java | 22 ++++++ .../mapper/common/NodeManagerMapper.xml | 38 +++++----- .../errorcode/SparkErrorCodeSummary.java | 4 ++ .../spark/config/SparkConfiguration.scala | 7 ++ .../executor/SparkEngineConnExecutor.scala | 13 +++- .../factory/SparkEngineConnFactory.scala | 39 ++++++---- ...SubmitProcessEngineConnLaunchBuilder.scala | 58 +++++++++++++-- 25 files changed, 350 insertions(+), 46 deletions(-) create mode 100644 
linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngingeConnRuntimeModeLabel.java diff --git a/docs/configuration/spark.md b/docs/configuration/spark.md index f0a4723c7b..f40c76b43d 100644 --- a/docs/configuration/spark.md +++ b/docs/configuration/spark.md @@ -3,6 +3,7 @@ | Module Name (Service Name) | Parameter Name | Default Value | Description |Used| | -------- | -------- | ----- |----- | ----- | +|spark|linkis.spark.yarn.cluster.jars|hdfs:///spark/cluster|spark.yarn.cluster.jars| |spark|linkis.spark.etl.support.hudi|false|spark.etl.support.hudi| |spark|linkis.bgservice.store.prefix|hdfs:///tmp/bdp-ide/|bgservice.store.prefix| |spark|linkis.bgservice.store.suffix| |bgservice.store.suffix| @@ -27,6 +28,8 @@ |spark|wds.linkis.spark.engineconn.fatal.log|error writing class;OutOfMemoryError|spark.engineconn.fatal.log| |spark|wds.linkis.spark.engine.scala.replace_package_header.enable| true |spark.engine.scala.replace_package_header.enable| +To use Spark yarn cluster mode, set the label "engingeConnRuntimeMode": "yarnCluster" and upload the Spark dependencies to the path configured by 'linkis.spark.yarn.cluster.jars' (the default value is 'hdfs:///spark/cluster'). +The Spark dependencies include jars and configuration files, for example: '/appcom/Install/linkis/lib/linkis-engineconn-plugins/spark/dist/3.2.1/lib/*.jar' and '/appcom/Install/linkis/conf/*'. The spark-excel package may cause class conflicts,need to download separately,put it in spark lib wget https://repo1.maven.org/maven2/com/crealytics/spark-excel-2.12.17-3.2.2_2.12/3.2.2_0.18.1/spark-excel-2.12.17-3.2.2_2.12-3.2.2_0.18.1.jar diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/java/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java 
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/java/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java index 440208cd62..a6a932a578 100644 --- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/java/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java +++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/java/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java @@ -94,6 +94,13 @@ public EngineStopResponse dealEngineConnStop(EngineStopRequest engineStopRequest killYarnAppIdOfOneEc(engineStopRequest); } + if (AMConstant.CLUSTER_PROCESS_MARK.equals(engineStopRequest.getIdentifierType()) + && engineStopRequest.getIdentifier() != null) { + List appIds = new ArrayList<>(); + appIds.add(engineStopRequest.getIdentifier()); + GovernanceUtils.killYarnJobApp(appIds); + } + if (!response.getStopStatus()) { EngineSuicideRequest request = new EngineSuicideRequest( diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala index f088f9fcdc..390822df0d 100644 --- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala +++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala @@ -38,6 +38,7 @@ import org.apache.linkis.manager.common.protocol.engine.{ EngineStopRequest } import 
org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnLaunchRequest +import org.apache.linkis.manager.label.constant.LabelValueConstant import org.apache.linkis.manager.label.utils.LabelUtil import org.apache.linkis.rpc.Sender @@ -146,11 +147,21 @@ abstract class AbstractEngineConnLaunchService extends EngineConnLaunchService w throw t } LoggerUtils.removeJobIdMDC() + + val label = LabelUtil.getEngingeConnRuntimeModeLabel(request.labels) + val isYarnClusterMode: Boolean = + if (null != label && label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) true + else false + val engineNode = new AMEngineNode() engineNode.setLabels(conn.getLabels) engineNode.setServiceInstance(conn.getServiceInstance) engineNode.setOwner(request.user) - engineNode.setMark(AMConstant.PROCESS_MARK) + if (isYarnClusterMode) { + engineNode.setMark(AMConstant.CLUSTER_PROCESS_MARK) + } else { + engineNode.setMark(AMConstant.PROCESS_MARK) + } engineNode } diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala index 1f4c5cec73..adcbb1a695 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala @@ -23,7 +23,7 @@ import org.apache.linkis.common.utils.{Logging, Utils} import org.apache.linkis.engineconn.acessible.executor.entity.AccessibleExecutor import org.apache.linkis.engineconn.callback.service.{ EngineConnAfterStartCallback, - 
EngineConnPidCallback + EngineConnIdentifierCallback } import org.apache.linkis.engineconn.common.conf.EngineConnConf import org.apache.linkis.engineconn.common.creation.EngineCreationContext @@ -61,8 +61,8 @@ class CallbackEngineConnHook extends EngineConnHook with Logging { newMap.put("spring.mvc.servlet.path", ServerConfiguration.BDP_SERVER_RESTFUL_URI.getValue) DataWorkCloudApplication.main(DWCArgumentsParser.formatSpringOptions(newMap.toMap)) - val engineConnPidCallBack = new EngineConnPidCallback() - Utils.tryAndError(engineConnPidCallBack.callback()) + val engineConnIdentifierCallback = new EngineConnIdentifierCallback() + Utils.tryAndError(engineConnIdentifierCallback.callback()) logger.info("<--------------------SpringBoot App init succeed-------------------->") } diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/service/EngineConnPidCallback.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/service/EngineConnPidCallback.scala index f0995c0b99..71f71f1999 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/service/EngineConnPidCallback.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/service/EngineConnPidCallback.scala @@ -18,18 +18,29 @@ package org.apache.linkis.engineconn.callback.service import org.apache.linkis.engineconn.core.EngineConnObject +import org.apache.linkis.engineconn.core.executor.ExecutorManager +import org.apache.linkis.engineconn.executor.entity.YarnExecutor import org.apache.linkis.governance.common.protocol.task.ResponseEngineConnPid +import org.apache.linkis.manager.label.constant.LabelValueConstant 
+import org.apache.linkis.manager.label.utils.LabelUtil import org.apache.linkis.rpc.Sender import java.lang.management.ManagementFactory -class EngineConnPidCallback extends AbstractEngineConnStartUpCallback { +class EngineConnIdentifierCallback extends AbstractEngineConnStartUpCallback { override def callback(): Unit = { - val pid = ManagementFactory.getRuntimeMXBean.getName.split("@")(0) + var identifier = ManagementFactory.getRuntimeMXBean.getName.split("@")(0) val instance = Sender.getThisServiceInstance val context = EngineConnObject.getEngineCreationContext - callback(ResponseEngineConnPid(instance, pid, context.getTicketId)) + + val label = LabelUtil.getEngingeConnRuntimeModeLabel(context.getLabels()) + if (null != label && label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) { + identifier = ExecutorManager.getInstance.getReportExecutor match { + case cluster: YarnExecutor => cluster.getApplicationId + } + } + callback(ResponseEngineConnPid(instance, identifier, context.getTicketId)) } } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/DefaultEngineNodeManager.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/DefaultEngineNodeManager.java index b8b38eae30..14d548ef77 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/DefaultEngineNodeManager.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/DefaultEngineNodeManager.java @@ -127,6 +127,17 @@ public EngineNode getEngineNodeInfoByDB(EngineNode engineNode) { return dbEngineNode; } + @Override + public EngineNode getEngineNodeInfoByTicketId(String ticketId) { + EngineNode dbEngineNode = nodeManagerPersistence.getEngineNodeByTicketId(ticketId); + if (null == 
dbEngineNode) { + throw new LinkisRetryException(AMConstant.ENGINE_ERROR_CODE, ticketId + " not exists in db"); + } + metricsConverter.fillMetricsToNode( + dbEngineNode, nodeMetricManagerPersistence.getNodeMetrics(dbEngineNode)); + return dbEngineNode; + } + @Override public void updateEngineStatus( ServiceInstance serviceInstance, NodeStatus fromState, NodeStatus toState) {} diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/EngineNodeManager.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/EngineNodeManager.java index 252d97c0bf..ce79d79c7e 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/EngineNodeManager.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/EngineNodeManager.java @@ -38,6 +38,8 @@ public interface EngineNodeManager { EngineNode getEngineNodeInfoByDB(EngineNode engineNode); + EngineNode getEngineNodeInfoByTicketId(String ticketId); + /** * Get detailed engine information from the persistence * diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.java index e8c58e3823..6f35edc3a9 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.java @@ -337,10 +337,20 @@ 
private List> fromEMGetEngineLabels(List> emLabels) { } private boolean ensuresIdle(EngineNode engineNode, String resourceTicketId) { - EngineNode engineNodeInfo = getEngineNodeManager().getEngineNodeInfoByDB(engineNode); + EngineNode engineNodeInfo; + if (engineNode.getMark().equals(AMConstant.CLUSTER_PROCESS_MARK)) { + engineNodeInfo = getEngineNodeManager().getEngineNodeInfoByTicketId(resourceTicketId); + } else { + engineNodeInfo = getEngineNodeManager().getEngineNodeInfoByDB(engineNode); + } if (null == engineNodeInfo) { return false; } + + if (engineNodeInfo.getServiceInstance() != null) { + engineNode.setServiceInstance(engineNodeInfo.getServiceInstance()); + } + if (NodeStatus.isCompleted(engineNodeInfo.getNodeStatus())) { NodeMetrics metrics = nodeMetricManagerPersistence.getNodeMetrics(engineNodeInfo); Pair> errorInfo = getStartErrorInfo(metrics.getHeartBeatMsg()); diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java index 3d199fe29c..4acfb70f91 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java @@ -17,10 +17,14 @@ package org.apache.linkis.manager.am.service.impl; +import org.apache.linkis.common.ServiceInstance; import org.apache.linkis.governance.common.protocol.task.ResponseEngineConnPid; import org.apache.linkis.manager.am.manager.DefaultEngineNodeManager; import org.apache.linkis.manager.am.service.EngineConnPidCallbackService; +import 
org.apache.linkis.manager.am.service.engine.AbstractEngineService; +import org.apache.linkis.manager.common.constant.AMConstant; import org.apache.linkis.manager.common.entity.node.EngineNode; +import org.apache.linkis.manager.label.service.NodeLabelService; import org.apache.linkis.rpc.message.annotation.Receiver; import org.springframework.beans.factory.annotation.Autowired; @@ -30,12 +34,15 @@ import org.slf4j.LoggerFactory; @Service -public class DefaultEngineConnPidCallbackService implements EngineConnPidCallbackService { +public class DefaultEngineConnPidCallbackService extends AbstractEngineService + implements EngineConnPidCallbackService { private static final Logger logger = LoggerFactory.getLogger(DefaultEngineConnPidCallbackService.class); @Autowired private DefaultEngineNodeManager defaultEngineNodeManager; + @Autowired private NodeLabelService nodeLabelService; + @Receiver @Override public void dealPid(ResponseEngineConnPid protocol) { @@ -56,6 +63,14 @@ public void dealPid(ResponseEngineConnPid protocol) { } engineNode.setIdentifier(protocol.pid()); + + if (engineNode.getMark().equals(AMConstant.CLUSTER_PROCESS_MARK)) { + ServiceInstance serviceInstance = protocol.serviceInstance(); + engineNode.setServiceInstance(serviceInstance); + getEngineNodeManager().updateEngineNode(serviceInstance, engineNode); + nodeLabelService.labelsFromInstanceToNewInstance( + engineNode.getServiceInstance(), serviceInstance); + } defaultEngineNodeManager.updateEngine(engineNode); } } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/NodeLabelService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/NodeLabelService.java index a5bfcab1cf..4dc1976c33 100644 --- 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/NodeLabelService.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/NodeLabelService.java @@ -47,6 +47,9 @@ public interface NodeLabelService { void updateLabelsToNode(ServiceInstance instance, List> labels); + void labelsFromInstanceToNewInstance( + ServiceInstance oldServiceInstance, ServiceInstance newServiceInstance); + /** * Remove the labels related by node instance * diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.java index 4da2bebc65..8529b6d20e 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.java @@ -196,6 +196,44 @@ public void updateLabelsToNode(ServiceInstance instance, List> labels) } } + @Override + public void labelsFromInstanceToNewInstance( + ServiceInstance oldServiceInstance, ServiceInstance newServiceInstance) { + List labels = + labelManagerPersistence.getLabelByServiceInstance(newServiceInstance); + List newKeyList = labels.stream().map(Label::getLabelKey).collect(Collectors.toList()); + List nodeLabels = + labelManagerPersistence.getLabelByServiceInstance(oldServiceInstance); + + List oldKeyList = + nodeLabels.stream().map(InheritableLabel::getLabelKey).collect(Collectors.toList()); + + List willBeAdd = new ArrayList<>(oldKeyList); + willBeAdd.removeAll(newKeyList); + + // 
Assign the old association to the newServiceInstance + if (!CollectionUtils.isEmpty(willBeAdd)) { + nodeLabels.forEach( + nodeLabel -> { + if (willBeAdd.contains(nodeLabel.getLabelKey())) { + PersistenceLabel persistenceLabel = + LabelManagerUtils.convertPersistenceLabel(nodeLabel); + int labelId = tryToAddLabel(persistenceLabel); + if (labelId > 0) { + List labelIds = new ArrayList<>(); + labelIds.add(labelId); + labelManagerPersistence.addLabelToNode(newServiceInstance, labelIds); + } + } + }); + } + + // Delete an old association + List oldLabelId = + nodeLabels.stream().map(PersistenceLabel::getId).collect(Collectors.toList()); + labelManagerPersistence.removeNodeLabels(oldServiceInstance, oldLabelId); + } + /** * Remove the labels related by node instance * diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java index 362932083c..8021b35851 100644 --- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java @@ -64,5 +64,7 @@ public class LabelKeyConstant { public static final String FIXED_EC_KEY = "fixedEngineConn"; + public static final String ENGINGE_CONN_RUNTIME_MODE_KEY = "engingeConnRuntimeMode"; + public static final String MANAGER_KEY = "manager"; } diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelValueConstant.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelValueConstant.java index cc62921c81..35c0d06e2e 100644 
--- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelValueConstant.java +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelValueConstant.java @@ -20,4 +20,6 @@ public class LabelValueConstant { public static final String OFFLINE_VALUE = "offline"; + + public static final String YARN_CLUSTER_VALUE = "yarnCluster"; } diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngingeConnRuntimeModeLabel.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngingeConnRuntimeModeLabel.java new file mode 100644 index 0000000000..7460f5589d --- /dev/null +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngingeConnRuntimeModeLabel.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.linkis.manager.label.entity.engine; + +import org.apache.linkis.manager.label.constant.LabelKeyConstant; +import org.apache.linkis.manager.label.entity.*; +import org.apache.linkis.manager.label.entity.annon.ValueSerialNum; +import org.apache.linkis.manager.label.exception.LabelErrorException; + +import org.apache.commons.lang3.StringUtils; + +import java.util.HashMap; + +import static org.apache.linkis.manager.label.errorcode.LabelCommonErrorCodeSummary.CHECK_LABEL_VALUE_EMPTY; +import static org.apache.linkis.manager.label.errorcode.LabelCommonErrorCodeSummary.LABEL_ERROR_CODE; + +public class EngingeConnRuntimeModeLabel extends GenericLabel + implements EngineNodeLabel, UserModifiable { + + public EngingeConnRuntimeModeLabel() { + setLabelKey(LabelKeyConstant.ENGINGE_CONN_RUNTIME_MODE_KEY); + } + + @ValueSerialNum(0) + public void setModeValue(String modeValue) { + if (getValue() == null) { + setValue(new HashMap<>()); + } + getValue().put("modeValue", modeValue); + } + + public String getModeValue() { + if (getValue() == null) { + return null; + } + return getValue().get("modeValue"); + } + + @Override + public Feature getFeature() { + return Feature.CORE; + } + + @Override + public void valueCheck(String stringValue) throws LabelErrorException { + if (!StringUtils.isBlank(stringValue)) { + if (stringValue.split(SerializableLabel.VALUE_SEPARATOR).length != 1) { + throw new LabelErrorException( + LABEL_ERROR_CODE.getErrorCode(), LABEL_ERROR_CODE.getErrorDesc()); + } + } else { + throw new LabelErrorException( + CHECK_LABEL_VALUE_EMPTY.getErrorCode(), CHECK_LABEL_VALUE_EMPTY.getErrorDesc()); + } + } +} diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala index b4a66d2f46..3965a5ea11 100644 --- 
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala @@ -22,6 +22,7 @@ import org.apache.linkis.manager.label.entity.engine.{ CodeLanguageLabel, EngineConnModeLabel, EngineTypeLabel, + EngingeConnRuntimeModeLabel, UserCreatorLabel } import org.apache.linkis.manager.label.entity.entrance.{ @@ -80,6 +81,10 @@ object LabelUtil { getLabelFromList[CodeLanguageLabel](labels) } + def getEngingeConnRuntimeModeLabel(labels: util.List[Label[_]]): EngingeConnRuntimeModeLabel = { + getLabelFromList[EngingeConnRuntimeModeLabel](labels) + } + def getEngineConnModeLabel(labels: util.List[Label[_]]): EngineConnModeLabel = { getLabelFromList[EngineConnModeLabel](labels) } diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/constant/AMConstant.java b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/constant/AMConstant.java index 09d802a951..c803570353 100644 --- a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/constant/AMConstant.java +++ b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/constant/AMConstant.java @@ -29,6 +29,8 @@ public class AMConstant { public static final String PROCESS_MARK = "process"; + public static final String CLUSTER_PROCESS_MARK = "cluster_process"; + public static final String THREAD_MARK = "thread"; public static final String START_REASON = "start_reason"; diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/NodeManagerMapper.java 
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/NodeManagerMapper.java index 4e9546944a..6f11c910e4 100644 --- a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/NodeManagerMapper.java +++ b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/NodeManagerMapper.java @@ -49,6 +49,8 @@ void updateNodeInstance( PersistenceNode getNodeInstance(@Param("instance") String instance); + PersistenceNode getNodeInstanceByTicketId(@Param("ticketId") String ticketId); + PersistenceNode getNodeInstanceById(@Param("id") int id); PersistenceNode getEMNodeInstanceByEngineNode(@Param("instance") String instance); diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/NodeManagerPersistence.java b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/NodeManagerPersistence.java index bb95c8cf7d..b83c82fd30 100644 --- a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/NodeManagerPersistence.java +++ b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/NodeManagerPersistence.java @@ -105,6 +105,8 @@ void updateEngineNode(ServiceInstance serviceInstance, Node node) */ EngineNode getEngineNode(ServiceInstance serviceInstance); + EngineNode getEngineNodeByTicketId(String ticketId); + /** * 通过Em的ServiceInstance 获取EM下面Engine的列表 * diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java 
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java index 4a1697333b..14db7252fa 100644 --- a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java +++ b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java @@ -274,6 +274,28 @@ public EngineNode getEngineNode(ServiceInstance serviceInstance) { return amEngineNode; } + @Override + public EngineNode getEngineNodeByTicketId(String ticketId) { + AMEngineNode amEngineNode = new AMEngineNode(); + PersistenceNode engineNode = nodeManagerMapper.getNodeInstanceByTicketId(ticketId); + + if (null == engineNode) { + return null; + } + + ServiceInstance serviceInstance = new ServiceInstance(); + serviceInstance.setInstance(engineNode.getInstance()); + serviceInstance.setApplicationName(engineNode.getName()); + amEngineNode.setServiceInstance(serviceInstance); + + amEngineNode.setOwner(engineNode.getOwner()); + amEngineNode.setMark(engineNode.getMark()); + amEngineNode.setIdentifier(engineNode.getIdentifier()); + amEngineNode.setTicketId(engineNode.getTicketId()); + amEngineNode.setStartTime(engineNode.getCreateTime()); + return amEngineNode; + } + @Override public List getEngineNodeByEM(ServiceInstance serviceInstance) { // serviceinstance for a given EM(给定EM的 serviceinstance) diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/resources/mapper/common/NodeManagerMapper.xml b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/resources/mapper/common/NodeManagerMapper.xml index b470daead1..935ceb4f80 100644 --- 
a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/resources/mapper/common/NodeManagerMapper.xml +++ b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/resources/mapper/common/NodeManagerMapper.xml @@ -6,9 +6,9 @@ ~ The ASF licenses this file to You under the Apache License, Version 2.0 ~ (the "License"); you may not use this file except in compliance with ~ the License. You may obtain a copy of the License at - ~ + ~ ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ + ~ ~ Unless required by applicable law or agreed to in writing, software ~ distributed under the License is distributed on an "AS IS" BASIS, ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -34,9 +34,9 @@ INSERT INTO linkis_cg_manager_service_instance (instance, name, owner, mark, ticketId, update_time - , create_time, updator, creator) + , create_time, updator, creator) VALUES (#{instance}, #{name}, #{owner}, #{mark}, #{ticketId}, #{updateTime} - , #{createTime}, #{updator}, #{creator}) + , #{createTime}, #{updator}, #{creator}) @@ -119,9 +119,9 @@ + + @@ -150,17 +156,17 @@ SELECT * FROM linkis_cg_manager_service_instance WHERE instance IN ( - SELECT engine_instance - FROM linkis_cg_manager_engine_em - WHERE em_instance = #{instance} + SELECT engine_instance + FROM linkis_cg_manager_engine_em + WHERE em_instance = #{instance} ) diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/errorcode/SparkErrorCodeSummary.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/errorcode/SparkErrorCodeSummary.java index 936e773e40..42f0b66e4d 100644 --- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/errorcode/SparkErrorCodeSummary.java +++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/errorcode/SparkErrorCodeSummary.java @@ -66,6 +66,10 @@ public enum 
SparkErrorCodeSummary implements LinkisErrorCode { 43032, "The application start failed, since yarn applicationId is null."), NOT_SUPPORT_METHOD(43040, "Not support method for requestExpectedResource."), + + LINKIS_SPARK_YARN_CLUSTER_JARS_ERROR( + 43042, + "linkis.spark.yarn.cluster.jars parameters configuration errors(linkis.spark.yarn.cluster.jars 参数配置错误)."), ; /** (errorCode)错误码 */ diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala index ccc21a7761..b42fc0934e 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala @@ -30,6 +30,10 @@ object SparkConfiguration extends Logging { val SPARK_HOME_ENV = "SPARK_HOME" val SPARK_CONF_DIR_ENV = "SPARK_CONF_DIR" + val SPARK_YARN_CLIENT = "client" + + val SPARK_YARN_CLUSTER = "cluster" + val PROCESS_MAX_THREADS = CommonVars[Int]("wds.linkis.process.threadpool.max", 100) val SPARK_SESSION_HOOK = CommonVars[String]("wds.linkis.engine.spark.session.hook", "") @@ -46,6 +50,9 @@ object SparkConfiguration extends Logging { val SPARK_DEPLOY_MODE = CommonVars[String]("spark.submit.deployMode", "client") + val SPARK_YARN_CLUSTER_JARS = + CommonVars[String]("linkis.spark.yarn.cluster.jars", "hdfs:///spark/cluster") + val SPARK_APP_NAME = CommonVars[String]("spark.app.name", "Linkis-EngineConn-Spark") val SPARK_APP_RESOURCE = CommonVars[String]("spark.app.resource", "") val SPARK_APP_CONF = CommonVars[String]("spark.extconf", "") diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala index 2264db61f7..8d97e81525 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala @@ -25,7 +25,7 @@ import org.apache.linkis.engineconn.computation.executor.execute.{ } import org.apache.linkis.engineconn.computation.executor.utlis.ProgressUtils import org.apache.linkis.engineconn.core.exception.ExecutorHookFatalException -import org.apache.linkis.engineconn.executor.entity.ResourceFetchExecutor +import org.apache.linkis.engineconn.executor.entity.{ResourceFetchExecutor, YarnExecutor} import org.apache.linkis.engineplugin.spark.common.{Kind, SparkDataCalc} import org.apache.linkis.engineplugin.spark.cs.CSSparkHelper import org.apache.linkis.engineplugin.spark.extension.{ @@ -56,6 +56,7 @@ import scala.collection.mutable.ArrayBuffer abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long) extends ComputationExecutor with Logging + with YarnExecutor with ResourceFetchExecutor { private var initialized: Boolean = false @@ -70,9 +71,17 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long) private var thread: Thread = _ + private var applicationId: String = sc.applicationId + + override def getApplicationId: String = applicationId + + override def getApplicationURL: String = "" + override def getYarnMode: String = "" + override def getQueue: String = "" + override def init(): Unit = { logger.info(s"Ready to change engine state!") -// setCodeParser() // todo check + // setCodeParser() // todo check super.init() } diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala index 5bf90c6bfe..bc18e2badf 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala @@ -39,8 +39,10 @@ import org.apache.linkis.manager.engineplugin.common.creation.{ } import org.apache.linkis.manager.engineplugin.common.launch.process.Environment import org.apache.linkis.manager.engineplugin.common.launch.process.Environment.variable +import org.apache.linkis.manager.label.constant.LabelValueConstant import org.apache.linkis.manager.label.entity.engine.EngineType import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType +import org.apache.linkis.manager.label.utils.LabelUtil import org.apache.linkis.server.JMap import org.apache.commons.lang3.StringUtils @@ -144,19 +146,32 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging val master = sparkConf.getOption("spark.master").getOrElse(CommonVars("spark.master", "yarn").getValue) logger.info(s"------ Create new SparkContext {$master} -------") - val pysparkBasePath = SparkConfiguration.SPARK_HOME.getValue - val pysparkPath = new File(pysparkBasePath, "python" + File.separator + "lib") - var pythonLibUris = pysparkPath.listFiles().map(_.toURI.toString).filter(_.endsWith(".zip")) - if (pythonLibUris.length == 2) { - val sparkConfValue1 = Utils.tryQuietly(CommonVars("spark.yarn.dist.files", "").getValue) - val sparkConfValue2 = Utils.tryQuietly(sparkConf.get("spark.yarn.dist.files")) - if (StringUtils.isNotBlank(sparkConfValue2)) { - pythonLibUris = sparkConfValue2 +: pythonLibUris - } - if (StringUtils.isNotBlank(sparkConfValue1)) { - pythonLibUris = sparkConfValue1 +: pythonLibUris + + val label = 
LabelUtil.getEngingeConnRuntimeModeLabel(engineCreationContext.getLabels()) + val isYarnClusterMode: Boolean = + if (null != label && label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) true + else false + + if (isYarnClusterMode) { + sparkConf.set("spark.submit.deployMode", "cluster") + } + + // todo yarn cluster暂时不支持pyspark,后期对pyspark进行处理 + if (!isYarnClusterMode) { + val pysparkBasePath = SparkConfiguration.SPARK_HOME.getValue + val pysparkPath = new File(pysparkBasePath, "python" + File.separator + "lib") + var pythonLibUris = pysparkPath.listFiles().map(_.toURI.toString).filter(_.endsWith(".zip")) + if (pythonLibUris.length == 2) { + val sparkConfValue1 = Utils.tryQuietly(CommonVars("spark.yarn.dist.files", "").getValue) + val sparkConfValue2 = Utils.tryQuietly(sparkConf.get("spark.yarn.dist.files")) + if (StringUtils.isNotBlank(sparkConfValue2)) { + pythonLibUris = sparkConfValue2 +: pythonLibUris + } + if (StringUtils.isNotBlank(sparkConfValue1)) { + pythonLibUris = sparkConfValue1 +: pythonLibUris + } + sparkConf.set("spark.yarn.dist.files", pythonLibUris.mkString(",")) } - sparkConf.set("spark.yarn.dist.files", pythonLibUris.mkString(",")) } // Distributes needed libraries to workers // when spark version is greater than or equal to 1.5.0 diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala index 8472cdfa91..2487ede907 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala @@ -18,23 +18,30 @@ package org.apache.linkis.engineplugin.spark.launch import 
org.apache.linkis.common.conf.CommonVars +import org.apache.linkis.engineplugin.spark.config.SparkConfiguration import org.apache.linkis.engineplugin.spark.config.SparkConfiguration.{ ENGINE_JAR, SPARK_APP_NAME, SPARK_DEFAULT_EXTERNAL_JARS_PATH, + SPARK_DEPLOY_MODE, SPARK_DRIVER_CLASSPATH, SPARK_DRIVER_EXTRA_JAVA_OPTIONS, SPARK_PYTHON_VERSION, - SPARK_SUBMIT_PATH + SPARK_SUBMIT_PATH, + SPARK_YARN_CLUSTER_JARS } import org.apache.linkis.engineplugin.spark.config.SparkResourceConfiguration._ +import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary +import org.apache.linkis.engineplugin.spark.exception.SparkEngineException import org.apache.linkis.hadoop.common.conf.HadoopConf import org.apache.linkis.manager.common.entity.resource.DriverAndYarnResource import org.apache.linkis.manager.engineplugin.common.conf.EnvConfiguration import org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnBuildRequest import org.apache.linkis.manager.engineplugin.common.launch.process.Environment._ import org.apache.linkis.manager.engineplugin.common.launch.process.JavaProcessEngineConnLaunchBuilder +import org.apache.linkis.manager.label.constant.LabelValueConstant import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel +import org.apache.linkis.manager.label.utils.LabelUtil import org.apache.linkis.protocol.UserWithCreator import org.apache.commons.lang3.StringUtils @@ -61,7 +68,11 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLa val executorMemory = getValueAndRemove(properties, LINKIS_SPARK_EXECUTOR_MEMORY) val numExecutors = getValueAndRemove(properties, LINKIS_SPARK_EXECUTOR_INSTANCES) - val files = getValueAndRemove(properties, "files", "").split(",").filter(isNotBlankPath) + val files: ArrayBuffer[String] = getValueAndRemove(properties, "files", "") + .split(",") + .filter(isNotBlankPath) + .toBuffer + .asInstanceOf[ArrayBuffer[String]] val jars = new ArrayBuffer[String]() jars ++= 
getValueAndRemove(properties, "jars", "").split(",").filter(isNotBlankPath) jars ++= getValueAndRemove(properties, SPARK_DEFAULT_EXTERNAL_JARS_PATH) @@ -115,8 +126,34 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLa memory } + var deployMode: String = SparkConfiguration.SPARK_YARN_CLIENT + + val label = LabelUtil.getEngingeConnRuntimeModeLabel(engineConnBuildRequest.labels) + val isYarnClusterMode: Boolean = + if (null != label && label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) true + else false + + if (isYarnClusterMode) { + deployMode = SparkConfiguration.SPARK_YARN_CLUSTER + files ++= Array(s"${variable(PWD)}/conf/linkis-engineconn.properties") + + var clusterJars: String = getValueAndRemove(properties, SPARK_YARN_CLUSTER_JARS) + + if (StringUtils.isBlank(clusterJars)) { + throw new SparkEngineException( + SparkErrorCodeSummary.LINKIS_SPARK_YARN_CLUSTER_JARS_ERROR.getErrorCode, + SparkErrorCodeSummary.LINKIS_SPARK_YARN_CLUSTER_JARS_ERROR.getErrorDesc + ) + } + + if (clusterJars.endsWith("/")) { + clusterJars = clusterJars.dropRight(1) + } + jars += s"$clusterJars/*" + } + addOpt("--master", "yarn") - addOpt("--deploy-mode", "client") + addOpt("--deploy-mode", deployMode) addOpt("--name", appName) addProxyUser() @@ -137,8 +174,9 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLa addOpt("--num-executors", numExecutors.toString) addOpt("--queue", queue) - getConf(engineConnBuildRequest, gcLogDir, logDir).foreach { case (key, value) => - addOpt("--conf", s"""$key="$value"""") + getConf(engineConnBuildRequest, gcLogDir, logDir, isYarnClusterMode).foreach { + case (key, value) => + addOpt("--conf", s"""$key="$value"""") } addOpt("--class", className) @@ -152,7 +190,8 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLa def getConf( engineConnBuildRequest: EngineConnBuildRequest, gcLogDir: String, - logDir: String + logDir: String, + isYarnClusterMode: 
Boolean ): ArrayBuffer[(String, String)] = { val driverJavaSet = new StringBuilder(" -server") if (StringUtils.isNotEmpty(EnvConfiguration.ENGINE_CONN_DEFAULT_JAVA_OPTS.getValue)) { @@ -168,7 +207,11 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLa .foreach(l => { driverJavaSet.append(" ").append(l) }) - driverJavaSet.append(" -Djava.io.tmpdir=" + variable(TEMP_DIRS)) + if (isYarnClusterMode) { + driverJavaSet.append(" -Djava.io.tmpdir=/tmp") + } else { + driverJavaSet.append(" -Djava.io.tmpdir=" + variable(TEMP_DIRS)) + } if (EnvConfiguration.ENGINE_CONN_DEBUG_ENABLE.getValue) { driverJavaSet.append( s" -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=${variable(RANDOM_PORT)}" @@ -186,6 +229,7 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLa val keyValue = iterator.next() if ( !SPARK_PYTHON_VERSION.key.equals(keyValue.getKey) && + !SPARK_DEPLOY_MODE.key.equals(keyValue.getKey) && keyValue.getKey.startsWith("spark.") && StringUtils.isNotBlank(keyValue.getValue) ) {