Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support submit pyspark once job on k8s and add clusterlabel to combinedlabel #4906

Merged
merged 7 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,7 @@ public class LabelManagerConf {

public static final String LONG_LIVED_LABEL =
CommonVars.apply("wds.linkis.label.node.long.lived.label.keys", "tenant").getValue();

public static final boolean COMBINED_WITHOUT_YARN_DEFAULT =
CommonVars.apply("wds.linkis.combined.without.yarn.default", true).getValue();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove prefix of wds

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
package org.apache.linkis.manager.rm.domain;

import org.apache.linkis.governance.common.conf.GovernanceCommonConf;
import org.apache.linkis.manager.common.conf.RMConfiguration;
import org.apache.linkis.manager.label.builder.CombinedLabelBuilder;
import org.apache.linkis.manager.label.conf.LabelManagerConf;
import org.apache.linkis.manager.label.entity.CombinedLabel;
import org.apache.linkis.manager.label.entity.Label;
import org.apache.linkis.manager.label.entity.ResourceLabel;
import org.apache.linkis.manager.label.entity.cluster.ClusterLabel;
import org.apache.linkis.manager.label.entity.em.EMInstanceLabel;
import org.apache.linkis.manager.label.entity.engine.EngineInstanceLabel;
import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
Expand Down Expand Up @@ -49,22 +52,25 @@ public class RMLabelContainer {
private EngineTypeLabel engineTypeLabel;
private UserCreatorLabel userCreatorLabel;
private EngineInstanceLabel engineInstanceLabel;
private CombinedLabel combinedUserCreatorEngineTypeLabel;
private ClusterLabel clusterLabel;
private CombinedLabel combinedResourceLabel;
private Label currentLabel;

public RMLabelContainer(List<Label<?>> labels) {
this.labels = labels;
this.lockedLabels = Lists.newArrayList();
try {
if (getUserCreatorLabel() != null && getEngineTypeLabel() != null) {
this.combinedUserCreatorEngineTypeLabel =
(CombinedLabel)
combinedLabelBuilder.build(
"", Lists.newArrayList(getUserCreatorLabel(), getEngineTypeLabel()));
this.labels.add(combinedUserCreatorEngineTypeLabel);
List<Label> combinedLabel = Lists.newArrayList(getUserCreatorLabel(), getEngineTypeLabel());
ClusterLabel clusterLabel = getClusterLabel();
if (shouldCombinedClusterLabel(clusterLabel)) {
combinedLabel.add(clusterLabel);
}
this.combinedResourceLabel = (CombinedLabel) combinedLabelBuilder.build("", combinedLabel);
this.labels.add(combinedResourceLabel);
}
} catch (Exception e) {
logger.warn("failed to get combinedUserCreatorEngineTypeLabel", e);
logger.warn("failed to get combinedResourceLabel", e);
}
this.labels = LabelUtils.distinctLabel(this.labels, labels);
}
Expand Down Expand Up @@ -156,8 +162,31 @@ public EngineInstanceLabel getEngineInstanceLabel() {
return null;
}

public CombinedLabel getCombinedUserCreatorEngineTypeLabel() {
return combinedUserCreatorEngineTypeLabel;
public ClusterLabel getClusterLabel() {
if (clusterLabel == null) {
for (Label label : labels) {
if (label instanceof ClusterLabel) {
return (ClusterLabel) label;
}
}
} else {
return clusterLabel;
}
logger.warn("ClusterLabel not found");
return null;
}

private boolean shouldCombinedClusterLabel(ClusterLabel clusterLabel) {
return !(clusterLabel == null
|| (LabelManagerConf.COMBINED_WITHOUT_YARN_DEFAULT
&& clusterLabel
.getClusterName()
.equals(RMConfiguration.DEFAULT_YARN_CLUSTER_NAME.getValue())
&& clusterLabel.getClusterType().equals(RMConfiguration.DEFAULT_YARN_TYPE.getValue())));
}

public CombinedLabel getCombinedResourceLabel() {
return combinedResourceLabel;
}

public Label getCurrentLabel() {
Expand Down Expand Up @@ -195,6 +224,8 @@ public String toString() {
+ userCreatorLabel
+ ", engineInstanceLabel="
+ engineInstanceLabel
+ ", clusterLabel="
+ clusterLabel
+ ", currentLabel="
+ currentLabel
+ '}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@ public void dealWithResourceUsedProtocol(ResourceUsedProtocol resourceUsedProtoc
} catch (Exception e) {
RMLabelContainer nodeLabels = new RMLabelContainer(labels);
String value =
Optional.of(nodeLabels.getCombinedUserCreatorEngineTypeLabel())
.map(Object::toString)
.orElse("");
Optional.of(nodeLabels.getCombinedResourceLabel()).map(Object::toString).orElse("");
logger.warn(
String.format(
"usedResource failed, request from:%s, request engine: %s, ",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,7 @@ public boolean canRequest(RMLabelContainer labelContainer, NodeResource resource
NodeResource labelResource =
labelResourceService.getLabelResource(labelContainer.getCurrentLabel());
Resource requestResource = resource.getMinResource();
if (labelContainer
.getCombinedUserCreatorEngineTypeLabel()
.equals(labelContainer.getCurrentLabel())) {
if (labelContainer.getCombinedResourceLabel().equals(labelContainer.getCurrentLabel())) {
if (labelResource == null) {
labelResource = new CommonNodeResource();
labelResource.setResourceType(resource.getResourceType());
Expand Down Expand Up @@ -92,7 +90,7 @@ public boolean canRequest(RMLabelContainer labelContainer, NodeResource resource
labelResourceService.setLabelResource(
labelContainer.getCurrentLabel(),
labelResource,
labelContainer.getCombinedUserCreatorEngineTypeLabel().getStringValue());
labelContainer.getCombinedResourceLabel().getStringValue());
logger.debug(
labelContainer.getCurrentLabel()
+ " to request ["
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,7 @@ public ResultResource requestResource(List<Label<?>> labels, NodeResource resour

List<PersistenceLock> persistenceLocks = new ArrayList<>();
EMInstanceLabel emInstanceLabel = labelContainer.getEMInstanceLabel();
CombinedLabel userCreatorEngineTypeLabel =
labelContainer.getCombinedUserCreatorEngineTypeLabel();
CombinedLabel userCreatorEngineTypeLabel = labelContainer.getCombinedResourceLabel();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename userCreatorEngineTypeLabel is better

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename userCreatorEngineTypeLabel is better

Already rename it.


try {
// check ecm resource if not enough return
Expand Down Expand Up @@ -294,9 +293,7 @@ public ResultResource requestResource(List<Label<?>> labels, NodeResource resour
labelResource.setLockedResource(
labelResource.getLockedResource().add(resource.getLockedResource()));
labelResourceService.setLabelResource(
label,
labelResource,
labelContainer.getCombinedUserCreatorEngineTypeLabel().getStringValue());
label, labelResource, labelContainer.getCombinedResourceLabel().getStringValue());
logger.info(
String.format(
"ResourceChanged:%s --> %s", label.getStringValue(), labelResource.toString()));
Expand Down Expand Up @@ -330,9 +327,7 @@ public ResultResource requestResource(List<Label<?>> labels, NodeResource resour

// add ec resource
labelResourceService.setEngineConnLabelResource(
engineInstanceLabel,
resource,
labelContainer.getCombinedUserCreatorEngineTypeLabel().getStringValue());
engineInstanceLabel, resource, labelContainer.getCombinedResourceLabel().getStringValue());
// record engine locked resource
labelContainer.getLabels().add(engineInstanceLabel);
resourceLogService.recordUserResourceAction(
Expand Down Expand Up @@ -441,7 +436,7 @@ public void resourceUsed(List<Label<?>> labels, NodeResource usedResource) {
labelResourceService.setLabelResource(
engineInstanceLabel,
lockedResource,
labelContainer.getCombinedUserCreatorEngineTypeLabel().getStringValue());
labelContainer.getCombinedResourceLabel().getStringValue());
resourceLogService.success(
ChangeType.ENGINE_INIT, lockedResource.getLockedResource(), engineInstanceLabel, null);
} catch (Exception exception) {
Expand Down Expand Up @@ -481,7 +476,7 @@ public void resourceUsed(List<Label<?>> labels, NodeResource usedResource) {
labelResourceService.setLabelResource(
label,
labelResource,
labelContainer.getCombinedUserCreatorEngineTypeLabel().getStringValue());
labelContainer.getCombinedResourceLabel().getStringValue());
labelResourceSet.add(
new LabelResourceMapping(label, addedResource, ResourceOperationType.USED));
resourceCheck(label, labelResource);
Expand All @@ -493,7 +488,7 @@ public void resourceUsed(List<Label<?>> labels, NodeResource usedResource) {

if (label
.getClass()
.isAssignableFrom(labelContainer.getCombinedUserCreatorEngineTypeLabel().getClass())) {
.isAssignableFrom(labelContainer.getCombinedResourceLabel().getClass())) {
resourceLogService.recordUserResourceAction(
labelContainer,
persistenceResource.getTicketId(),
Expand Down Expand Up @@ -704,9 +699,7 @@ public void resourceReleased(EngineNode ecNode) {
labelResourceService.setLabelResource(
label,
labelResource,
labelContainer
.getCombinedUserCreatorEngineTypeLabel()
.getStringValue());
labelContainer.getCombinedResourceLabel().getStringValue());
resourceCheck(label, labelResource);
}
},
Expand All @@ -725,8 +718,7 @@ public void resourceReleased(EngineNode ecNode) {

if (label
.getClass()
.isAssignableFrom(
labelContainer.getCombinedUserCreatorEngineTypeLabel().getClass())) {
.isAssignableFrom(labelContainer.getCombinedResourceLabel().getClass())) {
resourceLogService.recordUserResourceAction(
labelContainer,
persistenceResource.getTicketId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,7 @@ public void recordUserResourceAction(
if (RMUtils.RM_RESOURCE_ACTION_RECORD.getValue()) {
LinkisUtils.tryAndWarn(
() -> {
CombinedLabel userCreatorEngineType =
labelContainer.getCombinedUserCreatorEngineTypeLabel();
CombinedLabel userCreatorEngineType = labelContainer.getCombinedResourceLabel();
EngineInstanceLabel engineInstanceLabel = labelContainer.getEngineInstanceLabel();
EMInstanceLabel eMInstanceLabel = labelContainer.getEMInstanceLabel();
if (userCreatorEngineType == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public class SparkConfig {
private String principal; // ("--principal", "")
private String keytab; // ("--keytab", "")
private String queue; // ("--queue", "")
private String pyFiles; // ("--py-files", "")

public String getK8sFileUploadPath() {
return k8sFileUploadPath;
Expand Down Expand Up @@ -419,6 +420,14 @@ public void setQueue(String queue) {
this.queue = queue;
}

public String getPyFiles() {
return pyFiles;
}

public void setPyFiles(String pyFiles) {
this.pyFiles = pyFiles;
}

@Override
public String toString() {
return "SparkConfig{"
Expand Down Expand Up @@ -534,6 +543,9 @@ public String toString() {
+ ", queue='"
+ queue
+ '\''
+ ", pyFiles='"
+ pyFiles
+ '\''
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ public void deployCluster(String mainClass, String args, Map<String, String> con
addSparkArg(sparkLauncher, "--num-executors", sparkConfig.getNumExecutors().toString());
addSparkArg(sparkLauncher, "--principal", sparkConfig.getPrincipal());
addSparkArg(sparkLauncher, "--keytab", sparkConfig.getKeytab());
addSparkArg(sparkLauncher, "--py-files", sparkConfig.getPyFiles());
sparkLauncher.setAppResource(sparkConfig.getAppResource());
sparkLauncher.setMainClass(mainClass);
Arrays.stream(args.split("\\s+"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ object SparkConfiguration extends Logging {

val SPARK_PYTHON_VERSION = CommonVars[String]("spark.python.version", "python")

val SPARK_PYTHON_FILES = CommonVars[String]("spark.submit.pyFiles", "")

val SPARK_PYTHON_TEST_MODE_ENABLE =
CommonVars[Boolean]("linkis.spark.python.test.mode.enable", false)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
sparkConfig.setExecutorCores(LINKIS_SPARK_EXECUTOR_CORES.getValue(options))
sparkConfig.setNumExecutors(LINKIS_SPARK_EXECUTOR_INSTANCES.getValue(options))
sparkConfig.setQueue(LINKIS_QUEUE_NAME.getValue(options))
sparkConfig.setPyFiles(SPARK_PYTHON_FILES.getValue(options))

logger.info(s"spark_info: ${sparkConfig}")
sparkConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,8 @@ class SparkOnceExecutorFactory extends OnceExecutorFactory {
}
}

override protected def getSupportRunTypes: Array[String] =
Array(RunType.JAR.toString, RunType.PYSPARK.toString)

override protected def getRunType: RunType = RunType.JAR
}