Skip to content

Commit

Permalink
Merge branch 'master' into dev-hbase
Browse files Browse the repository at this point in the history
  • Loading branch information
CCweixiao authored Sep 26, 2023
2 parents 745699c + c0cebe4 commit e406f3a
Show file tree
Hide file tree
Showing 57 changed files with 1,973 additions and 139 deletions.
5 changes: 4 additions & 1 deletion docs/configuration/spark.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@
|spark|wds.linkis.spark.engine.scala.replace_package_header.enable| true |spark.engine.scala.replace_package_header.enable|

Use spark yarn cluster mode,need to set label "engingeConnRuntimeMode": "yarnCluster",and need to upload the dependence of the spark to 'linkis.spark.yarn.cluster.jar'(the default value is 'hdfs:///spark/cluster')
spark dependencies include jars and configuration files,For example: '/appcom/Install/linkis/lib/linkis-engineconn-plugins/spark/dist/3.2.1/lib/*.jar','/appcom/Install/linkis/conf/*''
spark dependencies include jars and configuration files,For example: '/appcom/Install/linkis/lib/linkis-engineconn-plugins/spark/dist/3.2.1/lib/*.jar','/appcom/Install/linkis/conf/*'

Precautions for using yarnCluster:
Eureka url if 127.0.0.1 should be changed to the real host, such as "127.0.0.1:20303/eureka/" should be changed to "wds001:20303/eureka/"

The spark-excel package may cause class conflicts,need to download separately,put it in spark lib
wget https://repo1.maven.org/maven2/com/crealytics/spark-excel-2.12.17-3.2.2_2.12/3.2.2_0.18.1/spark-excel-2.12.17-3.2.2_2.12-3.2.2_0.18.1.jar
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.commons.lang3.exception.ExceptionUtils;

import java.util.HashMap;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -116,7 +117,7 @@ public JobResult run() {
// async job, return
if (isAsync) {
return new InteractiveJobResult(
submitResult.getJobStatus().isJobSubmitted(),
jobInfoResult.getJobStatus().isJobSubmitted(),
"Async Submission Success",
new HashMap<>());
}
Expand Down Expand Up @@ -171,7 +172,14 @@ private JobResult getResult(
"Job status is not success but \'"
+ jobInfoResult.getJobStatus()
+ "\'. Will not try to retrieve any Result");
return new InteractiveJobResult(false, "Execute Error!!!", new HashMap<>());
Map<String, String> extraMap = new HashMap<>();
if (jobInfoResult.getErrCode() != null) {
extraMap.put("errorCode", String.valueOf(jobInfoResult.getErrCode()));
}
if (StringUtils.isNotBlank(jobInfoResult.getErrDesc())) {
extraMap.put("errorDesc", jobInfoResult.getErrDesc());
}
return new InteractiveJobResult(false, "Execute Error!!!", extraMap);
}
InteractiveJobResult result =
new InteractiveJobResult(true, "Execute Success!!!", new HashMap<>());
Expand Down Expand Up @@ -246,6 +254,9 @@ public void onDestroy() {
logger.warn("Failed to kill job username or jobId is blank");
return;
}
if (isAsync) {
return;
}
try {
new JobKiller(oper).doKill(username, jobId);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,16 @@ class CallbackEngineConnHook extends EngineConnHook with Logging {
newMap.put("spring.mvc.servlet.path", ServerConfiguration.BDP_SERVER_RESTFUL_URI.getValue)
DataWorkCloudApplication.main(DWCArgumentsParser.formatSpringOptions(newMap.toMap))

val engineConnIdentifierCallback = new EngineConnIdentifierCallback()
Utils.tryAndError(engineConnIdentifierCallback.callback())
logger.info("<--------------------SpringBoot App init succeed-------------------->")
}

override def beforeExecutionExecute(
engineCreationContext: EngineCreationContext,
engineConn: EngineConn
): Unit = {}
): Unit = {
val engineConnIdentifierCallback = new EngineConnIdentifierCallback()
Utils.tryAndError(engineConnIdentifierCallback.callback())
}

