Skip to content

Commit

Permalink
feat: support submit pyspark once job on k8s
Browse files Browse the repository at this point in the history
  • Loading branch information
lenoxzhao committed Sep 14, 2023
1 parent 5ca6726 commit 3bcfbb2
Show file tree
Hide file tree
Showing 11 changed files with 74 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,7 @@ public class LabelManagerConf {

public static final String LONG_LIVED_LABEL =
CommonVars.apply("wds.linkis.label.node.long.lived.label.keys", "tenant").getValue();

public static final boolean COMBINED_WITHOUT_YARN_DEFAULT =
CommonVars.apply("wds.linkis.combined.without.yarn.default", true).getValue();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
package org.apache.linkis.manager.rm.domain;

import org.apache.linkis.governance.common.conf.GovernanceCommonConf;
import org.apache.linkis.manager.common.conf.RMConfiguration;
import org.apache.linkis.manager.label.builder.CombinedLabelBuilder;
import org.apache.linkis.manager.label.conf.LabelManagerConf;
import org.apache.linkis.manager.label.entity.CombinedLabel;
import org.apache.linkis.manager.label.entity.Label;
import org.apache.linkis.manager.label.entity.ResourceLabel;
import org.apache.linkis.manager.label.entity.cluster.ClusterLabel;
import org.apache.linkis.manager.label.entity.em.EMInstanceLabel;
import org.apache.linkis.manager.label.entity.engine.EngineInstanceLabel;
import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
Expand Down Expand Up @@ -49,22 +52,25 @@ public class RMLabelContainer {
private EngineTypeLabel engineTypeLabel;
private UserCreatorLabel userCreatorLabel;
private EngineInstanceLabel engineInstanceLabel;
private CombinedLabel combinedUserCreatorEngineTypeLabel;
private ClusterLabel clusterLabel;
private CombinedLabel combinedResourceLabel;
private Label currentLabel;

public RMLabelContainer(List<Label<?>> labels) {
this.labels = labels;
this.lockedLabels = Lists.newArrayList();
try {
if (getUserCreatorLabel() != null && getEngineTypeLabel() != null) {
this.combinedUserCreatorEngineTypeLabel =
(CombinedLabel)
combinedLabelBuilder.build(
"", Lists.newArrayList(getUserCreatorLabel(), getEngineTypeLabel()));
this.labels.add(combinedUserCreatorEngineTypeLabel);
List<Label> combinedLabel = Lists.newArrayList(getUserCreatorLabel(), getEngineTypeLabel());
ClusterLabel tempClusterLabel = getClusterLabel();
if (shouldCombinedClusterLabel(tempClusterLabel)) {
combinedLabel.add(tempClusterLabel);
}
this.combinedResourceLabel = (CombinedLabel) combinedLabelBuilder.build("", combinedLabel);
this.labels.add(combinedResourceLabel);
}
} catch (Exception e) {
logger.warn("failed to get combinedUserCreatorEngineTypeLabel", e);
logger.warn("failed to get combinedResourceLabel", e);
}
this.labels = LabelUtils.distinctLabel(this.labels, labels);
}
Expand Down Expand Up @@ -156,8 +162,31 @@ public EngineInstanceLabel getEngineInstanceLabel() {
return null;
}

public CombinedLabel getCombinedUserCreatorEngineTypeLabel() {
return combinedUserCreatorEngineTypeLabel;
public ClusterLabel getClusterLabel() {
if (clusterLabel == null) {
for (Label label : labels) {
if (label instanceof ClusterLabel) {
return (ClusterLabel) label;
}
}
} else {
return clusterLabel;
}
logger.warn("ClusterLabel not found");
return null;
}

private boolean shouldCombinedClusterLabel(ClusterLabel clusterLabel) {
return !(clusterLabel == null
|| (LabelManagerConf.COMBINED_WITHOUT_YARN_DEFAULT
&& clusterLabel
.getClusterName()
.equals(RMConfiguration.DEFAULT_YARN_CLUSTER_NAME.getValue())
&& clusterLabel.getClusterType().equals(RMConfiguration.DEFAULT_YARN_TYPE.getValue())));
}

public CombinedLabel getCombinedResourceLabel() {
return combinedResourceLabel;
}

public Label getCurrentLabel() {
Expand Down Expand Up @@ -195,6 +224,8 @@ public String toString() {
+ userCreatorLabel
+ ", engineInstanceLabel="
+ engineInstanceLabel
+ ", clusterLabel="
+ clusterLabel
+ ", currentLabel="
+ currentLabel
+ '}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@ public void dealWithResourceUsedProtocol(ResourceUsedProtocol resourceUsedProtoc
} catch (Exception e) {
RMLabelContainer nodeLabels = new RMLabelContainer(labels);
String value =
Optional.of(nodeLabels.getCombinedUserCreatorEngineTypeLabel())
.map(Object::toString)
.orElse("");
Optional.of(nodeLabels.getCombinedResourceLabel()).map(Object::toString).orElse("");
logger.warn(
String.format(
"usedResource failed, request from:%s, request engine: %s, ",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,7 @@ public boolean canRequest(RMLabelContainer labelContainer, NodeResource resource
NodeResource labelResource =
labelResourceService.getLabelResource(labelContainer.getCurrentLabel());
Resource requestResource = resource.getMinResource();
if (labelContainer
.getCombinedUserCreatorEngineTypeLabel()
.equals(labelContainer.getCurrentLabel())) {
if (labelContainer.getCombinedResourceLabel().equals(labelContainer.getCurrentLabel())) {
if (labelResource == null) {
labelResource = new CommonNodeResource();
labelResource.setResourceType(resource.getResourceType());
Expand Down Expand Up @@ -92,7 +90,7 @@ public boolean canRequest(RMLabelContainer labelContainer, NodeResource resource
labelResourceService.setLabelResource(
labelContainer.getCurrentLabel(),
labelResource,
labelContainer.getCombinedUserCreatorEngineTypeLabel().getStringValue());
labelContainer.getCombinedResourceLabel().getStringValue());
logger.debug(
labelContainer.getCurrentLabel()
+ " to request ["
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,7 @@ public ResultResource requestResource(List<Label<?>> labels, NodeResource resour

List<PersistenceLock> persistenceLocks = new ArrayList<>();
EMInstanceLabel emInstanceLabel = labelContainer.getEMInstanceLabel();
CombinedLabel userCreatorEngineTypeLabel =
labelContainer.getCombinedUserCreatorEngineTypeLabel();
CombinedLabel userCreatorEngineTypeLabel = labelContainer.getCombinedResourceLabel();

try {
// check ecm resource if not enough return
Expand Down Expand Up @@ -294,9 +293,7 @@ public ResultResource requestResource(List<Label<?>> labels, NodeResource resour
labelResource.setLockedResource(
labelResource.getLockedResource().add(resource.getLockedResource()));
labelResourceService.setLabelResource(
label,
labelResource,
labelContainer.getCombinedUserCreatorEngineTypeLabel().getStringValue());
label, labelResource, labelContainer.getCombinedResourceLabel().getStringValue());
logger.info(
String.format(
"ResourceChanged:%s --> %s", label.getStringValue(), labelResource.toString()));
Expand Down Expand Up @@ -330,9 +327,7 @@ public ResultResource requestResource(List<Label<?>> labels, NodeResource resour

// add ec resource
labelResourceService.setEngineConnLabelResource(
engineInstanceLabel,
resource,
labelContainer.getCombinedUserCreatorEngineTypeLabel().getStringValue());
engineInstanceLabel, resource, labelContainer.getCombinedResourceLabel().getStringValue());
// record engine locked resource
labelContainer.getLabels().add(engineInstanceLabel);
resourceLogService.recordUserResourceAction(
Expand Down Expand Up @@ -441,7 +436,7 @@ public void resourceUsed(List<Label<?>> labels, NodeResource usedResource) {
labelResourceService.setLabelResource(
engineInstanceLabel,
lockedResource,
labelContainer.getCombinedUserCreatorEngineTypeLabel().getStringValue());
labelContainer.getCombinedResourceLabel().getStringValue());
resourceLogService.success(
ChangeType.ENGINE_INIT, lockedResource.getLockedResource(), engineInstanceLabel, null);
} catch (Exception exception) {
Expand Down Expand Up @@ -481,7 +476,7 @@ public void resourceUsed(List<Label<?>> labels, NodeResource usedResource) {
labelResourceService.setLabelResource(
label,
labelResource,
labelContainer.getCombinedUserCreatorEngineTypeLabel().getStringValue());
labelContainer.getCombinedResourceLabel().getStringValue());
labelResourceSet.add(
new LabelResourceMapping(label, addedResource, ResourceOperationType.USED));
resourceCheck(label, labelResource);
Expand All @@ -493,7 +488,7 @@ public void resourceUsed(List<Label<?>> labels, NodeResource usedResource) {

if (label
.getClass()
.isAssignableFrom(labelContainer.getCombinedUserCreatorEngineTypeLabel().getClass())) {
.isAssignableFrom(labelContainer.getCombinedResourceLabel().getClass())) {
resourceLogService.recordUserResourceAction(
labelContainer,
persistenceResource.getTicketId(),
Expand Down Expand Up @@ -704,9 +699,7 @@ public void resourceReleased(EngineNode ecNode) {
labelResourceService.setLabelResource(
label,
labelResource,
labelContainer
.getCombinedUserCreatorEngineTypeLabel()
.getStringValue());
labelContainer.getCombinedResourceLabel().getStringValue());
resourceCheck(label, labelResource);
}
},
Expand All @@ -725,8 +718,7 @@ public void resourceReleased(EngineNode ecNode) {

if (label
.getClass()
.isAssignableFrom(
labelContainer.getCombinedUserCreatorEngineTypeLabel().getClass())) {
.isAssignableFrom(labelContainer.getCombinedResourceLabel().getClass())) {
resourceLogService.recordUserResourceAction(
labelContainer,
persistenceResource.getTicketId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,7 @@ public void recordUserResourceAction(
if (RMUtils.RM_RESOURCE_ACTION_RECORD.getValue()) {
LinkisUtils.tryAndWarn(
() -> {
CombinedLabel userCreatorEngineType =
labelContainer.getCombinedUserCreatorEngineTypeLabel();
CombinedLabel userCreatorEngineType = labelContainer.getCombinedResourceLabel();
EngineInstanceLabel engineInstanceLabel = labelContainer.getEngineInstanceLabel();
EMInstanceLabel eMInstanceLabel = labelContainer.getEMInstanceLabel();
if (userCreatorEngineType == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public class SparkConfig {
private String principal; // ("--principal", "")
private String keytab; // ("--keytab", "")
private String queue; // ("--queue", "")
private String pyFiles; // ("--py-files", "")

public String getK8sFileUploadPath() {
return k8sFileUploadPath;
Expand Down Expand Up @@ -419,6 +420,14 @@ public void setQueue(String queue) {
this.queue = queue;
}

public String getPyFiles() {
return pyFiles;
}

public void setPyFiles(String pyFiles) {
this.pyFiles = pyFiles;
}

@Override
public String toString() {
return "SparkConfig{"
Expand Down Expand Up @@ -534,6 +543,9 @@ public String toString() {
+ ", queue='"
+ queue
+ '\''
+ ", pyFiles='"
+ pyFiles
+ '\''
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ public void deployCluster(String mainClass, String args, Map<String, String> con
addSparkArg(sparkLauncher, "--num-executors", sparkConfig.getNumExecutors().toString());
addSparkArg(sparkLauncher, "--principal", sparkConfig.getPrincipal());
addSparkArg(sparkLauncher, "--keytab", sparkConfig.getKeytab());
addSparkArg(sparkLauncher, "--py-files", sparkConfig.getPyFiles());
sparkLauncher.setAppResource(sparkConfig.getAppResource());
sparkLauncher.setMainClass(mainClass);
Arrays.stream(args.split("\\s+"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ object SparkConfiguration extends Logging {

val SPARK_PYTHON_VERSION = CommonVars[String]("spark.python.version", "python")

val SPARK_PYTHON_FILES = CommonVars[String]("spark.submit.pyFiles", "")

val SPARK_PYTHON_TEST_MODE_ENABLE =
CommonVars[Boolean]("linkis.spark.python.test.mode.enable", false)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
sparkConfig.setExecutorCores(LINKIS_SPARK_EXECUTOR_CORES.getValue(options))
sparkConfig.setNumExecutors(LINKIS_SPARK_EXECUTOR_INSTANCES.getValue(options))
sparkConfig.setQueue(LINKIS_QUEUE_NAME.getValue(options))
sparkConfig.setPyFiles(SPARK_PYTHON_FILES.getValue(options))

logger.info(s"spark_info: ${sparkConfig}")
sparkConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,8 @@ class SparkOnceExecutorFactory extends OnceExecutorFactory {
}
}

override protected def getSupportRunTypes: Array[String] =
Array(RunType.JAR.toString, RunType.PYSPARK.toString)

override protected def getRunType: RunType = RunType.JAR
}

0 comments on commit 3bcfbb2

Please sign in to comment.