Skip to content

Commit

Permalink
feat: support submit pyspark once job on k8s and add clusterlabel to combinedlabel (apache#4906)
Browse files Browse the repository at this point in the history

* feat: support submit pyspark once job on k8s

* feat: modify variable name

* feat: add method to build k8s client from kubeConfig

* feat: add Spark UI port configuration for spark on k8s once job

* feat: rename userCreatorEngineTypeLabel

* feat: merge podIP and port into url

* fix: replace 'empty' with 'blank'
  • Loading branch information
lenoxzhao authored Sep 21, 2023
1 parent 4868920 commit 89a653d
Show file tree
Hide file tree
Showing 14 changed files with 161 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,7 @@ public class LabelManagerConf {

public static final String LONG_LIVED_LABEL =
CommonVars.apply("wds.linkis.label.node.long.lived.label.keys", "tenant").getValue();

public static final boolean COMBINED_WITHOUT_YARN_DEFAULT =
CommonVars.apply("linkis.combined.without.yarn.default", true).getValue();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
package org.apache.linkis.manager.rm.domain;

import org.apache.linkis.governance.common.conf.GovernanceCommonConf;
import org.apache.linkis.manager.common.conf.RMConfiguration;
import org.apache.linkis.manager.label.builder.CombinedLabelBuilder;
import org.apache.linkis.manager.label.conf.LabelManagerConf;
import org.apache.linkis.manager.label.entity.CombinedLabel;
import org.apache.linkis.manager.label.entity.Label;
import org.apache.linkis.manager.label.entity.ResourceLabel;
import org.apache.linkis.manager.label.entity.cluster.ClusterLabel;
import org.apache.linkis.manager.label.entity.em.EMInstanceLabel;
import org.apache.linkis.manager.label.entity.engine.EngineInstanceLabel;
import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
Expand Down Expand Up @@ -49,22 +52,25 @@ public class RMLabelContainer {
private EngineTypeLabel engineTypeLabel;
private UserCreatorLabel userCreatorLabel;
private EngineInstanceLabel engineInstanceLabel;
private CombinedLabel combinedUserCreatorEngineTypeLabel;
private ClusterLabel clusterLabel;
private CombinedLabel combinedResourceLabel;
private Label currentLabel;

/**
 * Creates a container over the given labels and, when possible, appends a combined label
 * built from the user-creator and engine-type labels (optionally joined by the cluster label).
 *
 * <p>NOTE(review): this span comes from a diff rendered without +/- markers, so the superseded
 * combinedUserCreatorEngineTypeLabel statements appear next to their combinedResourceLabel
 * replacements. Confirm against the committed file which statements are live.
 *
 * @param labels the labels to wrap; the combined label is added to this same list
 */
public RMLabelContainer(List<Label<?>> labels) {
this.labels = labels;
this.lockedLabels = Lists.newArrayList();
try {
// A combined label is only built when both constituent labels are present.
if (getUserCreatorLabel() != null && getEngineTypeLabel() != null) {
this.combinedUserCreatorEngineTypeLabel =
(CombinedLabel)
combinedLabelBuilder.build(
"", Lists.newArrayList(getUserCreatorLabel(), getEngineTypeLabel()));
this.labels.add(combinedUserCreatorEngineTypeLabel);
List<Label> combinedLabel = Lists.newArrayList(getUserCreatorLabel(), getEngineTypeLabel());
ClusterLabel clusterLabel = getClusterLabel();
// The cluster label joins the combination only when shouldCombinedClusterLabel allows it.
if (shouldCombinedClusterLabel(clusterLabel)) {
combinedLabel.add(clusterLabel);
}
this.combinedResourceLabel = (CombinedLabel) combinedLabelBuilder.build("", combinedLabel);
this.labels.add(combinedResourceLabel);
}
} catch (Exception e) {
// Best effort: a failure here is logged and the combined label field is left unset.
logger.warn("failed to get combinedUserCreatorEngineTypeLabel", e);
logger.warn("failed to get combinedResourceLabel", e);
}
// presumably removes duplicate labels after the additions above; verify LabelUtils.distinctLabel
this.labels = LabelUtils.distinctLabel(this.labels, labels);
}
Expand Down Expand Up @@ -156,8 +162,31 @@ public EngineInstanceLabel getEngineInstanceLabel() {
return null;
}

public CombinedLabel getCombinedUserCreatorEngineTypeLabel() {
return combinedUserCreatorEngineTypeLabel;
/**
 * Returns the cluster label of this container.
 *
 * <p>The {@code clusterLabel} field wins when it is set; otherwise the first {@link ClusterLabel}
 * found in {@code labels} is returned. The lookup result is not stored back into the field.
 *
 * @return the cluster label, or {@code null} (after logging a warning) when none is present
 */
public ClusterLabel getClusterLabel() {
  if (clusterLabel != null) {
    return clusterLabel;
  }
  for (Label<?> candidate : labels) {
    if (candidate instanceof ClusterLabel) {
      return (ClusterLabel) candidate;
    }
  }
  logger.warn("ClusterLabel not found");
  return null;
}

/**
 * Decides whether the given cluster label takes part in the combined resource label.
 *
 * <p>A {@code null} label is never combined. When
 * {@link LabelManagerConf#COMBINED_WITHOUT_YARN_DEFAULT} is enabled, a label whose name and type
 * both equal the configured default YARN cluster is left out as well; every other label is
 * combined.
 *
 * @param clusterLabel the cluster label to inspect, may be {@code null}
 * @return {@code true} if the label should be added to the combined label
 */
private boolean shouldCombinedClusterLabel(ClusterLabel clusterLabel) {
  if (clusterLabel == null) {
    return false;
  }
  if (!LabelManagerConf.COMBINED_WITHOUT_YARN_DEFAULT) {
    return true;
  }
  // The type is only consulted once the name has matched, as in the original expression.
  boolean isDefaultYarnCluster =
      clusterLabel.getClusterName().equals(RMConfiguration.DEFAULT_YARN_CLUSTER_NAME.getValue())
          && clusterLabel.getClusterType().equals(RMConfiguration.DEFAULT_YARN_TYPE.getValue());
  return !isDefaultYarnCluster;
}

/**
 * Returns the combined resource label held in the {@code combinedResourceLabel} field.
 *
 * @return the combined label, or {@code null} if the field was never assigned
 */
public CombinedLabel getCombinedResourceLabel() {
  return this.combinedResourceLabel;
}

public Label getCurrentLabel() {
Expand Down Expand Up @@ -195,6 +224,8 @@ public String toString() {
+ userCreatorLabel
+ ", engineInstanceLabel="
+ engineInstanceLabel
+ ", clusterLabel="
+ clusterLabel
+ ", currentLabel="
+ currentLabel
+ '}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,22 @@
import org.apache.linkis.manager.rm.external.request.ExternalResourceRequester;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;

import java.io.File;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

import io.fabric8.kubernetes.api.model.Node;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.ResourceQuota;
import io.fabric8.kubernetes.api.model.metrics.v1beta1.NodeMetrics;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -45,7 +50,7 @@ public class KubernetesResourceRequester implements ExternalResourceRequester {
@Override
public NodeResource requestResourceInfo(
ExternalResourceIdentifier identifier, ExternalResourceProvider provider) {
String k8sMasterUrl = (String) provider.getConfigMap().get("k8sMasterUrl");
String k8sMasterUrl = getK8sMasterUrl(provider);
DefaultKubernetesClient client = clientMap.get(k8sMasterUrl);
if (client == null) {
constructKubernetesClient(provider);
Expand Down Expand Up @@ -154,8 +159,7 @@ public ResourceType getResourceType() {
@Override
public Boolean reloadExternalResourceAddress(ExternalResourceProvider provider) {
if (null != provider) {
DefaultKubernetesClient client =
clientMap.get((String) provider.getConfigMap().get("k8sMasterUrl"));
DefaultKubernetesClient client = clientMap.get(getK8sMasterUrl(provider));
if (client != null) {
client.close();
}
Expand All @@ -164,19 +168,42 @@ public Boolean reloadExternalResourceAddress(ExternalResourceProvider provider)
return true;
}

/**
 * Reads the Kubernetes master URL from the provider's config map.
 *
 * @param provider external resource provider whose config map holds the "k8sMasterUrl" entry
 * @return the configured master URL
 * @throws IllegalArgumentException if the entry is absent or blank
 */
private String getK8sMasterUrl(ExternalResourceProvider provider) {
  final String masterUrl = (String) provider.getConfigMap().get("k8sMasterUrl");
  if (!StringUtils.isBlank(masterUrl)) {
    return masterUrl;
  }
  throw new IllegalArgumentException("k8sMasterUrl is empty, please check the configuration.");
}

/**
 * Builds a Kubernetes client for the given provider and stores it in clientMap under the
 * provider's master URL.
 *
 * <p>NOTE(review): this span comes from a diff rendered without +/- markers, so the previous
 * body (direct ConfigBuilder construction) appears ahead of its replacement and some locals are
 * declared twice. Confirm against the committed file which statements are live.
 *
 * @param provider external resource provider whose config map supplies the connection settings
 */
private void constructKubernetesClient(ExternalResourceProvider provider) {
String k8sMasterUrl = (String) provider.getConfigMap().get("k8sMasterUrl");
String k8sClientCertData = (String) provider.getConfigMap().get("k8sClientCertData");
String k8sClientKeyData = (String) provider.getConfigMap().get("k8sClientKeyData");
String k8sCaCertData = (String) provider.getConfigMap().get("k8sCaCertData");
DefaultKubernetesClient client =
new DefaultKubernetesClient(
new ConfigBuilder()
.withMasterUrl(k8sMasterUrl)
.withClientCertData(k8sClientCertData)
.withClientKeyData(k8sClientKeyData)
.withCaCertData(k8sCaCertData)
.build());
DefaultKubernetesClient client;
Map<String, Object> configMap = provider.getConfigMap();
// Resolved outside the try block, so a blank URL surfaces as IllegalArgumentException.
String k8sMasterUrl = getK8sMasterUrl(provider);
try {
String k8sConfig = (String) configMap.get("k8sConfig");
if (StringUtils.isNotBlank(k8sConfig)) {
// "k8sConfig" is used as a file path whose UTF-8 contents are parsed as a kubeconfig.
// NOTE(review): the null context/path arguments presumably select the kubeconfig's
// current context; confirm against the fabric8 Config.fromKubeconfig documentation.
Config kubeConfig =
Config.fromKubeconfig(
null, FileUtils.readFileToString(new File(k8sConfig), "UTF-8"), null);
client = new DefaultKubernetesClient(kubeConfig);
} else {
// No kubeconfig path given: configure the client from the master URL and cert/key/CA data.
String k8sClientCertData = (String) configMap.get("k8sClientCertData");
String k8sClientKeyData = (String) configMap.get("k8sClientKeyData");
String k8sCaCertData = (String) configMap.get("k8sCaCertData");
client =
new DefaultKubernetesClient(
new ConfigBuilder()
.withMasterUrl(k8sMasterUrl)
.withClientCertData(k8sClientCertData)
.withClientKeyData(k8sClientKeyData)
.withCaCertData(k8sCaCertData)
.build());
}
} catch (Exception e) {
// Any failure while reading the file or building the client is rethrown with its cause.
throw new KubernetesClientException("Fail to build k8s client. ", e);
}
clientMap.put(k8sMasterUrl, client);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@ public void dealWithResourceUsedProtocol(ResourceUsedProtocol resourceUsedProtoc
} catch (Exception e) {
RMLabelContainer nodeLabels = new RMLabelContainer(labels);
String value =
Optional.of(nodeLabels.getCombinedUserCreatorEngineTypeLabel())
.map(Object::toString)
.orElse("");
Optional.of(nodeLabels.getCombinedResourceLabel()).map(Object::toString).orElse("");
logger.warn(
String.format(
"usedResource failed, request from:%s, request engine: %s, ",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,7 @@ public boolean canRequest(RMLabelContainer labelContainer, NodeResource resource
NodeResource labelResource =
labelResourceService.getLabelResource(labelContainer.getCurrentLabel());
Resource requestResource = resource.getMinResource();
if (labelContainer
.getCombinedUserCreatorEngineTypeLabel()
.equals(labelContainer.getCurrentLabel())) {
if (labelContainer.getCombinedResourceLabel().equals(labelContainer.getCurrentLabel())) {
if (labelResource == null) {
labelResource = new CommonNodeResource();
labelResource.setResourceType(resource.getResourceType());
Expand Down Expand Up @@ -92,7 +90,7 @@ public boolean canRequest(RMLabelContainer labelContainer, NodeResource resource
labelResourceService.setLabelResource(
labelContainer.getCurrentLabel(),
labelResource,
labelContainer.getCombinedUserCreatorEngineTypeLabel().getStringValue());
labelContainer.getCombinedResourceLabel().getStringValue());
logger.debug(
labelContainer.getCurrentLabel()
+ " to request ["
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,7 @@ public ResultResource requestResource(List<Label<?>> labels, NodeResource resour

List<PersistenceLock> persistenceLocks = new ArrayList<>();
EMInstanceLabel emInstanceLabel = labelContainer.getEMInstanceLabel();
CombinedLabel userCreatorEngineTypeLabel =
labelContainer.getCombinedUserCreatorEngineTypeLabel();
CombinedLabel combinedLabel = labelContainer.getCombinedResourceLabel();

try {
// check ecm resource if not enough return
Expand All @@ -266,14 +265,12 @@ public ResultResource requestResource(List<Label<?>> labels, NodeResource resour

// lock the combined resource label (formerly userCreatorEngineTypeLabel)
persistenceLocks.add(
tryLockOneLabel(
userCreatorEngineTypeLabel, wait, labelContainer.getUserCreatorLabel().getUser()));
tryLockOneLabel(combinedLabel, wait, labelContainer.getUserCreatorLabel().getUser()));
try {
labelContainer.setCurrentLabel(userCreatorEngineTypeLabel);
labelContainer.setCurrentLabel(combinedLabel);
if (!requestResourceService.canRequest(labelContainer, resource)) {
return new NotEnoughResource(
String.format(
"Labels:%s not enough resource", userCreatorEngineTypeLabel.getStringValue()));
String.format("Labels:%s not enough resource", combinedLabel.getStringValue()));
}
} catch (RMWarnException exception) {
return new NotEnoughResource(exception.getMessage());
Expand All @@ -294,9 +291,7 @@ public ResultResource requestResource(List<Label<?>> labels, NodeResource resour
labelResource.setLockedResource(
labelResource.getLockedResource().add(resource.getLockedResource()));
labelResourceService.setLabelResource(
label,
labelResource,
labelContainer.getCombinedUserCreatorEngineTypeLabel().getStringValue());
label, labelResource, labelContainer.getCombinedResourceLabel().getStringValue());
logger.info(
String.format(
"ResourceChanged:%s --> %s", label.getStringValue(), labelResource.toString()));
Expand Down Expand Up @@ -330,9 +325,7 @@ public ResultResource requestResource(List<Label<?>> labels, NodeResource resour

// add ec resource
labelResourceService.setEngineConnLabelResource(
engineInstanceLabel,
resource,
labelContainer.getCombinedUserCreatorEngineTypeLabel().getStringValue());
engineInstanceLabel, resource, labelContainer.getCombinedResourceLabel().getStringValue());
// record engine locked resource
labelContainer.getLabels().add(engineInstanceLabel);
resourceLogService.recordUserResourceAction(
Expand Down Expand Up @@ -441,7 +434,7 @@ public void resourceUsed(List<Label<?>> labels, NodeResource usedResource) {
labelResourceService.setLabelResource(
engineInstanceLabel,
lockedResource,
labelContainer.getCombinedUserCreatorEngineTypeLabel().getStringValue());
labelContainer.getCombinedResourceLabel().getStringValue());
resourceLogService.success(
ChangeType.ENGINE_INIT, lockedResource.getLockedResource(), engineInstanceLabel, null);
} catch (Exception exception) {
Expand Down Expand Up @@ -481,7 +474,7 @@ public void resourceUsed(List<Label<?>> labels, NodeResource usedResource) {
labelResourceService.setLabelResource(
label,
labelResource,
labelContainer.getCombinedUserCreatorEngineTypeLabel().getStringValue());
labelContainer.getCombinedResourceLabel().getStringValue());
labelResourceSet.add(
new LabelResourceMapping(label, addedResource, ResourceOperationType.USED));
resourceCheck(label, labelResource);
Expand All @@ -493,7 +486,7 @@ public void resourceUsed(List<Label<?>> labels, NodeResource usedResource) {

if (label
.getClass()
.isAssignableFrom(labelContainer.getCombinedUserCreatorEngineTypeLabel().getClass())) {
.isAssignableFrom(labelContainer.getCombinedResourceLabel().getClass())) {
resourceLogService.recordUserResourceAction(
labelContainer,
persistenceResource.getTicketId(),
Expand Down Expand Up @@ -704,9 +697,7 @@ public void resourceReleased(EngineNode ecNode) {
labelResourceService.setLabelResource(
label,
labelResource,
labelContainer
.getCombinedUserCreatorEngineTypeLabel()
.getStringValue());
labelContainer.getCombinedResourceLabel().getStringValue());
resourceCheck(label, labelResource);
}
},
Expand All @@ -725,8 +716,7 @@ public void resourceReleased(EngineNode ecNode) {

if (label
.getClass()
.isAssignableFrom(
labelContainer.getCombinedUserCreatorEngineTypeLabel().getClass())) {
.isAssignableFrom(labelContainer.getCombinedResourceLabel().getClass())) {
resourceLogService.recordUserResourceAction(
labelContainer,
persistenceResource.getTicketId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,10 @@ public void recordUserResourceAction(
if (RMUtils.RM_RESOURCE_ACTION_RECORD.getValue()) {
LinkisUtils.tryAndWarn(
() -> {
CombinedLabel userCreatorEngineType =
labelContainer.getCombinedUserCreatorEngineTypeLabel();
CombinedLabel combinedLabel = labelContainer.getCombinedResourceLabel();
EngineInstanceLabel engineInstanceLabel = labelContainer.getEngineInstanceLabel();
EMInstanceLabel eMInstanceLabel = labelContainer.getEMInstanceLabel();
if (userCreatorEngineType == null) {
if (combinedLabel == null) {
return;
}
ECResourceInfoRecord ecResourceInfoRecord =
Expand All @@ -171,11 +170,7 @@ public void recordUserResourceAction(
: "";
ecResourceInfoRecord =
new ECResourceInfoRecord(
userCreatorEngineType.getStringValue(),
user,
ticketId,
resource,
logDirSuffix);
combinedLabel.getStringValue(), user, ticketId, resource, logDirSuffix);
ecResourceRecordMapper.insertECResourceInfoRecord(ecResourceInfoRecord);
}
if (engineInstanceLabel != null) {
Expand Down
Loading

0 comments on commit 89a653d

Please sign in to comment.