diff --git a/docs/configuration/spark.md b/docs/configuration/spark.md index f40c76b43d..ed070e2ac4 100644 --- a/docs/configuration/spark.md +++ b/docs/configuration/spark.md @@ -29,7 +29,10 @@ |spark|wds.linkis.spark.engine.scala.replace_package_header.enable| true |spark.engine.scala.replace_package_header.enable| Use spark yarn cluster mode,need to set label "engingeConnRuntimeMode": "yarnCluster",and need to upload the dependence of the spark to 'linkis.spark.yarn.cluster.jar'(the default value is 'hdfs:///spark/cluster') -spark dependencies include jars and configuration files,For example: '/appcom/Install/linkis/lib/linkis-engineconn-plugins/spark/dist/3.2.1/lib/*.jar','/appcom/Install/linkis/conf/*'' +spark dependencies include jars and configuration files,For example: '/appcom/Install/linkis/lib/linkis-engineconn-plugins/spark/dist/3.2.1/lib/*.jar','/appcom/Install/linkis/conf/*' + +Precautions for using yarnCluster: +Eureka url if 127.0.0.1 should be changed to the real host, such as "127.0.0.1:20303/eureka/" should be changed to "wds001:20303/eureka/" The spark-excel package may cause class conflicts,need to download separately,put it in spark lib wget https://repo1.maven.org/maven2/com/crealytics/spark-excel-2.12.17-3.2.2_2.12/3.2.2_0.18.1/spark-excel-2.12.17-3.2.2_2.12-3.2.2_0.18.1.jar diff --git a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/job/interactive/InteractiveJob.java b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/job/interactive/InteractiveJob.java index 9e1733ccf1..a097b1b25b 100644 --- a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/job/interactive/InteractiveJob.java +++ b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/job/interactive/InteractiveJob.java @@ -41,6 +41,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils; import java.util.HashMap; +import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -116,7 +117,7 @@ public JobResult run() { // async job, return if (isAsync) { return new InteractiveJobResult( - submitResult.getJobStatus().isJobSubmitted(), + jobInfoResult.getJobStatus().isJobSubmitted(), "Async Submission Success", new HashMap<>()); } @@ -171,7 +172,14 @@ private JobResult getResult( "Job status is not success but \'" + jobInfoResult.getJobStatus() + "\'. Will not try to retrieve any Result"); - return new InteractiveJobResult(false, "Execute Error!!!", new HashMap<>()); + Map extraMap = new HashMap<>(); + if (jobInfoResult.getErrCode() != null) { + extraMap.put("errorCode", String.valueOf(jobInfoResult.getErrCode())); + } + if (StringUtils.isNotBlank(jobInfoResult.getErrDesc())) { + extraMap.put("errorDesc", jobInfoResult.getErrDesc()); + } + return new InteractiveJobResult(false, "Execute Error!!!", extraMap); } InteractiveJobResult result = new InteractiveJobResult(true, "Execute Success!!!", new HashMap<>()); @@ -246,6 +254,9 @@ public void onDestroy() { logger.warn("Failed to kill job username or jobId is blank"); return; } + if (isAsync) { + return; + } try { new JobKiller(oper).doKill(username, jobId); } catch (Exception e) { diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala index adcbb1a695..07cfa51d0a 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala @@ -61,15 +61,16 @@ class CallbackEngineConnHook extends EngineConnHook with Logging { newMap.put("spring.mvc.servlet.path", ServerConfiguration.BDP_SERVER_RESTFUL_URI.getValue) DataWorkCloudApplication.main(DWCArgumentsParser.formatSpringOptions(newMap.toMap)) - val engineConnIdentifierCallback = new EngineConnIdentifierCallback() - Utils.tryAndError(engineConnIdentifierCallback.callback()) logger.info("<--------------------SpringBoot App init succeed-------------------->") } override def beforeExecutionExecute( engineCreationContext: EngineCreationContext, engineConn: EngineConn - ): Unit = {} + ): Unit = { + val engineConnIdentifierCallback = new EngineConnIdentifierCallback() + Utils.tryAndError(engineConnIdentifierCallback.callback()) + } override def afterExecutionExecute( engineCreationContext: EngineCreationContext, diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala index b800698766..e111615cee 100644 --- a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala +++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala @@ -431,6 +431,7 @@ class LinkisSQLConnection(private[jdbc] val ujesClient: UJESClient, props: Prope case EngineType.HIVE => RunType.HIVE case EngineType.TRINO => RunType.TRINO_SQL case EngineType.PRESTO => RunType.PRESTO_SQL + case EngineType.NEBULA => RunType.NEBULA_SQL case EngineType.ELASTICSEARCH => RunType.ES_SQL case EngineType.JDBC => RunType.JDBC case EngineType.PYTHON => RunType.SHELL diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EMRestfulApi.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EMRestfulApi.java index c0da8cde24..4d5cb480d3 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EMRestfulApi.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EMRestfulApi.java @@ -359,9 +359,11 @@ public Message executeECMOperation(HttpServletRequest req, @RequestBody JsonNode "Fail to process the operation parameters, cased by " + ExceptionUtils.getRootCauseMessage(e)); } - String engineConnInstance = (String) parameters.get("engineConnInstance"); + return executeECMOperation( - ecmNode, engineConnInstance, new ECMOperateRequest(userName, parameters)); + ecmNode, + parameters.getOrDefault("engineConnInstance", "").toString(), + new ECMOperateRequest(userName, parameters)); } @ApiOperation(value = "openEngineLog", notes = "open Engine log", response = Message.class) diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/selector/rule/ResourceNodeSelectRule.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/selector/rule/ResourceNodeSelectRule.java index 200794671a..ad259ec30c 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/selector/rule/ResourceNodeSelectRule.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/selector/rule/ResourceNodeSelectRule.java @@ -53,10 +53,10 @@ private Comparator sortByResource() { RMNode nodeBRm = (RMNode) nodeB; if (nodeARm.getNodeResource() == null || nodeARm.getNodeResource().getLeftResource() == null) { - return -1; + return 1; } else if (nodeBRm.getNodeResource() == null || nodeBRm.getNodeResource().getLeftResource() == null) { - return 1; + return -1; } else { if (nodeARm .getNodeResource() @@ -67,9 +67,13 @@ private Comparator sortByResource() { .getNodeResource() .getLeftResource() .moreThan(nodeBRm.getNodeResource().getLeftResource())) { - return 1; - } else { return -1; + } else { + // 从大到小排序 (Sort from large to small) + return -(nodeARm + .getNodeResource() + .getLeftResource() + .compare(nodeBRm.getNodeResource().getLeftResource())); } } } catch (Throwable t) { @@ -93,10 +97,10 @@ private Comparator sortByResourceRate() { RMNode nodeBRm = (RMNode) nodeB; if (nodeARm.getNodeResource() == null || nodeARm.getNodeResource().getLeftResource() == null) { - return -1; + return 1; } else if (nodeBRm.getNodeResource() == null || nodeBRm.getNodeResource().getLeftResource() == null) { - return 1; + return -1; } else { float aRate = ResourceUtils.getLoadInstanceResourceRate( @@ -106,7 +110,7 @@ private Comparator sortByResourceRate() { ResourceUtils.getLoadInstanceResourceRate( nodeBRm.getNodeResource().getLeftResource(), nodeBRm.getNodeResource().getMaxResource()); - return Float.compare(aRate, bRate); + return -Float.compare(aRate, bRate); } } catch (Throwable t) { logger.warn("Failed to Compare resource " + t.getMessage()); diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/selector/rule/ScoreNodeSelectRule.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/selector/rule/ScoreNodeSelectRule.java index 43bf789d09..dae02bec5b 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/selector/rule/ScoreNodeSelectRule.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/selector/rule/ScoreNodeSelectRule.java @@ -55,13 +55,13 @@ private Comparator sortByScore() { ScoreServiceInstance instanceB = (ScoreServiceInstance) nodeB; try { if (instanceA.getScore() > instanceB.getScore()) { - return 1; + return -1; } } catch (Exception e) { logger.warn("Failed to Compare resource ", e); return -1; } - return -1; + return 1; } else { return -1; } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java index 4acfb70f91..5fbbb7c32a 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java @@ -54,7 +54,8 @@ public void dealPid(ResponseEngineConnPid protocol) { protocol.pid(), protocol.ticketId()); - EngineNode engineNode = defaultEngineNodeManager.getEngineNode(protocol.serviceInstance()); + EngineNode engineNode = + defaultEngineNodeManager.getEngineNodeInfoByTicketId(protocol.ticketId()); if (engineNode == null) { logger.error( "DefaultEngineConnPidCallbackService dealPid failed, engineNode is null, serviceInstance:{}", @@ -63,13 +64,12 @@ public void dealPid(ResponseEngineConnPid protocol) { } engineNode.setIdentifier(protocol.pid()); - + ServiceInstance oldServiceInstance = engineNode.getServiceInstance(); if (engineNode.getMark().equals(AMConstant.CLUSTER_PROCESS_MARK)) { ServiceInstance serviceInstance = protocol.serviceInstance(); engineNode.setServiceInstance(serviceInstance); - getEngineNodeManager().updateEngineNode(serviceInstance, engineNode); - nodeLabelService.labelsFromInstanceToNewInstance( - engineNode.getServiceInstance(), serviceInstance); + getEngineNodeManager().updateEngineNode(oldServiceInstance, engineNode); + nodeLabelService.labelsFromInstanceToNewInstance(oldServiceInstance, serviceInstance); } defaultEngineNodeManager.updateEngine(engineNode); } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/conf/LabelManagerConf.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/conf/LabelManagerConf.java index f436254911..9aa5ff797f 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/conf/LabelManagerConf.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/conf/LabelManagerConf.java @@ -23,4 +23,7 @@ public class LabelManagerConf { public static final String LONG_LIVED_LABEL = CommonVars.apply("wds.linkis.label.node.long.lived.label.keys", "tenant").getValue(); + + public static final boolean COMBINED_WITHOUT_YARN_DEFAULT = + CommonVars.apply("linkis.combined.without.yarn.default", true).getValue(); } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/domain/RMLabelContainer.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/domain/RMLabelContainer.java index 5bda339194..9d3140267b 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/domain/RMLabelContainer.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/domain/RMLabelContainer.java @@ -18,10 +18,13 @@ package org.apache.linkis.manager.rm.domain; import org.apache.linkis.governance.common.conf.GovernanceCommonConf; +import org.apache.linkis.manager.common.conf.RMConfiguration; import org.apache.linkis.manager.label.builder.CombinedLabelBuilder; +import org.apache.linkis.manager.label.conf.LabelManagerConf; import org.apache.linkis.manager.label.entity.CombinedLabel; import org.apache.linkis.manager.label.entity.Label; import org.apache.linkis.manager.label.entity.ResourceLabel; +import org.apache.linkis.manager.label.entity.cluster.ClusterLabel; import org.apache.linkis.manager.label.entity.em.EMInstanceLabel; import org.apache.linkis.manager.label.entity.engine.EngineInstanceLabel; import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel; @@ -49,7 +52,8 @@ public class RMLabelContainer { private EngineTypeLabel engineTypeLabel; private UserCreatorLabel userCreatorLabel; private EngineInstanceLabel engineInstanceLabel; - private CombinedLabel combinedUserCreatorEngineTypeLabel; + private ClusterLabel clusterLabel; + private CombinedLabel combinedResourceLabel; private Label currentLabel; public RMLabelContainer(List> labels) { @@ -57,14 +61,16 @@ public RMLabelContainer(List> labels) { this.lockedLabels = Lists.newArrayList(); try { if (getUserCreatorLabel() != null && getEngineTypeLabel() != null) { - this.combinedUserCreatorEngineTypeLabel = - (CombinedLabel) - combinedLabelBuilder.build( - "", Lists.newArrayList(getUserCreatorLabel(), getEngineTypeLabel())); - this.labels.add(combinedUserCreatorEngineTypeLabel); + List