override def afterExecutionExecute(
engineCreationContext: EngineCreationContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ class LinkisSQLConnection(private[jdbc] val ujesClient: UJESClient, props: Prope
case EngineType.HIVE => RunType.HIVE
case EngineType.TRINO => RunType.TRINO_SQL
case EngineType.PRESTO => RunType.PRESTO_SQL
case EngineType.NEBULA => RunType.NEBULA_SQL
case EngineType.ELASTICSEARCH => RunType.ES_SQL
case EngineType.JDBC => RunType.JDBC
case EngineType.PYTHON => RunType.SHELL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,11 @@ public Message executeECMOperation(HttpServletRequest req, @RequestBody JsonNode
"Fail to process the operation parameters, cased by "
+ ExceptionUtils.getRootCauseMessage(e));
}
String engineConnInstance = (String) parameters.get("engineConnInstance");

return executeECMOperation(
ecmNode, engineConnInstance, new ECMOperateRequest(userName, parameters));
ecmNode,
parameters.getOrDefault("engineConnInstance", "").toString(),
new ECMOperateRequest(userName, parameters));
}

@ApiOperation(value = "openEngineLog", notes = "open Engine log", response = Message.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ private Comparator<Node> sortByResource() {
RMNode nodeBRm = (RMNode) nodeB;
if (nodeARm.getNodeResource() == null
|| nodeARm.getNodeResource().getLeftResource() == null) {
return -1;
return 1;
} else if (nodeBRm.getNodeResource() == null
|| nodeBRm.getNodeResource().getLeftResource() == null) {
return 1;
return -1;
} else {
if (nodeARm
.getNodeResource()
Expand All @@ -67,9 +67,13 @@ private Comparator<Node> sortByResource() {
.getNodeResource()
.getLeftResource()
.moreThan(nodeBRm.getNodeResource().getLeftResource())) {
return 1;
} else {
return -1;
} else {
// 从大到小排序 (Sort from large to small)
return -(nodeARm
.getNodeResource()
.getLeftResource()
.compare(nodeBRm.getNodeResource().getLeftResource()));
}
}
} catch (Throwable t) {
Expand All @@ -93,10 +97,10 @@ private Comparator<Node> sortByResourceRate() {
RMNode nodeBRm = (RMNode) nodeB;
if (nodeARm.getNodeResource() == null
|| nodeARm.getNodeResource().getLeftResource() == null) {
return -1;
return 1;
} else if (nodeBRm.getNodeResource() == null
|| nodeBRm.getNodeResource().getLeftResource() == null) {
return 1;
return -1;
} else {
float aRate =
ResourceUtils.getLoadInstanceResourceRate(
Expand All @@ -106,7 +110,7 @@ private Comparator<Node> sortByResourceRate() {
ResourceUtils.getLoadInstanceResourceRate(
nodeBRm.getNodeResource().getLeftResource(),
nodeBRm.getNodeResource().getMaxResource());
return Float.compare(aRate, bRate);
return -Float.compare(aRate, bRate);
}
} catch (Throwable t) {
logger.warn("Failed to Compare resource " + t.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ private Comparator<Node> sortByScore() {
ScoreServiceInstance instanceB = (ScoreServiceInstance) nodeB;
try {
if (instanceA.getScore() > instanceB.getScore()) {
return 1;
return -1;
}
} catch (Exception e) {
logger.warn("Failed to Compare resource ", e);
return -1;
}
return -1;
return 1;
} else {
return -1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public void dealPid(ResponseEngineConnPid protocol) {
protocol.pid(),
protocol.ticketId());

EngineNode engineNode = defaultEngineNodeManager.getEngineNode(protocol.serviceInstance());
EngineNode engineNode =
defaultEngineNodeManager.getEngineNodeInfoByTicketId(protocol.ticketId());
if (engineNode == null) {
logger.error(
"DefaultEngineConnPidCallbackService dealPid failed, engineNode is null, serviceInstance:{}",
Expand All @@ -63,13 +64,12 @@ public void dealPid(ResponseEngineConnPid protocol) {
}

engineNode.setIdentifier(protocol.pid());

ServiceInstance oldServiceInstance = engineNode.getServiceInstance();
if (engineNode.getMark().equals(AMConstant.CLUSTER_PROCESS_MARK)) {
ServiceInstance serviceInstance = protocol.serviceInstance();
engineNode.setServiceInstance(serviceInstance);
getEngineNodeManager().updateEngineNode(serviceInstance, engineNode);
nodeLabelService.labelsFromInstanceToNewInstance(
engineNode.getServiceInstance(), serviceInstance);
getEngineNodeManager().updateEngineNode(oldServiceInstance, engineNode);
nodeLabelService.labelsFromInstanceToNewInstance(oldServiceInstance, serviceInstance);
}
defaultEngineNodeManager.updateEngine(engineNode);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,7 @@ public class LabelManagerConf {

public static final String LONG_LIVED_LABEL =
CommonVars.apply("wds.linkis.label.node.long.lived.label.keys", "tenant").getValue();

public static final boolean COMBINED_WITHOUT_YARN_DEFAULT =
CommonVars.apply("linkis.combined.without.yarn.default", true).getValue();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
package org.apache.linkis.manager.rm.domain;

import org.apache.linkis.governance.common.conf.GovernanceCommonConf;
import org.apache.linkis.manager.common.conf.RMConfiguration;
import org.apache.linkis.manager.label.builder.CombinedLabelBuilder;
import org.apache.linkis.manager.label.conf.LabelManagerConf;
import org.apache.linkis.manager.label.entity.CombinedLabel;
import org.apache.linkis.manager.label.entity.Label;
import org.apache.linkis.manager.label.entity.ResourceLabel;
import org.apache.linkis.manager.label.entity.cluster.ClusterLabel;
import org.apache.linkis.manager.label.entity.em.EMInstanceLabel;
import org.apache.linkis.manager.label.entity.engine.EngineInstanceLabel;
import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
Expand Down Expand Up @@ -49,22 +52,25 @@ public class RMLabelContainer {
private EngineTypeLabel engineTypeLabel;
private UserCreatorLabel userCreatorLabel;
private EngineInstanceLabel engineInstanceLabel;
private CombinedLabel combinedUserCreatorEngineTypeLabel;
private ClusterLabel clusterLabel;
private CombinedLabel combinedResourceLabel;
private Label currentLabel;

public RMLabelContainer(List<Label<?>> labels) {
this.labels = labels;
this.lockedLabels = Lists.newArrayList();
try {
if (getUserCreatorLabel() != null && getEngineTypeLabel() != null) {
this.combinedUserCreatorEngineTypeLabel =
(CombinedLabel)
combinedLabelBuilder.build(
"", Lists.newArrayList(getUserCreatorLabel(), getEngineTypeLabel()));
this.labels.add(combinedUserCreatorEngineTypeLabel);
List<Label> combinedLabel = Lists.newArrayList(getUserCreatorLabel(), getEngineTypeLabel());
ClusterLabel clusterLabel = getClusterLabel();
if (shouldCombinedClusterLabel(clusterLabel)) {
combinedLabel.add(clusterLabel);
}
this.combinedResourceLabel = (CombinedLabel) combinedLabelBuilder.build("", combinedLabel);
this.labels.add(combinedResourceLabel);
}
} catch (Exception e) {
logger.warn("failed to get combinedUserCreatorEngineTypeLabel", e);
logger.warn("failed to get combinedResourceLabel", e);
}
this.labels = LabelUtils.distinctLabel(this.labels, labels);
}
Expand Down Expand Up @@ -156,8 +162,31 @@ public EngineInstanceLabel getEngineInstanceLabel() {
return null;
}

public CombinedLabel getCombinedUserCreatorEngineTypeLabel() {
return combinedUserCreatorEngineTypeLabel;
public ClusterLabel getClusterLabel() {
if (clusterLabel == null) {
for (Label label : labels) {
if (label instanceof ClusterLabel) {
return (ClusterLabel) label;
}
}
} else {
return clusterLabel;
}
logger.warn("ClusterLabel not found");
return null;
}

private boolean shouldCombinedClusterLabel(ClusterLabel clusterLabel) {
return !(clusterLabel == null
|| (LabelManagerConf.COMBINED_WITHOUT_YARN_DEFAULT
&& clusterLabel
.getClusterName()
.equals(RMConfiguration.DEFAULT_YARN_CLUSTER_NAME.getValue())
&& clusterLabel.getClusterType().equals(RMConfiguration.DEFAULT_YARN_TYPE.getValue())));
}

public CombinedLabel getCombinedResourceLabel() {
return combinedResourceLabel;
}

public Label getCurrentLabel() {
Expand Down Expand Up @@ -195,6 +224,8 @@ public String toString() {
+ userCreatorLabel
+ ", engineInstanceLabel="
+ engineInstanceLabel
+ ", clusterLabel="
+ clusterLabel
+ ", currentLabel="
+ currentLabel
+ '}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,22 @@
import org.apache.linkis.manager.rm.external.request.ExternalResourceRequester;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;

import java.io.File;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

import io.fabric8.kubernetes.api.model.Node;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.ResourceQuota;
import io.fabric8.kubernetes.api.model.metrics.v1beta1.NodeMetrics;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -45,7 +50,7 @@ public class KubernetesResourceRequester implements ExternalResourceRequester {
@Override
public NodeResource requestResourceInfo(
ExternalResourceIdentifier identifier, ExternalResourceProvider provider) {
String k8sMasterUrl = (String) provider.getConfigMap().get("k8sMasterUrl");
String k8sMasterUrl = getK8sMasterUrl(provider);
DefaultKubernetesClient client = clientMap.get(k8sMasterUrl);
if (client == null) {
constructKubernetesClient(provider);
Expand Down Expand Up @@ -154,8 +159,7 @@ public ResourceType getResourceType() {
@Override
public Boolean reloadExternalResourceAddress(ExternalResourceProvider provider) {
if (null != provider) {
DefaultKubernetesClient client =
clientMap.get((String) provider.getConfigMap().get("k8sMasterUrl"));
DefaultKubernetesClient client = clientMap.get(getK8sMasterUrl(provider));
if (client != null) {
client.close();
}
Expand All @@ -164,19 +168,42 @@ public Boolean reloadExternalResourceAddress(ExternalResourceProvider provider)
return true;
}

private String getK8sMasterUrl(ExternalResourceProvider provider) {
Map<String, Object> configMap = provider.getConfigMap();
String k8sMasterUrl = (String) configMap.get("k8sMasterUrl");
if (StringUtils.isBlank(k8sMasterUrl)) {
throw new IllegalArgumentException("k8sMasterUrl is empty, please check the configuration.");
}
return k8sMasterUrl;
}

private void constructKubernetesClient(ExternalResourceProvider provider) {
String k8sMasterUrl = (String) provider.getConfigMap().get("k8sMasterUrl");
String k8sClientCertData = (String) provider.getConfigMap().get("k8sClientCertData");
String k8sClientKeyData = (String) provider.getConfigMap().get("k8sClientKeyData");
String k8sCaCertData = (String) provider.getConfigMap().get("k8sCaCertData");
DefaultKubernetesClient client =
new DefaultKubernetesClient(
new ConfigBuilder()
.withMasterUrl(k8sMasterUrl)
.withClientCertData(k8sClientCertData)
.withClientKeyData(k8sClientKeyData)
.withCaCertData(k8sCaCertData)
.build());
DefaultKubernetesClient client;
Map<String, Object> configMap = provider.getConfigMap();
String k8sMasterUrl = getK8sMasterUrl(provider);
try {
String k8sConfig = (String) configMap.get("k8sConfig");
if (StringUtils.isNotBlank(k8sConfig)) {
Config kubeConfig =
Config.fromKubeconfig(
null, FileUtils.readFileToString(new File(k8sConfig), "UTF-8"), null);
client = new DefaultKubernetesClient(kubeConfig);
} else {
String k8sClientCertData = (String) configMap.get("k8sClientCertData");
String k8sClientKeyData = (String) configMap.get("k8sClientKeyData");
String k8sCaCertData = (String) configMap.get("k8sCaCertData");
client =
new DefaultKubernetesClient(
new ConfigBuilder()
.withMasterUrl(k8sMasterUrl)
.withClientCertData(k8sClientCertData)
.withClientKeyData(k8sClientKeyData)
.withCaCertData(k8sCaCertData)
.build());
}
} catch (Exception e) {
throw new KubernetesClientException("Fail to build k8s client. ", e);
}
clientMap.put(k8sMasterUrl, client);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@ public void dealWithResourceUsedProtocol(ResourceUsedProtocol resourceUsedProtoc
} catch (Exception e) {
RMLabelContainer nodeLabels = new RMLabelContainer(labels);
String value =
Optional.of(nodeLabels.getCombinedUserCreatorEngineTypeLabel())
.map(Object::toString)
.orElse("");
Optional.of(nodeLabels.getCombinedResourceLabel()).map(Object::toString).orElse("");
logger.warn(
String.format(
"usedResource failed, request from:%s, request engine: %s, ",
Expand Down
Loading

0 comments on commit e406f3a

Please sign in to comment.