Skip to content

Commit

Permalink
Merge branch 'master' of github.com:sjgllgh/linkis into master
Browse files Browse the repository at this point in the history
  • Loading branch information
sjgllgh committed Sep 26, 2023
2 parents ed1d64b + c0cebe4 commit 9b3080d
Show file tree
Hide file tree
Showing 48 changed files with 1,404 additions and 94 deletions.
5 changes: 4 additions & 1 deletion docs/configuration/spark.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@
|spark|wds.linkis.spark.engine.scala.replace_package_header.enable| true |spark.engine.scala.replace_package_header.enable|

Use spark yarn cluster mode,need to set label "engingeConnRuntimeMode": "yarnCluster",and need to upload the dependence of the spark to 'linkis.spark.yarn.cluster.jar'(the default value is 'hdfs:///spark/cluster')
spark dependencies include jars and configuration files,For example: '/appcom/Install/linkis/lib/linkis-engineconn-plugins/spark/dist/3.2.1/lib/*.jar','/appcom/Install/linkis/conf/*''
spark dependencies include jars and configuration files,For example: '/appcom/Install/linkis/lib/linkis-engineconn-plugins/spark/dist/3.2.1/lib/*.jar','/appcom/Install/linkis/conf/*'

Precautions for using yarnCluster:
Eureka url if 127.0.0.1 should be changed to the real host, such as "127.0.0.1:20303/eureka/" should be changed to "wds001:20303/eureka/"

