Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

spark support yarn cluster #4850

Merged
merged 13 commits into from
Aug 14, 2023
3 changes: 3 additions & 0 deletions docs/configuration/spark.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

| Module Name (Service Name) | Parameter Name | Default Value | Description |Used|
| -------- | -------- | ----- |----- | ----- |
|spark|linkis.spark.yarn.cluster.jars|hdfs:///spark/cluster|spark.yarn.cluster.jars|
|spark|linkis.spark.etl.support.hudi|false|spark.etl.support.hudi|
|spark|linkis.bgservice.store.prefix|hdfs:///tmp/bdp-ide/|bgservice.store.prefix|
|spark|linkis.bgservice.store.suffix| |bgservice.store.suffix|
Expand All @@ -27,6 +28,8 @@
|spark|wds.linkis.spark.engineconn.fatal.log|error writing class;OutOfMemoryError|spark.engineconn.fatal.log|
|spark|wds.linkis.spark.engine.scala.replace_package_header.enable| true |spark.engine.scala.replace_package_header.enable|

Use spark yarn cluster mode,need to set label "engingeConnRuntimeMode": "yarnCluster",and need to upload the dependence of the spark to 'linkis.spark.yarn.cluster.jar'(the default value is 'hdfs:///spark/cluster')
spark dependencies include jars and configuration files,For example: '/appcom/Install/linkis/lib/linkis-engineconn-plugins/spark/dist/3.2.1/lib/*.jar','/appcom/Install/linkis/conf/*''

The spark-excel package may cause class conflicts,need to download separately,put it in spark lib
wget https://repo1.maven.org/maven2/com/crealytics/spark-excel-2.12.17-3.2.2_2.12/3.2.2_0.18.1/spark-excel-2.12.17-3.2.2_2.12-3.2.2_0.18.1.jar
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,13 @@ public EngineStopResponse dealEngineConnStop(EngineStopRequest engineStopRequest
killYarnAppIdOfOneEc(engineStopRequest);
}

if (AMConstant.CLUSTER_PROCESS_MARK.equals(engineStopRequest.getIdentifierType())
&& engineStopRequest.getIdentifier() != null) {
List<String> appIds = new ArrayList<>();
appIds.add(engineStopRequest.getIdentifier());
GovernanceUtils.killYarnJobApp(appIds);
}

if (!response.getStopStatus()) {
EngineSuicideRequest request =
new EngineSuicideRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import org.apache.linkis.manager.common.protocol.engine.{
EngineStopRequest
}
import org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnLaunchRequest
import org.apache.linkis.manager.label.constant.LabelValueConstant
import org.apache.linkis.manager.label.utils.LabelUtil
import org.apache.linkis.rpc.Sender

Expand Down Expand Up @@ -146,11 +147,21 @@ abstract class AbstractEngineConnLaunchService extends EngineConnLaunchService w
throw t
}
LoggerUtils.removeJobIdMDC()

val label = LabelUtil.getEngingeConnRuntimeModeLabel(request.labels)
val isYarnClusterMode: Boolean =
if (null != label && label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) true
else false

