Skip to content

Commit

Permalink
spark support yarn cluster (#4850)
Browse files Browse the repository at this point in the history
* spark support yarn cluster

* spark support yarn cluster

* spark support yarn cluster

* spark support yarn cluster

* spark support yarn cluster

* LinkisManagerApplication Remove useless code

* spark support yarn cluster

* spark support yarn cluster

* spark support yarn cluster

* spark support yarn cluster

* spark support yarn cluster

* spark support yarn cluster
  • Loading branch information
ChengJie1053 authored Aug 14, 2023
1 parent abbcb73 commit a94c876
Show file tree
Hide file tree
Showing 25 changed files with 350 additions and 46 deletions.
3 changes: 3 additions & 0 deletions docs/configuration/spark.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

| Module Name (Service Name) | Parameter Name | Default Value | Description |Used|
| -------- | -------- | ----- |----- | ----- |
|spark|linkis.spark.yarn.cluster.jars|hdfs:///spark/cluster|spark.yarn.cluster.jars|
|spark|linkis.spark.etl.support.hudi|false|spark.etl.support.hudi|
|spark|linkis.bgservice.store.prefix|hdfs:///tmp/bdp-ide/|bgservice.store.prefix|
|spark|linkis.bgservice.store.suffix| |bgservice.store.suffix|
Expand All @@ -27,6 +28,8 @@
|spark|wds.linkis.spark.engineconn.fatal.log|error writing class;OutOfMemoryError|spark.engineconn.fatal.log|
|spark|wds.linkis.spark.engine.scala.replace_package_header.enable| true |spark.engine.scala.replace_package_header.enable|

Use spark yarn cluster mode,need to set label "engingeConnRuntimeMode": "yarnCluster",and need to upload the dependence of the spark to 'linkis.spark.yarn.cluster.jar'(the default value is 'hdfs:///spark/cluster')
spark dependencies include jars and configuration files,For example: '/appcom/Install/linkis/lib/linkis-engineconn-plugins/spark/dist/3.2.1/lib/*.jar','/appcom/Install/linkis/conf/*''

The spark-excel package may cause class conflicts,need to download separately,put it in spark lib
wget https://repo1.maven.org/maven2/com/crealytics/spark-excel-2.12.17-3.2.2_2.12/3.2.2_0.18.1/spark-excel-2.12.17-3.2.2_2.12-3.2.2_0.18.1.jar
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,13 @@ public EngineStopResponse dealEngineConnStop(EngineStopRequest engineStopRequest
killYarnAppIdOfOneEc(engineStopRequest);
}

if (AMConstant.CLUSTER_PROCESS_MARK.equals(engineStopRequest.getIdentifierType())
&& engineStopRequest.getIdentifier() != null) {
List<String> appIds = new ArrayList<>();
appIds.add(engineStopRequest.getIdentifier());
GovernanceUtils.killYarnJobApp(appIds);
}

if (!response.getStopStatus()) {
EngineSuicideRequest request =
new EngineSuicideRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import org.apache.linkis.manager.common.protocol.engine.{
EngineStopRequest
}
import org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnLaunchRequest
import org.apache.linkis.manager.label.constant.LabelValueConstant
import org.apache.linkis.manager.label.utils.LabelUtil
import org.apache.linkis.rpc.Sender

Expand Down Expand Up @@ -146,11 +147,21 @@ abstract class AbstractEngineConnLaunchService extends EngineConnLaunchService w
throw t
}
LoggerUtils.removeJobIdMDC()

val label = LabelUtil.getEngingeConnRuntimeModeLabel(request.labels)
val isYarnClusterMode: Boolean =
if (null != label && label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) true
else false

val engineNode = new AMEngineNode()
engineNode.setLabels(conn.getLabels)
engineNode.setServiceInstance(conn.getServiceInstance)
engineNode.setOwner(request.user)
engineNode.setMark(AMConstant.PROCESS_MARK)
if (isYarnClusterMode) {
engineNode.setMark(AMConstant.CLUSTER_PROCESS_MARK)
} else {
engineNode.setMark(AMConstant.PROCESS_MARK)
}
engineNode
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.engineconn.acessible.executor.entity.AccessibleExecutor
import org.apache.linkis.engineconn.callback.service.{
EngineConnAfterStartCallback,
EngineConnPidCallback
EngineConnIdentifierCallback
}
import org.apache.linkis.engineconn.common.conf.EngineConnConf
import org.apache.linkis.engineconn.common.creation.EngineCreationContext
Expand Down Expand Up @@ -61,8 +61,8 @@ class CallbackEngineConnHook extends EngineConnHook with Logging {
newMap.put("spring.mvc.servlet.path", ServerConfiguration.BDP_SERVER_RESTFUL_URI.getValue)
DataWorkCloudApplication.main(DWCArgumentsParser.formatSpringOptions(newMap.toMap))

val engineConnPidCallBack = new EngineConnPidCallback()
Utils.tryAndError(engineConnPidCallBack.callback())
val engineConnIdentifierCallback = new EngineConnIdentifierCallback()
Utils.tryAndError(engineConnIdentifierCallback.callback())
logger.info("<--------------------SpringBoot App init succeed-------------------->")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,29 @@
package org.apache.linkis.engineconn.callback.service

import org.apache.linkis.engineconn.core.EngineConnObject
import org.apache.linkis.engineconn.core.executor.ExecutorManager
import org.apache.linkis.engineconn.executor.entity.YarnExecutor
import org.apache.linkis.governance.common.protocol.task.ResponseEngineConnPid
import org.apache.linkis.manager.label.constant.LabelValueConstant
import org.apache.linkis.manager.label.utils.LabelUtil
import org.apache.linkis.rpc.Sender

import java.lang.management.ManagementFactory

class EngineConnPidCallback extends AbstractEngineConnStartUpCallback {
class EngineConnIdentifierCallback extends AbstractEngineConnStartUpCallback {

override def callback(): Unit = {
val pid = ManagementFactory.getRuntimeMXBean.getName.split("@")(0)
var identifier = ManagementFactory.getRuntimeMXBean.getName.split("@")(0)
val instance = Sender.getThisServiceInstance
val context = EngineConnObject.getEngineCreationContext
callback(ResponseEngineConnPid(instance, pid, context.getTicketId))

val label = LabelUtil.getEngingeConnRuntimeModeLabel(context.getLabels())
if (null != label && label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) {
identifier = ExecutorManager.getInstance.getReportExecutor match {
case cluster: YarnExecutor => cluster.getApplicationId
}
}
callback(ResponseEngineConnPid(instance, identifier, context.getTicketId))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,17 @@ public EngineNode getEngineNodeInfoByDB(EngineNode engineNode) {
return dbEngineNode;
}

@Override
public EngineNode getEngineNodeInfoByTicketId(String ticketId) {
EngineNode dbEngineNode = nodeManagerPersistence.getEngineNodeByTicketId(ticketId);
if (null == dbEngineNode) {
throw new LinkisRetryException(AMConstant.ENGINE_ERROR_CODE, ticketId + " not exists in db");
}
metricsConverter.fillMetricsToNode(
dbEngineNode, nodeMetricManagerPersistence.getNodeMetrics(dbEngineNode));
return dbEngineNode;
}

@Override
public void updateEngineStatus(
ServiceInstance serviceInstance, NodeStatus fromState, NodeStatus toState) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public interface EngineNodeManager {

EngineNode getEngineNodeInfoByDB(EngineNode engineNode);

EngineNode getEngineNodeInfoByTicketId(String ticketId);

/**
* Get detailed engine information from the persistence
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,10 +337,20 @@ private List<Label<?>> fromEMGetEngineLabels(List<Label<?>> emLabels) {
}

private boolean ensuresIdle(EngineNode engineNode, String resourceTicketId) {
EngineNode engineNodeInfo = getEngineNodeManager().getEngineNodeInfoByDB(engineNode);
EngineNode engineNodeInfo;
if (engineNode.getMark().equals(AMConstant.CLUSTER_PROCESS_MARK)) {
engineNodeInfo = getEngineNodeManager().getEngineNodeInfoByTicketId(resourceTicketId);
} else {
engineNodeInfo = getEngineNodeManager().getEngineNodeInfoByDB(engineNode);
}
if (null == engineNodeInfo) {
return false;
}

if (engineNodeInfo.getServiceInstance() != null) {
engineNode.setServiceInstance(engineNodeInfo.getServiceInstance());
}

if (NodeStatus.isCompleted(engineNodeInfo.getNodeStatus())) {
NodeMetrics metrics = nodeMetricManagerPersistence.getNodeMetrics(engineNodeInfo);
Pair<String, Optional<Boolean>> errorInfo = getStartErrorInfo(metrics.getHeartBeatMsg());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@

package org.apache.linkis.manager.am.service.impl;

import org.apache.linkis.common.ServiceInstance;
import org.apache.linkis.governance.common.protocol.task.ResponseEngineConnPid;
import org.apache.linkis.manager.am.manager.DefaultEngineNodeManager;
import org.apache.linkis.manager.am.service.EngineConnPidCallbackService;
import org.apache.linkis.manager.am.service.engine.AbstractEngineService;
import org.apache.linkis.manager.common.constant.AMConstant;
import org.apache.linkis.manager.common.entity.node.EngineNode;
import org.apache.linkis.manager.label.service.NodeLabelService;
import org.apache.linkis.rpc.message.annotation.Receiver;

import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -30,12 +34,15 @@
import org.slf4j.LoggerFactory;

@Service
public class DefaultEngineConnPidCallbackService implements EngineConnPidCallbackService {
public class DefaultEngineConnPidCallbackService extends AbstractEngineService
implements EngineConnPidCallbackService {
private static final Logger logger =
LoggerFactory.getLogger(DefaultEngineConnPidCallbackService.class);

@Autowired private DefaultEngineNodeManager defaultEngineNodeManager;

@Autowired private NodeLabelService nodeLabelService;

@Receiver
@Override
public void dealPid(ResponseEngineConnPid protocol) {
Expand All @@ -56,6 +63,14 @@ public void dealPid(ResponseEngineConnPid protocol) {
}

engineNode.setIdentifier(protocol.pid());

if (engineNode.getMark().equals(AMConstant.CLUSTER_PROCESS_MARK)) {
ServiceInstance serviceInstance = protocol.serviceInstance();
engineNode.setServiceInstance(serviceInstance);
getEngineNodeManager().updateEngineNode(serviceInstance, engineNode);
nodeLabelService.labelsFromInstanceToNewInstance(
engineNode.getServiceInstance(), serviceInstance);
}
defaultEngineNodeManager.updateEngine(engineNode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ public interface NodeLabelService {

void updateLabelsToNode(ServiceInstance instance, List<Label<?>> labels);

void labelsFromInstanceToNewInstance(
ServiceInstance oldServiceInstance, ServiceInstance newServiceInstance);

/**
* Remove the labels related by node instance
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,44 @@ public void updateLabelsToNode(ServiceInstance instance, List<Label<?>> labels)
}
}

@Override
public void labelsFromInstanceToNewInstance(
ServiceInstance oldServiceInstance, ServiceInstance newServiceInstance) {
List<PersistenceLabel> labels =
labelManagerPersistence.getLabelByServiceInstance(newServiceInstance);
List<String> newKeyList = labels.stream().map(Label::getLabelKey).collect(Collectors.toList());
List<PersistenceLabel> nodeLabels =
labelManagerPersistence.getLabelByServiceInstance(oldServiceInstance);

List<String> oldKeyList =
nodeLabels.stream().map(InheritableLabel::getLabelKey).collect(Collectors.toList());

List<String> willBeAdd = new ArrayList<>(oldKeyList);
willBeAdd.removeAll(newKeyList);

// Assign the old association to the newServiceInstance
if (!CollectionUtils.isEmpty(willBeAdd)) {
nodeLabels.forEach(
nodeLabel -> {
if (willBeAdd.contains(nodeLabel.getLabelKey())) {
PersistenceLabel persistenceLabel =
LabelManagerUtils.convertPersistenceLabel(nodeLabel);
int labelId = tryToAddLabel(persistenceLabel);
if (labelId > 0) {
List<Integer> labelIds = new ArrayList<>();
labelIds.add(labelId);
labelManagerPersistence.addLabelToNode(newServiceInstance, labelIds);
}
}
});
}

// Delete an old association
List<Integer> oldLabelId =
nodeLabels.stream().map(PersistenceLabel::getId).collect(Collectors.toList());
labelManagerPersistence.removeNodeLabels(oldServiceInstance, oldLabelId);
}

/**
* Remove the labels related by node instance
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,7 @@ public class LabelKeyConstant {

public static final String FIXED_EC_KEY = "fixedEngineConn";

public static final String ENGINGE_CONN_RUNTIME_MODE_KEY = "engingeConnRuntimeMode";

public static final String MANAGER_KEY = "manager";
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,6 @@
public class LabelValueConstant {

public static final String OFFLINE_VALUE = "offline";

public static final String YARN_CLUSTER_VALUE = "yarnCluster";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.manager.label.entity.engine;

import org.apache.linkis.manager.label.constant.LabelKeyConstant;
import org.apache.linkis.manager.label.entity.*;
import org.apache.linkis.manager.label.entity.annon.ValueSerialNum;
import org.apache.linkis.manager.label.exception.LabelErrorException;

import org.apache.commons.lang3.StringUtils;

import java.util.HashMap;

import static org.apache.linkis.manager.label.errorcode.LabelCommonErrorCodeSummary.CHECK_LABEL_VALUE_EMPTY;
import static org.apache.linkis.manager.label.errorcode.LabelCommonErrorCodeSummary.LABEL_ERROR_CODE;

public class EngingeConnRuntimeModeLabel extends GenericLabel
implements EngineNodeLabel, UserModifiable {

public EngingeConnRuntimeModeLabel() {
setLabelKey(LabelKeyConstant.ENGINGE_CONN_RUNTIME_MODE_KEY);
}

@ValueSerialNum(0)
public void setModeValue(String modeValue) {
if (getValue() == null) {
setValue(new HashMap<>());
}
getValue().put("modeValue", modeValue);
}

public String getModeValue() {
if (getValue() == null) {
return null;
}
return getValue().get("modeValue");
}

@Override
public Feature getFeature() {
return Feature.CORE;
}

@Override
public void valueCheck(String stringValue) throws LabelErrorException {
if (!StringUtils.isBlank(stringValue)) {
if (stringValue.split(SerializableLabel.VALUE_SEPARATOR).length != 1) {
throw new LabelErrorException(
LABEL_ERROR_CODE.getErrorCode(), LABEL_ERROR_CODE.getErrorDesc());
}
} else {
throw new LabelErrorException(
CHECK_LABEL_VALUE_EMPTY.getErrorCode(), CHECK_LABEL_VALUE_EMPTY.getErrorDesc());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.linkis.manager.label.entity.engine.{
CodeLanguageLabel,
EngineConnModeLabel,
EngineTypeLabel,
EngingeConnRuntimeModeLabel,
UserCreatorLabel
}
import org.apache.linkis.manager.label.entity.entrance.{
Expand Down Expand Up @@ -80,6 +81,10 @@ object LabelUtil {
getLabelFromList[CodeLanguageLabel](labels)
}

def getEngingeConnRuntimeModeLabel(labels: util.List[Label[_]]): EngingeConnRuntimeModeLabel = {
getLabelFromList[EngingeConnRuntimeModeLabel](labels)
}

def getEngineConnModeLabel(labels: util.List[Label[_]]): EngineConnModeLabel = {
getLabelFromList[EngineConnModeLabel](labels)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public class AMConstant {

public static final String PROCESS_MARK = "process";

public static final String CLUSTER_PROCESS_MARK = "cluster_process";

public static final String THREAD_MARK = "thread";

public static final String START_REASON = "start_reason";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ void updateNodeInstance(

PersistenceNode getNodeInstance(@Param("instance") String instance);

PersistenceNode getNodeInstanceByTicketId(@Param("ticketId") String ticketId);

PersistenceNode getNodeInstanceById(@Param("id") int id);

PersistenceNode getEMNodeInstanceByEngineNode(@Param("instance") String instance);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ void updateEngineNode(ServiceInstance serviceInstance, Node node)
*/
EngineNode getEngineNode(ServiceInstance serviceInstance);

EngineNode getEngineNodeByTicketId(String ticketId);

/**
* 通过Em的ServiceInstance 获取EM下面Engine的列表
*
Expand Down
Loading

0 comments on commit a94c876

Please sign in to comment.