-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
spark support yarn cluster #4850
spark support yarn cluster #4850
Conversation
@@ -61,7 +61,7 @@ class CallbackEngineConnHook extends EngineConnHook with Logging { | |||
newMap.put("spring.mvc.servlet.path", ServerConfiguration.BDP_SERVER_RESTFUL_URI.getValue) | |||
DataWorkCloudApplication.main(DWCArgumentsParser.formatSpringOptions(newMap.toMap)) | |||
|
|||
val engineConnPidCallBack = new EngineConnPidCallback() | |||
val engineConnPidCallBack = new EngineConnIdentifierCallback() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
val engineConnIdentifierCallback = new EngineConnIdentifierCallback()
@@ -70,9 +71,17 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long) | |||
|
|||
private var thread: Thread = _ | |||
|
|||
private var applicationId: String = _ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be assigned as: applicationId = sc.applicationId
@@ -146,11 +148,19 @@ abstract class AbstractEngineConnLaunchService extends EngineConnLaunchService w | |||
throw t | |||
} | |||
LoggerUtils.removeJobIdMDC() | |||
val deployMode: String = | |||
request.creationDesc.properties.getOrDefault("spark.submit.deployMode", "client") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be determined by the yarn cluster mode label instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should use this method :
val label = LabelUtil.getEngingeConnRuntimeModeLabel(engineCreationContext.getLabels())
val isYarnClusterMode: Boolean =
if (null != label && label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) true
else false
can add this method to LabelUtil class
pythonLibUris = sparkConfValue1 +: pythonLibUris | ||
|
||
// Set deploy-mode with the optional values `cluster`and `client`, the default value `client` | ||
val deployMode: String = SPARK_DEPLOY_MODE.getValue(options) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should use the label to determine the deploy mode.
@@ -61,7 +66,7 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLa | |||
val executorMemory = getValueAndRemove(properties, LINKIS_SPARK_EXECUTOR_MEMORY) | |||
val numExecutors = getValueAndRemove(properties, LINKIS_SPARK_EXECUTOR_INSTANCES) | |||
|
|||
val files = getValueAndRemove(properties, "files", "").split(",").filter(isNotBlankPath) | |||
var files = getValueAndRemove(properties, "files", "").split(",").filter(isNotBlankPath) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update this to use an ArrayBuffer.
@@ -61,7 +66,11 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLa | |||
val executorMemory = getValueAndRemove(properties, LINKIS_SPARK_EXECUTOR_MEMORY) | |||
val numExecutors = getValueAndRemove(properties, LINKIS_SPARK_EXECUTOR_INSTANCES) | |||
|
|||
val files = getValueAndRemove(properties, "files", "").split(",").filter(isNotBlankPath) | |||
var files: ArrayBuffer[String] = getValueAndRemove(properties, "files", "") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should use val
@@ -56,6 +63,13 @@ public void dealPid(ResponseEngineConnPid protocol) { | |||
} | |||
|
|||
engineNode.setIdentifier(protocol.pid()); | |||
|
|||
if (engineNode.getMark().equals(AMConstant.CLUSTER_PROCESS_MARK)) { | |||
ServiceInstance serviceInstance = protocol.serviceInstance(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should first call org.apache.linkis.manager.am.manager.EngineNodeManager#updateEngineNode and then update the label.
labelManagerPersistence.getLabelByServiceInstance(oldServiceInstance); | ||
|
||
List<String> oldKeyList = | ||
nodeLabels.stream().map(InheritableLabel::getLabelKey).collect(Collectors.toList()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It shouldn’t be this complicated. We only need to: first get the new instance’s labels, then get the old instance’s labels, delete the duplicated labels from the old set, and finally associate the new instance with the remaining old labels.
这里应该不用搞这么复杂,我们只需要第一步获得新的instance的标签,然后再获取老的instance标签,然后将老的标签中重复的标签删除,接着再将新的instance和老的标签关联上就好了
@@ -115,8 +124,32 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLa | |||
memory | |||
} | |||
|
|||
// val deployMode: String = getValueAndRemove(properties, SPARK_DEPLOY_MODE) | |||
val deployMode: String = SPARK_DEPLOY_MODE.getValue(properties) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should use this method :
val label = LabelUtil.getEngingeConnRuntimeModeLabel(engineCreationContext.getLabels())
val isYarnClusterMode: Boolean =
if (null != label && label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) true
else false
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, thanks for reviewing the code
@@ -146,11 +148,19 @@ abstract class AbstractEngineConnLaunchService extends EngineConnLaunchService w | |||
throw t | |||
} | |||
LoggerUtils.removeJobIdMDC() | |||
val deployMode: String = | |||
request.creationDesc.properties.getOrDefault("spark.submit.deployMode", "client") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should use this method :
val label = LabelUtil.getEngingeConnRuntimeModeLabel(engineCreationContext.getLabels())
val isYarnClusterMode: Boolean =
if (null != label && label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) true
else false
can add this method to LabelUtil class
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
spark support yarn cluster