val engineNode = new AMEngineNode()
engineNode.setLabels(conn.getLabels)
engineNode.setServiceInstance(conn.getServiceInstance)
engineNode.setOwner(request.user)
engineNode.setMark(AMConstant.PROCESS_MARK)
if (isYarnClusterMode) {
engineNode.setMark(AMConstant.CLUSTER_PROCESS_MARK)
} else {
engineNode.setMark(AMConstant.PROCESS_MARK)
}
engineNode
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.engineconn.acessible.executor.entity.AccessibleExecutor
import org.apache.linkis.engineconn.callback.service.{
EngineConnAfterStartCallback,
EngineConnPidCallback
EngineConnIdentifierCallback
}
import org.apache.linkis.engineconn.common.conf.EngineConnConf
import org.apache.linkis.engineconn.common.creation.EngineCreationContext
Expand Down Expand Up @@ -61,8 +61,8 @@ class CallbackEngineConnHook extends EngineConnHook with Logging {
newMap.put("spring.mvc.servlet.path", ServerConfiguration.BDP_SERVER_RESTFUL_URI.getValue)
DataWorkCloudApplication.main(DWCArgumentsParser.formatSpringOptions(newMap.toMap))

val engineConnPidCallBack = new EngineConnPidCallback()
Utils.tryAndError(engineConnPidCallBack.callback())
val engineConnIdentifierCallback = new EngineConnIdentifierCallback()
Utils.tryAndError(engineConnIdentifierCallback.callback())
logger.info("<--------------------SpringBoot App init succeed-------------------->")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,29 @@
package org.apache.linkis.engineconn.callback.service

import org.apache.linkis.engineconn.core.EngineConnObject
import org.apache.linkis.engineconn.core.executor.ExecutorManager
import org.apache.linkis.engineconn.executor.entity.YarnExecutor
import org.apache.linkis.governance.common.protocol.task.ResponseEngineConnPid
import org.apache.linkis.manager.label.constant.LabelValueConstant
import org.apache.linkis.manager.label.utils.LabelUtil
import org.apache.linkis.rpc.Sender

import java.lang.management.ManagementFactory

class EngineConnPidCallback extends AbstractEngineConnStartUpCallback {
class EngineConnIdentifierCallback extends AbstractEngineConnStartUpCallback {

override def callback(): Unit = {
val pid = ManagementFactory.getRuntimeMXBean.getName.split("@")(0)
var identifier = ManagementFactory.getRuntimeMXBean.getName.split("@")(0)
val instance = Sender.getThisServiceInstance
val context = EngineConnObject.getEngineCreationContext
callback(ResponseEngineConnPid(instance, pid, context.getTicketId))

val label = LabelUtil.getEngingeConnRuntimeModeLabel(context.getLabels())
if (null != label && label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) {
identifier = ExecutorManager.getInstance.getReportExecutor match {
case cluster: YarnExecutor => cluster.getApplicationId
}
}
callback(ResponseEngineConnPid(instance, identifier, context.getTicketId))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,17 @@ public EngineNode getEngineNodeInfoByDB(EngineNode engineNode) {
return dbEngineNode;
}

@Override
public EngineNode getEngineNodeInfoByTicketId(String ticketId) {
EngineNode dbEngineNode = nodeManagerPersistence.getEngineNodeByTicketId(ticketId);
if (null == dbEngineNode) {
throw new LinkisRetryException(AMConstant.ENGINE_ERROR_CODE, ticketId + " not exists in db");
}
metricsConverter.fillMetricsToNode(
dbEngineNode, nodeMetricManagerPersistence.getNodeMetrics(dbEngineNode));
return dbEngineNode;
}

@Override
public void updateEngineStatus(
ServiceInstance serviceInstance, NodeStatus fromState, NodeStatus toState) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public interface EngineNodeManager {

EngineNode getEngineNodeInfoByDB(EngineNode engineNode);

EngineNode getEngineNodeInfoByTicketId(String ticketId);

/**
* Get detailed engine information from the persistence
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,10 +337,20 @@ private List<Label<?>> fromEMGetEngineLabels(List<Label<?>> emLabels) {
}

private boolean ensuresIdle(EngineNode engineNode, String resourceTicketId) {
EngineNode engineNodeInfo = getEngineNodeManager().getEngineNodeInfoByDB(engineNode);
EngineNode engineNodeInfo;
if (engineNode.getMark().equals(AMConstant.CLUSTER_PROCESS_MARK)) {
engineNodeInfo = getEngineNodeManager().getEngineNodeInfoByTicketId(resourceTicketId);
} else {
engineNodeInfo = getEngineNodeManager().getEngineNodeInfoByDB(engineNode);
}
if (null == engineNodeInfo) {
return false;
}

if (engineNodeInfo.getServiceInstance() != null) {
engineNode.setServiceInstance(engineNodeInfo.getServiceInstance());
}

if (NodeStatus.isCompleted(engineNodeInfo.getNodeStatus())) {
NodeMetrics metrics = nodeMetricManagerPersistence.getNodeMetrics(engineNodeInfo);
Pair<String, Optional<Boolean>> errorInfo = getStartErrorInfo(metrics.getHeartBeatMsg());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@

package org.apache.linkis.manager.am.service.impl;

import org.apache.linkis.common.ServiceInstance;
import org.apache.linkis.governance.common.protocol.task.ResponseEngineConnPid;
import org.apache.linkis.manager.am.manager.DefaultEngineNodeManager;
import org.apache.linkis.manager.am.service.EngineConnPidCallbackService;
import org.apache.linkis.manager.am.service.engine.AbstractEngineService;
import org.apache.linkis.manager.common.constant.AMConstant;
import org.apache.linkis.manager.common.entity.node.EngineNode;
import org.apache.linkis.manager.label.service.NodeLabelService;
import org.apache.linkis.rpc.message.annotation.Receiver;

import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -30,12 +34,15 @@
import org.slf4j.LoggerFactory;

@Service
public class DefaultEngineConnPidCallbackService implements EngineConnPidCallbackService {
public class DefaultEngineConnPidCallbackService extends AbstractEngineService
implements EngineConnPidCallbackService {
private static final Logger logger =
LoggerFactory.getLogger(DefaultEngineConnPidCallbackService.class);

@Autowired private DefaultEngineNodeManager defaultEngineNodeManager;

@Autowired private NodeLabelService nodeLabelService;

@Receiver
@Override
public void dealPid(ResponseEngineConnPid protocol) {
Expand All @@ -56,6 +63,14 @@ public void dealPid(ResponseEngineConnPid protocol) {
}

engineNode.setIdentifier(protocol.pid());

if (engineNode.getMark().equals(AMConstant.CLUSTER_PROCESS_MARK)) {
ServiceInstance serviceInstance = protocol.serviceInstance();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should first org.apache.linkis.manager.am.manager.EngineNodeManager#updateEngineNode and then update label

engineNode.setServiceInstance(serviceInstance);
getEngineNodeManager().updateEngineNode(serviceInstance, engineNode);
nodeLabelService.labelsFromInstanceToNewInstance(
engineNode.getServiceInstance(), serviceInstance);
}
defaultEngineNodeManager.updateEngine(engineNode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ public interface NodeLabelService {

void updateLabelsToNode(ServiceInstance instance, List<Label<?>> labels);

void labelsFromInstanceToNewInstance(
ServiceInstance oldServiceInstance, ServiceInstance newServiceInstance);

/**
* Remove the labels related by node instance
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,44 @@ public void updateLabelsToNode(ServiceInstance instance, List<Label<?>> labels)
}
}

@Override
public void labelsFromInstanceToNewInstance(
ServiceInstance oldServiceInstance, ServiceInstance newServiceInstance) {
List<PersistenceLabel> labels =
labelManagerPersistence.getLabelByServiceInstance(newServiceInstance);
List<String> newKeyList = labels.stream().map(Label::getLabelKey).collect(Collectors.toList());
List<PersistenceLabel> nodeLabels =
labelManagerPersistence.getLabelByServiceInstance(oldServiceInstance);

List<String> oldKeyList =
nodeLabels.stream().map(InheritableLabel::getLabelKey).collect(Collectors.toList());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It shouldn’t be so complicated here, we only need to first get the new instance tag, then get the old instance tag, then delete the duplicate tag in the old tag, and then associate the new instance with the old tag just go
这里应该不用搞这么复杂,我们只需要第一步获得新的instance的标签,然后再获取老的instance标签,然后将老的标签中重复的标签删除,接着再将新的instance和老的标签关联上就好了


List<String> willBeAdd = new ArrayList<>(oldKeyList);
willBeAdd.removeAll(newKeyList);

// Assign the old association to the newServiceInstance
if (!CollectionUtils.isEmpty(willBeAdd)) {
nodeLabels.forEach(
nodeLabel -> {
if (willBeAdd.contains(nodeLabel.getLabelKey())) {
PersistenceLabel persistenceLabel =
LabelManagerUtils.convertPersistenceLabel(nodeLabel);
int labelId = tryToAddLabel(persistenceLabel);
if (labelId > 0) {
List<Integer> labelIds = new ArrayList<>();
labelIds.add(labelId);
labelManagerPersistence.addLabelToNode(newServiceInstance, labelIds);
}
}
});
}

// Delete an old association
List<Integer> oldLabelId =
nodeLabels.stream().map(PersistenceLabel::getId).collect(Collectors.toList());
labelManagerPersistence.removeNodeLabels(oldServiceInstance, oldLabelId);
}

/**
* Remove the labels related by node instance
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,7 @@ public class LabelKeyConstant {

public static final String FIXED_EC_KEY = "fixedEngineConn";

public static final String ENGINGE_CONN_RUNTIME_MODE_KEY = "engingeConnRuntimeMode";

public static final String MANAGER_KEY = "manager";
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,6 @@
public class LabelValueConstant {

public static final String OFFLINE_VALUE = "offline";

public static final String YARN_CLUSTER_VALUE = "yarnCluster";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.manager.label.entity.engine;

import org.apache.linkis.manager.label.constant.LabelKeyConstant;
import org.apache.linkis.manager.label.entity.*;
import org.apache.linkis.manager.label.entity.annon.ValueSerialNum;
import org.apache.linkis.manager.label.exception.LabelErrorException;

import org.apache.commons.lang3.StringUtils;

import java.util.HashMap;

import static org.apache.linkis.manager.label.errorcode.LabelCommonErrorCodeSummary.CHECK_LABEL_VALUE_EMPTY;
import static org.apache.linkis.manager.label.errorcode.LabelCommonErrorCodeSummary.LABEL_ERROR_CODE;

public class EngingeConnRuntimeModeLabel extends GenericLabel
implements EngineNodeLabel, UserModifiable {

public EngingeConnRuntimeModeLabel() {
setLabelKey(LabelKeyConstant.ENGINGE_CONN_RUNTIME_MODE_KEY);
}

@ValueSerialNum(0)
public void setModeValue(String modeValue) {
if (getValue() == null) {
setValue(new HashMap<>());
}
getValue().put("modeValue", modeValue);
}

public String getModeValue() {
if (getValue() == null) {
return null;
}
return getValue().get("modeValue");
}

@Override
public Feature getFeature() {
return Feature.CORE;
}

@Override
public void valueCheck(String stringValue) throws LabelErrorException {
if (!StringUtils.isBlank(stringValue)) {
if (stringValue.split(SerializableLabel.VALUE_SEPARATOR).length != 1) {
throw new LabelErrorException(
LABEL_ERROR_CODE.getErrorCode(), LABEL_ERROR_CODE.getErrorDesc());
}
} else {
throw new LabelErrorException(
CHECK_LABEL_VALUE_EMPTY.getErrorCode(), CHECK_LABEL_VALUE_EMPTY.getErrorDesc());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.linkis.manager.label.entity.engine.{
CodeLanguageLabel,
EngineConnModeLabel,
EngineTypeLabel,
EngingeConnRuntimeModeLabel,
UserCreatorLabel
}
import org.apache.linkis.manager.label.entity.entrance.{
Expand Down Expand Up @@ -80,6 +81,10 @@ object LabelUtil {
getLabelFromList[CodeLanguageLabel](labels)
}

def getEngingeConnRuntimeModeLabel(labels: util.List[Label[_]]): EngingeConnRuntimeModeLabel = {
getLabelFromList[EngingeConnRuntimeModeLabel](labels)
}

def getEngineConnModeLabel(labels: util.List[Label[_]]): EngineConnModeLabel = {
getLabelFromList[EngineConnModeLabel](labels)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public class AMConstant {

public static final String PROCESS_MARK = "process";

public static final String CLUSTER_PROCESS_MARK = "cluster_process";

public static final String THREAD_MARK = "thread";

public static final String START_REASON = "start_reason";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ void updateNodeInstance(

PersistenceNode getNodeInstance(@Param("instance") String instance);

PersistenceNode getNodeInstanceByTicketId(@Param("ticketId") String ticketId);

PersistenceNode getNodeInstanceById(@Param("id") int id);

PersistenceNode getEMNodeInstanceByEngineNode(@Param("instance") String instance);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ void updateEngineNode(ServiceInstance serviceInstance, Node node)
*/
EngineNode getEngineNode(ServiceInstance serviceInstance);

EngineNode getEngineNodeByTicketId(String ticketId);

/**
* 通过Em的ServiceInstance 获取EM下面Engine的列表
*
Expand Down
Loading
Loading