The spark-excel package may cause class conflicts,need to download separately,put it in spark lib
wget https://repo1.maven.org/maven2/com/crealytics/spark-excel-2.12.17-3.2.2_2.12/3.2.2_0.18.1/spark-excel-2.12.17-3.2.2_2.12-3.2.2_0.18.1.jar
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ class LinkisSQLConnection(private[jdbc] val ujesClient: UJESClient, props: Prope
case EngineType.HIVE => RunType.HIVE
case EngineType.TRINO => RunType.TRINO_SQL
case EngineType.PRESTO => RunType.PRESTO_SQL
case EngineType.NEBULA => RunType.NEBULA_SQL
case EngineType.ELASTICSEARCH => RunType.ES_SQL
case EngineType.JDBC => RunType.JDBC
case EngineType.PYTHON => RunType.SHELL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ public class AMConfiguration {

public static final CommonVars<String> MULTI_USER_ENGINE_TYPES =
CommonVars.apply(
"wds.linkis.multi.user.engine.types", "jdbc,es,presto,io_file,appconn,openlookeng,trino");
"wds.linkis.multi.user.engine.types",
"jdbc,es,presto,io_file,appconn,openlookeng,trino,nebula");

public static final CommonVars<String> ALLOW_BATCH_KILL_ENGINE_TYPES =
CommonVars.apply("wds.linkis.allow.batch.kill.engine.types", "spark,hive,python");
Expand Down Expand Up @@ -104,8 +105,8 @@ public class AMConfiguration {
public static String getDefaultMultiEngineUser() {
String jvmUser = Utils.getJvmUser();
return String.format(
"{jdbc:\"%s\", es: \"%s\", presto:\"%s\", appconn:\"%s\", openlookeng:\"%s\", trino:\"%s\", io_file:\"root\"}",
jvmUser, jvmUser, jvmUser, jvmUser, jvmUser, jvmUser);
"{jdbc:\"%s\", es: \"%s\", presto:\"%s\", appconn:\"%s\", openlookeng:\"%s\", trino:\"%s\", nebula:\"%s\",io_file:\"root\"}",
jvmUser, jvmUser, jvmUser, jvmUser, jvmUser, jvmUser, jvmUser);
}

public static boolean isMultiUserEngine(String engineType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ private Comparator<Node> sortByResource() {
RMNode nodeBRm = (RMNode) nodeB;
if (nodeARm.getNodeResource() == null
|| nodeARm.getNodeResource().getLeftResource() == null) {
return -1;
return 1;
} else if (nodeBRm.getNodeResource() == null
|| nodeBRm.getNodeResource().getLeftResource() == null) {
return 1;
return -1;
} else {
if (nodeARm
.getNodeResource()
Expand All @@ -69,7 +69,11 @@ private Comparator<Node> sortByResource() {
.moreThan(nodeBRm.getNodeResource().getLeftResource())) {
return -1;
} else {
return 1;
// 从大到小排序 (Sort from large to small)
return -(nodeARm
.getNodeResource()
.getLeftResource()
.compare(nodeBRm.getNodeResource().getLeftResource()));
}
}
} catch (Throwable t) {
Expand All @@ -93,10 +97,10 @@ private Comparator<Node> sortByResourceRate() {
RMNode nodeBRm = (RMNode) nodeB;
if (nodeARm.getNodeResource() == null
|| nodeARm.getNodeResource().getLeftResource() == null) {
return -1;
return 1;
} else if (nodeBRm.getNodeResource() == null
|| nodeBRm.getNodeResource().getLeftResource() == null) {
return 1;
return -1;
} else {
float aRate =
ResourceUtils.getLoadInstanceResourceRate(
Expand All @@ -106,7 +110,7 @@ private Comparator<Node> sortByResourceRate() {
ResourceUtils.getLoadInstanceResourceRate(
nodeBRm.getNodeResource().getLeftResource(),
nodeBRm.getNodeResource().getMaxResource());
return Float.compare(aRate, bRate);
return -Float.compare(aRate, bRate);
}
} catch (Throwable t) {
logger.warn("Failed to Compare resource " + t.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,7 @@ public class LabelManagerConf {

public static final String LONG_LIVED_LABEL =
CommonVars.apply("wds.linkis.label.node.long.lived.label.keys", "tenant").getValue();

public static final boolean COMBINED_WITHOUT_YARN_DEFAULT =
CommonVars.apply("linkis.combined.without.yarn.default", true).getValue();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
package org.apache.linkis.manager.rm.domain;

import org.apache.linkis.governance.common.conf.GovernanceCommonConf;
import org.apache.linkis.manager.common.conf.RMConfiguration;
import org.apache.linkis.manager.label.builder.CombinedLabelBuilder;
import org.apache.linkis.manager.label.conf.LabelManagerConf;
import org.apache.linkis.manager.label.entity.CombinedLabel;
import org.apache.linkis.manager.label.entity.Label;
import org.apache.linkis.manager.label.entity.ResourceLabel;
import org.apache.linkis.manager.label.entity.cluster.ClusterLabel;
import org.apache.linkis.manager.label.entity.em.EMInstanceLabel;
import org.apache.linkis.manager.label.entity.engine.EngineInstanceLabel;
import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
Expand Down Expand Up @@ -49,22 +52,25 @@ public class RMLabelContainer {
private EngineTypeLabel engineTypeLabel;
private UserCreatorLabel userCreatorLabel;
private EngineInstanceLabel engineInstanceLabel;
private CombinedLabel combinedUserCreatorEngineTypeLabel;
private ClusterLabel clusterLabel;
private CombinedLabel combinedResourceLabel;
private Label currentLabel;

public RMLabelContainer(List<Label<?>> labels) {
this.labels = labels;
this.lockedLabels = Lists.newArrayList();
try {
if (getUserCreatorLabel() != null && getEngineTypeLabel() != null) {
this.combinedUserCreatorEngineTypeLabel =
(CombinedLabel)
combinedLabelBuilder.build(
"", Lists.newArrayList(getUserCreatorLabel(), getEngineTypeLabel()));
this.labels.add(combinedUserCreatorEngineTypeLabel);
List<Label> combinedLabel = Lists.newArrayList(getUserCreatorLabel(), getEngineTypeLabel());
ClusterLabel clusterLabel = getClusterLabel();
if (shouldCombinedClusterLabel(clusterLabel)) {
combinedLabel.add(clusterLabel);
}
this.combinedResourceLabel = (CombinedLabel) combinedLabelBuilder.build("", combinedLabel);
this.labels.add(combinedResourceLabel);
}
} catch (Exception e) {
logger.warn("failed to get combinedUserCreatorEngineTypeLabel", e);
logger.warn("failed to get combinedResourceLabel", e);
}
this.labels = LabelUtils.distinctLabel(this.labels, labels);
}
Expand Down Expand Up @@ -156,8 +162,31 @@ public EngineInstanceLabel getEngineInstanceLabel() {
return null;
}

public CombinedLabel getCombinedUserCreatorEngineTypeLabel() {
return combinedUserCreatorEngineTypeLabel;
public ClusterLabel getClusterLabel() {
if (clusterLabel == null) {
for (Label label : labels) {
if (label instanceof ClusterLabel) {
return (ClusterLabel) label;
}
}
} else {
return clusterLabel;
}
logger.warn("ClusterLabel not found");
return null;
}

private boolean shouldCombinedClusterLabel(ClusterLabel clusterLabel) {
return !(clusterLabel == null
|| (LabelManagerConf.COMBINED_WITHOUT_YARN_DEFAULT
&& clusterLabel
.getClusterName()
.equals(RMConfiguration.DEFAULT_YARN_CLUSTER_NAME.getValue())
&& clusterLabel.getClusterType().equals(RMConfiguration.DEFAULT_YARN_TYPE.getValue())));
}

public CombinedLabel getCombinedResourceLabel() {
return combinedResourceLabel;
}

public Label getCurrentLabel() {
Expand Down Expand Up @@ -195,6 +224,8 @@ public String toString() {
+ userCreatorLabel
+ ", engineInstanceLabel="
+ engineInstanceLabel
+ ", clusterLabel="
+ clusterLabel
+ ", currentLabel="
+ currentLabel
+ '}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,22 @@
import org.apache.linkis.manager.rm.external.request.ExternalResourceRequester;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;

import java.io.File;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

import io.fabric8.kubernetes.api.model.Node;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.ResourceQuota;
import io.fabric8.kubernetes.api.model.metrics.v1beta1.NodeMetrics;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -45,7 +50,7 @@ public class KubernetesResourceRequester implements ExternalResourceRequester {
@Override
public NodeResource requestResourceInfo(
ExternalResourceIdentifier identifier, ExternalResourceProvider provider) {
String k8sMasterUrl = (String) provider.getConfigMap().get("k8sMasterUrl");
String k8sMasterUrl = getK8sMasterUrl(provider);
DefaultKubernetesClient client = clientMap.get(k8sMasterUrl);
if (client == null) {
constructKubernetesClient(provider);
Expand Down Expand Up @@ -154,8 +159,7 @@ public ResourceType getResourceType() {
@Override
public Boolean reloadExternalResourceAddress(ExternalResourceProvider provider) {
if (null != provider) {
DefaultKubernetesClient client =
clientMap.get((String) provider.getConfigMap().get("k8sMasterUrl"));
DefaultKubernetesClient client = clientMap.get(getK8sMasterUrl(provider));
if (client != null) {
client.close();
}
Expand All @@ -164,19 +168,42 @@ public Boolean reloadExternalResourceAddress(ExternalResourceProvider provider)
return true;
}

private String getK8sMasterUrl(ExternalResourceProvider provider) {
Map<String, Object> configMap = provider.getConfigMap();
String k8sMasterUrl = (String) configMap.get("k8sMasterUrl");
if (StringUtils.isBlank(k8sMasterUrl)) {
throw new IllegalArgumentException("k8sMasterUrl is empty, please check the configuration.");
}
return k8sMasterUrl;
}

private void constructKubernetesClient(ExternalResourceProvider provider) {
String k8sMasterUrl = (String) provider.getConfigMap().get("k8sMasterUrl");
String k8sClientCertData = (String) provider.getConfigMap().get("k8sClientCertData");
String k8sClientKeyData = (String) provider.getConfigMap().get("k8sClientKeyData");
String k8sCaCertData = (String) provider.getConfigMap().get("k8sCaCertData");
DefaultKubernetesClient client =
new DefaultKubernetesClient(
new ConfigBuilder()
.withMasterUrl(k8sMasterUrl)
.withClientCertData(k8sClientCertData)
.withClientKeyData(k8sClientKeyData)
.withCaCertData(k8sCaCertData)
.build());
DefaultKubernetesClient client;
Map<String, Object> configMap = provider.getConfigMap();
String k8sMasterUrl = getK8sMasterUrl(provider);
try {
String k8sConfig = (String) configMap.get("k8sConfig");
if (StringUtils.isNotBlank(k8sConfig)) {
Config kubeConfig =
Config.fromKubeconfig(
null, FileUtils.readFileToString(new File(k8sConfig), "UTF-8"), null);
client = new DefaultKubernetesClient(kubeConfig);
} else {
String k8sClientCertData = (String) configMap.get("k8sClientCertData");
String k8sClientKeyData = (String) configMap.get("k8sClientKeyData");
String k8sCaCertData = (String) configMap.get("k8sCaCertData");
client =
new DefaultKubernetesClient(
new ConfigBuilder()
.withMasterUrl(k8sMasterUrl)
.withClientCertData(k8sClientCertData)
.withClientKeyData(k8sClientKeyData)
.withCaCertData(k8sCaCertData)
.build());
}
} catch (Exception e) {
throw new KubernetesClientException("Fail to build k8s client. ", e);
}
clientMap.put(k8sMasterUrl, client);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@ public void dealWithResourceUsedProtocol(ResourceUsedProtocol resourceUsedProtoc
} catch (Exception e) {
RMLabelContainer nodeLabels = new RMLabelContainer(labels);
String value =
Optional.of(nodeLabels.getCombinedUserCreatorEngineTypeLabel())
.map(Object::toString)
.orElse("");
Optional.of(nodeLabels.getCombinedResourceLabel()).map(Object::toString).orElse("");
logger.warn(
String.format(
"usedResource failed, request from:%s, request engine: %s, ",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,7 @@ public boolean canRequest(RMLabelContainer labelContainer, NodeResource resource
NodeResource labelResource =
labelResourceService.getLabelResource(labelContainer.getCurrentLabel());
Resource requestResource = resource.getMinResource();
if (labelContainer
.getCombinedUserCreatorEngineTypeLabel()
.equals(labelContainer.getCurrentLabel())) {
if (labelContainer.getCombinedResourceLabel().equals(labelContainer.getCurrentLabel())) {
if (labelResource == null) {
labelResource = new CommonNodeResource();
labelResource.setResourceType(resource.getResourceType());
Expand Down Expand Up @@ -92,7 +90,7 @@ public boolean canRequest(RMLabelContainer labelContainer, NodeResource resource
labelResourceService.setLabelResource(
labelContainer.getCurrentLabel(),
labelResource,
labelContainer.getCombinedUserCreatorEngineTypeLabel().getStringValue());
labelContainer.getCombinedResourceLabel().getStringValue());
logger.debug(
labelContainer.getCurrentLabel()
+ " to request ["
Expand Down
Loading

0 comments on commit 9b3080d

Please sign in to comment.