From 5a58d73d480f167506ea3bb5c0c6a1546e55e1e2 Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Thu, 24 Aug 2023 17:02:03 +0800 Subject: [PATCH 01/15] Fix spark yarn cluster mode bug (#4872) * fix spark yarn cluster mode bug * fix spark yarn cluster mode bug --- .../callback/hook/CallbackEngineConnHook.scala | 7 ++++--- .../impl/DefaultEngineConnPidCallbackService.java | 10 +++++----- .../rm/service/impl/DefaultResourceManager.java | 1 + 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala index adcbb1a695..07cfa51d0a 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala @@ -61,15 +61,16 @@ class CallbackEngineConnHook extends EngineConnHook with Logging { newMap.put("spring.mvc.servlet.path", ServerConfiguration.BDP_SERVER_RESTFUL_URI.getValue) DataWorkCloudApplication.main(DWCArgumentsParser.formatSpringOptions(newMap.toMap)) - val engineConnIdentifierCallback = new EngineConnIdentifierCallback() - Utils.tryAndError(engineConnIdentifierCallback.callback()) logger.info("<--------------------SpringBoot App init succeed-------------------->") } override def beforeExecutionExecute( engineCreationContext: EngineCreationContext, engineConn: EngineConn - ): Unit = {} + ): Unit = { + val engineConnIdentifierCallback = new EngineConnIdentifierCallback() + 
Utils.tryAndError(engineConnIdentifierCallback.callback()) + } override def afterExecutionExecute( engineCreationContext: EngineCreationContext, diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java index 4acfb70f91..5fbbb7c32a 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java @@ -54,7 +54,8 @@ public void dealPid(ResponseEngineConnPid protocol) { protocol.pid(), protocol.ticketId()); - EngineNode engineNode = defaultEngineNodeManager.getEngineNode(protocol.serviceInstance()); + EngineNode engineNode = + defaultEngineNodeManager.getEngineNodeInfoByTicketId(protocol.ticketId()); if (engineNode == null) { logger.error( "DefaultEngineConnPidCallbackService dealPid failed, engineNode is null, serviceInstance:{}", @@ -63,13 +64,12 @@ public void dealPid(ResponseEngineConnPid protocol) { } engineNode.setIdentifier(protocol.pid()); - + ServiceInstance oldServiceInstance = engineNode.getServiceInstance(); if (engineNode.getMark().equals(AMConstant.CLUSTER_PROCESS_MARK)) { ServiceInstance serviceInstance = protocol.serviceInstance(); engineNode.setServiceInstance(serviceInstance); - getEngineNodeManager().updateEngineNode(serviceInstance, engineNode); - nodeLabelService.labelsFromInstanceToNewInstance( - engineNode.getServiceInstance(), serviceInstance); + getEngineNodeManager().updateEngineNode(oldServiceInstance, engineNode); + 
nodeLabelService.labelsFromInstanceToNewInstance(oldServiceInstance, serviceInstance); } defaultEngineNodeManager.updateEngine(engineNode); } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.java index c402701450..7ecf3f48da 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.java @@ -316,6 +316,7 @@ public ResultResource requestResource(List> labels, NodeResource resour engineNode.setServiceInstance( ServiceInstance.apply(labelContainer.getEngineServiceName(), tickedId)); engineNode.setNodeResource(resource); + engineNode.setTicketId(tickedId); nodeManagerPersistence.addEngineNode(engineNode); From b5453a2672db5799e3f40653e28ff27f76963a7b Mon Sep 17 00:00:00 2001 From: peacewong Date: Fri, 1 Sep 2023 19:07:34 +0800 Subject: [PATCH 02/15] Fix linkis cli async exeute throw npe (#4870) * Fix linkis cli async exeute throw npe close #4869 * Optimize Code --- .../job/interactive/InteractiveJob.java | 15 +++++++++++++-- .../linkis/manager/am/restful/EMRestfulApi.java | 6 ++++-- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/job/interactive/InteractiveJob.java b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/job/interactive/InteractiveJob.java index 9e1733ccf1..a097b1b25b 100644 --- 
a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/job/interactive/InteractiveJob.java +++ b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/job/interactive/InteractiveJob.java @@ -41,6 +41,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils; import java.util.HashMap; +import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -116,7 +117,7 @@ public JobResult run() { // async job, return if (isAsync) { return new InteractiveJobResult( - submitResult.getJobStatus().isJobSubmitted(), + jobInfoResult.getJobStatus().isJobSubmitted(), "Async Submission Success", new HashMap<>()); } @@ -171,7 +172,14 @@ private JobResult getResult( "Job status is not success but \'" + jobInfoResult.getJobStatus() + "\'. Will not try to retrieve any Result"); - return new InteractiveJobResult(false, "Execute Error!!!", new HashMap<>()); + Map extraMap = new HashMap<>(); + if (jobInfoResult.getErrCode() != null) { + extraMap.put("errorCode", String.valueOf(jobInfoResult.getErrCode())); + } + if (StringUtils.isNotBlank(jobInfoResult.getErrDesc())) { + extraMap.put("errorDesc", jobInfoResult.getErrDesc()); + } + return new InteractiveJobResult(false, "Execute Error!!!", extraMap); } InteractiveJobResult result = new InteractiveJobResult(true, "Execute Success!!!", new HashMap<>()); @@ -246,6 +254,9 @@ public void onDestroy() { logger.warn("Failed to kill job username or jobId is blank"); return; } + if (isAsync) { + return; + } try { new JobKiller(oper).doKill(username, jobId); } catch (Exception e) { diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EMRestfulApi.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EMRestfulApi.java index c0da8cde24..4d5cb480d3 100644 
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EMRestfulApi.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EMRestfulApi.java @@ -359,9 +359,11 @@ public Message executeECMOperation(HttpServletRequest req, @RequestBody JsonNode "Fail to process the operation parameters, cased by " + ExceptionUtils.getRootCauseMessage(e)); } - String engineConnInstance = (String) parameters.get("engineConnInstance"); + return executeECMOperation( - ecmNode, engineConnInstance, new ECMOperateRequest(userName, parameters)); + ecmNode, + parameters.getOrDefault("engineConnInstance", "").toString(), + new ECMOperateRequest(userName, parameters)); } @ApiOperation(value = "openEngineLog", notes = "open Engine log", response = Message.class) From 4169e703c427842d5e932b9375423d74017465ad Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Fri, 1 Sep 2023 19:09:53 +0800 Subject: [PATCH 03/15] Spark k8s operator task Added status acquisition (#4889) * Spark k8s operator task Added status acquisition * spark The obtaining status of the k8s operator task is changed to k8s list-watch * spark The obtaining status of the k8s operator task is changed to k8s list-watch --- ...netesOperatorClusterDescriptorAdapter.java | 58 ++++++++++++------- 1 file changed, 36 insertions(+), 22 deletions(-) diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java index fa6236600e..3ea27b394f 100644 --- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java +++ 
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java @@ -38,6 +38,9 @@ import io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinitionList; import io.fabric8.kubernetes.client.CustomResource; import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.WatcherException; +import io.fabric8.kubernetes.client.dsl.MixedOperation; import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation; import io.fabric8.kubernetes.client.dsl.Resource; import org.slf4j.Logger; @@ -152,32 +155,44 @@ public void deployCluster(String mainClass, String args, Map con } public boolean initJobId() { - SparkApplicationStatus sparkApplicationStatus = getKubernetesOperatorState(); - - if (Objects.nonNull(sparkApplicationStatus)) { - this.applicationId = sparkApplicationStatus.getSparkApplicationId(); - this.jobState = - kubernetesOperatorStateConvertSparkState( - sparkApplicationStatus.getApplicationState().getState()); + try { + getKubernetesOperatorState(); + } catch (Exception e) { + try { + // Prevent watch interruption due to network interruption.Restart Watcher. 
+ Thread.sleep(5000); + getKubernetesOperatorState(); + } catch (InterruptedException interruptedException) { + logger.error("Use k8s watch obtain the status failed"); + } } - // When the job is not finished, the appId is monitored; otherwise, the status is // monitored(当任务没结束时,监控appId,反之,则监控状态,这里主要防止任务过早结束,导致一直等待) return null != getApplicationId() || (jobState != null && jobState.isFinal()); } - private SparkApplicationStatus getKubernetesOperatorState() { - List sparkApplicationList = - getSparkApplicationClient(client).list().getItems(); - if (CollectionUtils.isNotEmpty(sparkApplicationList)) { - for (SparkApplication sparkApplication : sparkApplicationList) { - if (sparkApplication.getMetadata().getNamespace().equals(this.sparkConfig.getK8sNamespace()) - && sparkApplication.getMetadata().getName().equals(this.sparkConfig.getAppName())) { - return sparkApplication.getStatus(); - } - } - } - return null; + private void getKubernetesOperatorState() { + getSparkApplicationClient(client) + .inNamespace(this.sparkConfig.getK8sNamespace()) + .withName(this.sparkConfig.getAppName()) + .watch( + new Watcher() { + @Override + public void eventReceived(Action action, SparkApplication sparkApplication) { + // todo get status + applicationId = sparkApplication.getStatus().getSparkApplicationId(); + jobState = + kubernetesOperatorStateConvertSparkState( + sparkApplication.getStatus().getApplicationState().getState()); + } + + @Override + public void onClose(WatcherException e) { + // Invoked when the watcher closes due to an Exception.Restart Watcher. 
+ logger.error("Use k8s watch obtain the status failed", e); + getKubernetesOperatorState(); + } + }); } public SparkAppHandle.State kubernetesOperatorStateConvertSparkState(String kubernetesState) { @@ -216,8 +231,7 @@ public void close() { client.close(); } - public static NonNamespaceOperation< - SparkApplication, SparkApplicationList, Resource> + public static MixedOperation> getSparkApplicationClient(KubernetesClient client) { return client.customResources(SparkApplication.class, SparkApplicationList.class); } From a3e135ea1cace9b635dadaae03342984c5a28960 Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Mon, 4 Sep 2023 16:24:39 +0800 Subject: [PATCH 04/15] fix spark k8s bug (#4895) --- .../engineplugin/spark/client/context/SparkConfig.java | 6 ++++++ .../KubernetesOperatorClusterDescriptorAdapter.java | 10 ++++++---- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java index 24d3ddcb2c..3d0fc0ff3b 100644 --- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java +++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java @@ -17,6 +17,8 @@ package org.apache.linkis.engineplugin.spark.client.context; +import org.apache.commons.lang3.StringUtils; + import java.util.HashMap; import java.util.Map; @@ -131,6 +133,10 @@ public String getK8sConfigFile() { } public void setK8sConfigFile(String k8sConfigFile) { + if (StringUtils.isNotBlank(k8sConfigFile) && k8sConfigFile.startsWith("~")) { + String user = System.getProperty("user.home"); + k8sConfigFile = k8sConfigFile.replaceFirst("~", user); + } this.k8sConfigFile = k8sConfigFile; } diff --git 
a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java index 3ea27b394f..eafa8abec5 100644 --- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java +++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java @@ -180,10 +180,12 @@ private void getKubernetesOperatorState() { @Override public void eventReceived(Action action, SparkApplication sparkApplication) { // todo get status - applicationId = sparkApplication.getStatus().getSparkApplicationId(); - jobState = - kubernetesOperatorStateConvertSparkState( - sparkApplication.getStatus().getApplicationState().getState()); + if (Objects.nonNull(sparkApplication.getStatus())) { + applicationId = sparkApplication.getStatus().getSparkApplicationId(); + jobState = + kubernetesOperatorStateConvertSparkState( + sparkApplication.getStatus().getApplicationState().getState()); + } } @Override From 8181efc1cf0807d6bca18df973e4db53580c4949 Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Wed, 6 Sep 2023 10:27:31 +0800 Subject: [PATCH 05/15] HttpBmlClient delete useless code (#4898) --- .../apache/linkis/bml/client/impl/HttpBmlClient.scala | 9 --------- 1 file changed, 9 deletions(-) diff --git a/linkis-public-enhancements/linkis-pes-client/src/main/scala/org/apache/linkis/bml/client/impl/HttpBmlClient.scala b/linkis-public-enhancements/linkis-pes-client/src/main/scala/org/apache/linkis/bml/client/impl/HttpBmlClient.scala index be251d3b23..258b3feb9b 100644 --- a/linkis-public-enhancements/linkis-pes-client/src/main/scala/org/apache/linkis/bml/client/impl/HttpBmlClient.scala 
+++ b/linkis-public-enhancements/linkis-pes-client/src/main/scala/org/apache/linkis/bml/client/impl/HttpBmlClient.scala @@ -137,15 +137,6 @@ class HttpBmlClient( if (version != null) bmlDownloadAction.getParameters.asScala += "version" -> version bmlDownloadAction.setUser(user) val downloadResult = dwsClient.execute(bmlDownloadAction) - // val retIs = new ByteArrayInputStream(IOUtils.toString(bmlDownloadAction.getInputStream).getBytes("UTF-8")) - // if (downloadResult != null) { - // bmlDownloadAction.getResponse match { - // case r: CloseableHttpResponse => - // Utils.tryAndWarn(r.close()) - // case o: Any => - // info(s"Download response : ${o.getClass.getName} cannot close.") - // } - // } BmlDownloadResponse( isSuccess = true, bmlDownloadAction.getInputStream, From 8dfcf86ec42fd9f9ab1aed96adb7c0b16c60a11d Mon Sep 17 00:00:00 2001 From: sjgllgh <129264181+sjgllgh@users.noreply.github.com> Date: Fri, 8 Sep 2023 14:37:01 +0800 Subject: [PATCH 06/15] #4900 Fix linkismanager allocation ECM error logic, adjust resource allocation logic from large to small (#4901) Co-authored-by: weipengfei --- .../manager/am/selector/rule/ResourceNodeSelectRule.java | 4 ++-- .../linkis/manager/am/selector/rule/ScoreNodeSelectRule.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/selector/rule/ResourceNodeSelectRule.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/selector/rule/ResourceNodeSelectRule.java index 200794671a..8e86f6906a 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/selector/rule/ResourceNodeSelectRule.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/selector/rule/ResourceNodeSelectRule.java @@ -67,9 +67,9 
@@ private Comparator sortByResource() { .getNodeResource() .getLeftResource() .moreThan(nodeBRm.getNodeResource().getLeftResource())) { - return 1; - } else { return -1; + } else { + return 1; } } } catch (Throwable t) { diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/selector/rule/ScoreNodeSelectRule.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/selector/rule/ScoreNodeSelectRule.java index 43bf789d09..dae02bec5b 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/selector/rule/ScoreNodeSelectRule.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/selector/rule/ScoreNodeSelectRule.java @@ -55,13 +55,13 @@ private Comparator sortByScore() { ScoreServiceInstance instanceB = (ScoreServiceInstance) nodeB; try { if (instanceA.getScore() > instanceB.getScore()) { - return 1; + return -1; } } catch (Exception e) { logger.warn("Failed to Compare resource ", e); return -1; } - return -1; + return 1; } else { return -1; } From 5ca67262ad93ab09c3338393155f3f5fa8717078 Mon Sep 17 00:00:00 2001 From: zlucelia <66543456+Zhao-LX2000@users.noreply.github.com> Date: Fri, 8 Sep 2023 14:38:54 +0800 Subject: [PATCH 07/15] feat: support spark submit jar on k8s (#4867) * feat: support spark submit jar on k8s * feat: add spark cores setting priority * feat: use UUID to generate driverPodName * feat: optimize code of executor creation --- .../spark/client/context/SparkConfig.java | 25 ++ .../ClusterDescriptorAdapterFactory.java | 9 +- ...esApplicationClusterDescriptorAdapter.java | 231 ++++++++++++++++++ .../SparkOnKubernetesSubmitOnceExecutor.scala | 163 ++++++++++++ .../factory/SparkEngineConnFactory.scala | 2 + .../SparkEngineConnResourceFactory.scala | 8 +- 
.../factory/SparkOnceExecutorFactory.scala | 22 +- .../spark/utils/SparkJobProgressUtil.scala | 45 +++- 8 files changed, 494 insertions(+), 11 deletions(-) create mode 100644 linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java create mode 100644 linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnKubernetesSubmitOnceExecutor.scala diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java index 3d0fc0ff3b..37a0e2c980 100644 --- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java +++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java @@ -50,6 +50,9 @@ public class SparkConfig { private String k8sNamespace; private String k8sFileUploadPath; + + private String k8sDriverRequestCores; + private String k8sExecutorRequestCores; private String deployMode = "client"; // ("client") // todo cluster private String appResource; // ("") private String appName; // ("") @@ -176,6 +179,22 @@ public void setK8sImage(String k8sImage) { this.k8sImage = k8sImage; } + public String getK8sDriverRequestCores() { + return k8sDriverRequestCores; + } + + public void setK8sDriverRequestCores(String k8sDriverRequestCores) { + this.k8sDriverRequestCores = k8sDriverRequestCores; + } + + public String getK8sExecutorRequestCores() { + return k8sExecutorRequestCores; + } + + public void setK8sExecutorRequestCores(String k8sExecutorRequestCores) { + this.k8sExecutorRequestCores = k8sExecutorRequestCores; + } + public String getJavaHome() { return javaHome; } @@ -442,6 +461,12 @@ public String toString() { + ", k8sNamespace='" + k8sNamespace + 
'\'' + + ", k8sDriverRequestCores='" + + k8sDriverRequestCores + + '\'' + + ", k8sExecutorRequestCores='" + + k8sExecutorRequestCores + + '\'' + ", deployMode='" + deployMode + '\'' diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java index 91d3eafb6f..bc67a33e9f 100644 --- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java +++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java @@ -29,8 +29,13 @@ public static ClusterDescriptorAdapter create(ExecutionContext executionContext) ClusterDescriptorAdapter clusterDescriptorAdapter = new YarnApplicationClusterDescriptorAdapter(executionContext); - if (StringUtils.isNotBlank(master) && master.equalsIgnoreCase("k8s-operator")) { - clusterDescriptorAdapter = new KubernetesOperatorClusterDescriptorAdapter(executionContext); + if (StringUtils.isNotBlank(master)) { + if (master.equalsIgnoreCase("k8s-operator")) { + clusterDescriptorAdapter = new KubernetesOperatorClusterDescriptorAdapter(executionContext); + } else if (master.equalsIgnoreCase("k8s-native")) { + clusterDescriptorAdapter = + new KubernetesApplicationClusterDescriptorAdapter(executionContext); + } } return clusterDescriptorAdapter; diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java new file mode 100644 index 0000000000..0ee0380fb8 --- /dev/null +++ 
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineplugin.spark.client.deployment; + +import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext; +import org.apache.linkis.engineplugin.spark.client.context.SparkConfig; +import org.apache.linkis.engineplugin.spark.client.deployment.util.KubernetesHelper; + +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.util.Strings; +import org.apache.spark.launcher.CustomSparkSubmitLauncher; +import org.apache.spark.launcher.SparkAppHandle; +import org.apache.spark.launcher.SparkLauncher; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import java.util.UUID; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.dsl.PodResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KubernetesApplicationClusterDescriptorAdapter extends ClusterDescriptorAdapter { + private static final Logger logger = 
+ LoggerFactory.getLogger(KubernetesOperatorClusterDescriptorAdapter.class); + + protected SparkConfig sparkConfig; + protected KubernetesClient client; + protected String driverPodName; + protected String namespace; + + public KubernetesApplicationClusterDescriptorAdapter(ExecutionContext executionContext) { + super(executionContext); + this.sparkConfig = executionContext.getSparkConfig(); + this.client = + KubernetesHelper.getKubernetesClient( + this.sparkConfig.getK8sConfigFile(), + this.sparkConfig.getK8sMasterUrl(), + this.sparkConfig.getK8sUsername(), + this.sparkConfig.getK8sPassword()); + } + + public void deployCluster(String mainClass, String args, Map confMap) + throws IOException { + SparkConfig sparkConfig = executionContext.getSparkConfig(); + sparkLauncher = new CustomSparkSubmitLauncher(); + sparkLauncher + .setJavaHome(sparkConfig.getJavaHome()) + .setSparkHome(sparkConfig.getSparkHome()) + .setMaster(sparkConfig.getK8sMasterUrl()) + .setDeployMode(sparkConfig.getDeployMode()) + .setAppName(sparkConfig.getAppName()) + .setVerbose(true); + this.driverPodName = generateDriverPodName(sparkConfig.getAppName()); + this.namespace = sparkConfig.getK8sNamespace(); + setConf(sparkLauncher, "spark.app.name", sparkConfig.getAppName()); + setConf(sparkLauncher, "spark.kubernetes.namespace", this.namespace); + setConf(sparkLauncher, "spark.kubernetes.container.image", sparkConfig.getK8sImage()); + setConf(sparkLauncher, "spark.kubernetes.driver.pod.name", this.driverPodName); + setConf( + sparkLauncher, + "spark.kubernetes.driver.request.cores", + sparkConfig.getK8sDriverRequestCores()); + setConf( + sparkLauncher, + "spark.kubernetes.executor.request.cores", + sparkConfig.getK8sExecutorRequestCores()); + setConf( + sparkLauncher, + "spark.kubernetes.container.image.pullPolicy", + sparkConfig.getK8sImagePullPolicy()); + setConf( + sparkLauncher, + "spark.kubernetes.authenticate.driver.serviceAccountName", + sparkConfig.getK8sServiceAccount()); + if (confMap != 
null) confMap.forEach((k, v) -> sparkLauncher.setConf(k, v)); + + addSparkArg(sparkLauncher, "--jars", sparkConfig.getJars()); + addSparkArg(sparkLauncher, "--packages", sparkConfig.getPackages()); + addSparkArg(sparkLauncher, "--exclude-packages", sparkConfig.getExcludePackages()); + addSparkArg(sparkLauncher, "--repositories", sparkConfig.getRepositories()); + addSparkArg(sparkLauncher, "--files", sparkConfig.getFiles()); + addSparkArg(sparkLauncher, "--archives", sparkConfig.getArchives()); + addSparkArg(sparkLauncher, "--driver-memory", sparkConfig.getDriverMemory()); + addSparkArg(sparkLauncher, "--driver-java-options", sparkConfig.getDriverJavaOptions()); + addSparkArg(sparkLauncher, "--driver-library-path", sparkConfig.getDriverLibraryPath()); + addSparkArg(sparkLauncher, "--driver-class-path", sparkConfig.getDriverClassPath()); + addSparkArg(sparkLauncher, "--executor-memory", sparkConfig.getExecutorMemory()); + addSparkArg(sparkLauncher, "--proxy-user", sparkConfig.getProxyUser()); + addSparkArg(sparkLauncher, "--driver-cores", sparkConfig.getDriverCores().toString()); + addSparkArg(sparkLauncher, "--total-executor-cores", sparkConfig.getTotalExecutorCores()); + addSparkArg(sparkLauncher, "--executor-cores", sparkConfig.getExecutorCores().toString()); + addSparkArg(sparkLauncher, "--num-executors", sparkConfig.getNumExecutors().toString()); + addSparkArg(sparkLauncher, "--principal", sparkConfig.getPrincipal()); + addSparkArg(sparkLauncher, "--keytab", sparkConfig.getKeytab()); + sparkLauncher.setAppResource(sparkConfig.getAppResource()); + sparkLauncher.setMainClass(mainClass); + Arrays.stream(args.split("\\s+")) + .filter(StringUtils::isNotBlank) + .forEach(arg -> sparkLauncher.addAppArgs(arg)); + sparkAppHandle = + sparkLauncher.startApplication( + new SparkAppHandle.Listener() { + @Override + public void stateChanged(SparkAppHandle sparkAppHandle) {} + + @Override + public void infoChanged(SparkAppHandle sparkAppHandle) {} + }); + 
sparkLauncher.setSparkAppHandle(sparkAppHandle); + } + + private void addSparkArg(SparkLauncher sparkLauncher, String key, String value) { + if (StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value)) { + sparkLauncher.addSparkArg(key, value); + } + } + + private void setConf(SparkLauncher sparkLauncher, String key, String value) { + if (StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value)) { + sparkLauncher.setConf(key, value); + } + } + + public boolean initJobId() { + Pod sparkDriverPod = getSparkDriverPod(); + if (null == sparkDriverPod) { + return false; + } + String sparkDriverPodPhase = sparkDriverPod.getStatus().getPhase(); + String sparkApplicationId = sparkDriverPod.getMetadata().getLabels().get("spark-app-selector"); + + if (Strings.isNotBlank(sparkApplicationId)) { + this.applicationId = sparkApplicationId; + } + if (Strings.isNotBlank(sparkDriverPodPhase)) { + this.jobState = kubernetesPodStateConvertSparkState(sparkDriverPodPhase); + } + + // When the job is not finished, the appId is monitored; otherwise, the status is + // monitored(当任务没结束时,监控appId,反之,则监控状态,这里主要防止任务过早结束,导致一直等待) + return null != getApplicationId() || (jobState != null && jobState.isFinal()); + } + + protected Pod getSparkDriverPod() { + return client.pods().inNamespace(namespace).withName(driverPodName).get(); + } + + public String getSparkDriverPodIP() { + Pod sparkDriverPod = getSparkDriverPod(); + if (null != sparkDriverPod) { + String sparkDriverPodIP = sparkDriverPod.getStatus().getPodIP(); + if (StringUtils.isNotBlank(sparkDriverPodIP)) { + return sparkDriverPodIP; + } else { + logger.info("spark driver pod IP is null, the application may be pending"); + } + } else { + logger.info("spark driver pod is not exist"); + } + return ""; + } + + @Override + public SparkAppHandle.State getJobState() { + Pod sparkDriverPod = getSparkDriverPod(); + if (null != sparkDriverPod) { + String sparkDriverPodPhase = sparkDriverPod.getStatus().getPhase(); + this.jobState = 
kubernetesPodStateConvertSparkState(sparkDriverPodPhase); + logger.info("Job {} state is {}.", getApplicationId(), this.jobState); + return this.jobState; + } + return null; + } + + @Override + public void close() { + logger.info("Start to close job {}.", getApplicationId()); + PodResource sparkDriverPodResource = + client.pods().inNamespace(namespace).withName(driverPodName); + if (null != sparkDriverPodResource.get()) { + sparkDriverPodResource.delete(); + } + client.close(); + } + + @Override + public boolean isDisposed() { + return this.jobState.isFinal(); + } + + public SparkAppHandle.State kubernetesPodStateConvertSparkState(String kubernetesState) { + if (StringUtils.isBlank(kubernetesState)) { + return SparkAppHandle.State.UNKNOWN; + } + switch (kubernetesState.toUpperCase()) { + case "PENDING": + return SparkAppHandle.State.CONNECTED; + case "RUNNING": + return SparkAppHandle.State.RUNNING; + case "SUCCEEDED": + return SparkAppHandle.State.FINISHED; + case "FAILED": + return SparkAppHandle.State.FAILED; + default: + return SparkAppHandle.State.UNKNOWN; + } + } + + public String generateDriverPodName(String appName) { + return appName + "-" + UUID.randomUUID().toString().replace("-", "") + "-driver"; + } +} diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnKubernetesSubmitOnceExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnKubernetesSubmitOnceExecutor.scala new file mode 100644 index 0000000000..1c3873942d --- /dev/null +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnKubernetesSubmitOnceExecutor.scala @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineplugin.spark.executor + +import org.apache.linkis.common.utils.{ByteTimeUtils, Utils} +import org.apache.linkis.engineconn.once.executor.{ + OnceExecutorExecutionContext, + OperableOnceExecutor +} +import org.apache.linkis.engineplugin.spark.client.deployment.{ + KubernetesApplicationClusterDescriptorAdapter, + YarnApplicationClusterDescriptorAdapter +} +import org.apache.linkis.engineplugin.spark.config.SparkConfiguration.{ + SPARK_APP_CONF, + SPARK_APPLICATION_ARGS, + SPARK_APPLICATION_MAIN_CLASS +} +import org.apache.linkis.engineplugin.spark.context.SparkEngineConnContext +import org.apache.linkis.engineplugin.spark.utils.SparkJobProgressUtil +import org.apache.linkis.manager.common.entity.resource._ +import org.apache.linkis.manager.common.utils.ResourceUtils +import org.apache.linkis.protocol.engine.JobProgressInfo + +import org.apache.commons.lang3.StringUtils + +import java.util + +import scala.concurrent.duration.Duration + +import io.fabric8.kubernetes.api.model.Quantity + +class SparkOnKubernetesSubmitOnceExecutor( + override val id: Long, + override protected val sparkEngineConnContext: SparkEngineConnContext +) extends SparkOnceExecutor[KubernetesApplicationClusterDescriptorAdapter] + with OperableOnceExecutor { + + private var oldProgress: Float = 0f + + override def doSubmit( + onceExecutorExecutionContext: 
OnceExecutorExecutionContext, + options: Map[String, String] + ): Unit = { + val args = SPARK_APPLICATION_ARGS.getValue(options) + val mainClass = SPARK_APPLICATION_MAIN_CLASS.getValue(options) + val extConf = SPARK_APP_CONF.getValue(options) + val confMap = new util.HashMap[String, String]() + if (StringUtils.isNotBlank(extConf)) { + for (conf <- extConf.split("\n")) { + if (StringUtils.isNotBlank(conf)) { + val pair = conf.trim.split("=") + if (pair.length == 2) { + confMap.put(pair(0), pair(1)) + } else { + logger.warn(s"ignore spark conf: $conf") + } + } + } + } + logger.info( + s"Ready to submit spark application to kubernetes, mainClass: $mainClass, args: $args." + ) + clusterDescriptorAdapter.deployCluster(mainClass, args, confMap) + } + + override protected def waitToRunning(): Unit = { + // Wait until the task return applicationId (等待返回applicationId) + Utils.waitUntil(() => clusterDescriptorAdapter.initJobId(), Duration.Inf) + // Synchronize applicationId to EC SparkOnceExecutor to facilitate user operations, + // such as obtaining progress and killing jobs(将applicationId同步给EC执行器,方便用户操作,如获取进度,kill任务等) + setApplicationId(clusterDescriptorAdapter.getApplicationId) + super.waitToRunning() + } + + override def getApplicationURL: String = "" + + override def getCurrentNodeResource(): NodeResource = { + logger.info("Begin to get actual used resources!") + Utils.tryCatch({ + val sparkConf = sparkEngineConnContext.getExecutionContext.getSparkConfig + val sparkNamespace = sparkConf.getK8sNamespace + + val executorNum: Int = sparkConf.getNumExecutors + val executorMem: Long = + ByteTimeUtils.byteStringAsBytes(sparkConf.getExecutorMemory) * executorNum + val driverMem: Long = ByteTimeUtils.byteStringAsBytes(sparkConf.getDriverMemory) + + val executorCoresQuantity = Quantity.parse(sparkConf.getK8sExecutorRequestCores) + val executorCores: Long = + (Quantity.getAmountInBytes(executorCoresQuantity).doubleValue() * 1000).toLong * executorNum + val driverCoresQuantity = 
Quantity.parse(sparkConf.getK8sDriverRequestCores) + val driverCores: Long = + (Quantity.getAmountInBytes(driverCoresQuantity).doubleValue() * 1000).toLong + + logger.info( + "Current actual used resources is driverMem:" + driverMem + ",driverCores:" + driverCores + ",executorMem:" + executorMem + ",executorCores:" + executorCores + ",namespace:" + sparkNamespace + ) + val usedResource = new DriverAndKubernetesResource( + new LoadInstanceResource(0, 0, 0), + new KubernetesResource(executorMem + driverMem, executorCores + driverCores, sparkNamespace) + ) + val nodeResource = new CommonNodeResource + nodeResource.setUsedResource(usedResource) + nodeResource.setResourceType(ResourceUtils.getResourceTypeByResource(usedResource)) + nodeResource + })(t => { + logger.warn("Get actual used resource exception", t) + null + }) + } + + override def getProgress: Float = { + val jobIsFinal = clusterDescriptorAdapter != null && + clusterDescriptorAdapter.getJobState != null && + clusterDescriptorAdapter.getJobState.isFinal + if (oldProgress >= 1 || jobIsFinal) { + 1 + } else { + val sparkDriverPodIP = this.clusterDescriptorAdapter.getSparkDriverPodIP + if (StringUtils.isNotBlank(sparkDriverPodIP)) { + val newProgress = SparkJobProgressUtil.getProgress(this.getApplicationId, sparkDriverPodIP) + if (newProgress > oldProgress) { + oldProgress = newProgress + } + } + oldProgress + } + } + + override def getProgressInfo: Array[JobProgressInfo] = { + val sparkDriverPodIP = this.clusterDescriptorAdapter.getSparkDriverPodIP + if (StringUtils.isNotBlank(sparkDriverPodIP)) { + SparkJobProgressUtil.getSparkJobProgressInfo(this.getApplicationId, sparkDriverPodIP) + } else { + Array.empty + } + } + + override def getMetrics: util.Map[String, Any] = { + new util.HashMap[String, Any]() + } + + override def getDiagnosis: util.Map[String, Any] = { + new util.HashMap[String, Any]() + } + +} diff --git 
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala index bc18e2badf..e8f2cd22d3 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala @@ -111,6 +111,8 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging sparkConfig.setK8sRestartPolicy(SPARK_K8S_RESTART_POLICY.getValue(options)) sparkConfig.setK8sLanguageType(SPARK_K8S_LANGUAGE_TYPE.getValue(options)) sparkConfig.setK8sImagePullPolicy(SPARK_K8S_IMAGE_PULL_POLICY.getValue(options)) + sparkConfig.setK8sDriverRequestCores(SPARK_K8S_DRIVER_REQUEST_CORES.getValue(options)) + sparkConfig.setK8sExecutorRequestCores(SPARK_K8S_EXECUTOR_REQUEST_CORES.getValue(options)) } sparkConfig.setDeployMode(SPARK_DEPLOY_MODE.getValue(options)) sparkConfig.setAppResource(SPARK_APP_RESOURCE.getValue(options)) diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnResourceFactory.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnResourceFactory.scala index 922826c2ab..640476a589 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnResourceFactory.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnResourceFactory.scala @@ -113,7 +113,9 @@ class SparkEngineConnResourceFactory extends AbstractEngineResourceFactory with Quantity.parse(SPARK_K8S_EXECUTOR_REQUEST_CORES.getValue(properties)) (Quantity.getAmountInBytes(executorCoresQuantity).doubleValue() * 
1000).toLong } else { - LINKIS_SPARK_EXECUTOR_CORES.getValue(properties) * 1000L + val sparkDefaultExecutorCores: Int = LINKIS_SPARK_EXECUTOR_CORES.getValue(properties) + properties.put(SPARK_K8S_EXECUTOR_REQUEST_CORES.key, sparkDefaultExecutorCores.toString) + sparkDefaultExecutorCores * 1000L } val executorMemory = LINKIS_SPARK_EXECUTOR_MEMORY.getValue(properties) val executorMemoryWithUnit = if (StringUtils.isNumeric(executorMemory)) { @@ -126,7 +128,9 @@ class SparkEngineConnResourceFactory extends AbstractEngineResourceFactory with Quantity.parse(SPARK_K8S_DRIVER_REQUEST_CORES.getValue(properties)) (Quantity.getAmountInBytes(executorCoresQuantity).doubleValue() * 1000).toLong } else { - LINKIS_SPARK_DRIVER_CORES.getValue(properties) * 1000L + val sparkDefaultDriverCores: Int = LINKIS_SPARK_DRIVER_CORES.getValue(properties) + properties.put(SPARK_K8S_DRIVER_REQUEST_CORES.key, sparkDefaultDriverCores.toString) + sparkDefaultDriverCores * 1000L } val driverMemory = LINKIS_SPARK_DRIVER_MEMORY.getValue(properties) val driverMemoryWithUnit = if (StringUtils.isNumeric(driverMemory)) { diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkOnceExecutorFactory.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkOnceExecutorFactory.scala index 25e2649441..12a87e22f9 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkOnceExecutorFactory.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkOnceExecutorFactory.scala @@ -22,10 +22,16 @@ import org.apache.linkis.engineconn.common.engineconn.EngineConn import org.apache.linkis.engineconn.once.executor.OnceExecutor import org.apache.linkis.engineconn.once.executor.creation.OnceExecutorFactory import org.apache.linkis.engineplugin.spark.context.SparkEngineConnContext -import 
org.apache.linkis.engineplugin.spark.executor.SparkSubmitOnceExecutor +import org.apache.linkis.engineplugin.spark.executor.{ + SparkOnKubernetesSubmitOnceExecutor, + SparkSubmitOnceExecutor +} +import org.apache.linkis.manager.common.conf.RMConfiguration.DEFAULT_KUBERNETES_TYPE import org.apache.linkis.manager.label.entity.Label +import org.apache.linkis.manager.label.entity.cluster.ClusterLabel import org.apache.linkis.manager.label.entity.engine.RunType import org.apache.linkis.manager.label.entity.engine.RunType.RunType +import org.apache.linkis.manager.label.utils.LabelUtil class SparkOnceExecutorFactory extends OnceExecutorFactory { @@ -34,11 +40,21 @@ class SparkOnceExecutorFactory extends OnceExecutorFactory { engineCreationContext: EngineCreationContext, engineConn: EngineConn, labels: Array[Label[_]] - ): OnceExecutor = + ): OnceExecutor = { + val clusterLabel = LabelUtil.getLabelFromArray[ClusterLabel](labels) engineConn.getEngineConnSession match { case context: SparkEngineConnContext => - new SparkSubmitOnceExecutor(id, context) + if ( + null != clusterLabel && clusterLabel.getClusterType.equalsIgnoreCase( + DEFAULT_KUBERNETES_TYPE.getValue + ) + ) { + new SparkOnKubernetesSubmitOnceExecutor(id, context) + } else { + new SparkSubmitOnceExecutor(id, context) + } } + } override protected def getRunType: RunType = RunType.JAR } diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala index 196414420a..6968ffb61f 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala @@ -27,11 +27,15 @@ import org.apache.http.client.methods.HttpGet import 
org.apache.http.impl.client.HttpClients import org.apache.http.util.EntityUtils +import java.util + object SparkJobProgressUtil extends Logging { - def getProgress(applicationId: String): Float = { + def getProgress(applicationId: String, podIP: String = ""): Float = { if (StringUtils.isBlank(applicationId)) return 0f - val sparkJobsResult = getSparkJobInfo(applicationId) + val sparkJobsResult = + if (StringUtils.isBlank(podIP)) getSparkJobInfo(applicationId) + else getKubernetesSparkJobInfo(applicationId, podIP) if (sparkJobsResult.isEmpty) return 0f val tuple = sparkJobsResult .filter(sparkJobResult => { @@ -48,8 +52,10 @@ object SparkJobProgressUtil extends Logging { tuple._2.toFloat / tuple._1 } - def getSparkJobProgressInfo(applicationId: String): Array[JobProgressInfo] = { - val sparkJobsResult = getSparkJobInfo(applicationId) + def getSparkJobProgressInfo(applicationId: String, podIP: String = ""): Array[JobProgressInfo] = { + val sparkJobsResult = + if (StringUtils.isBlank(podIP)) getSparkJobInfo(applicationId) + else getKubernetesSparkJobInfo(applicationId, podIP) if (sparkJobsResult.isEmpty) { Array.empty } else { @@ -96,6 +102,37 @@ object SparkJobProgressUtil extends Logging { ) } + def getKubernetesSparkJobInfo( + applicationId: String, + podIP: String + ): Array[java.util.Map[String, Object]] = + if (StringUtils.isBlank(applicationId) || StringUtils.isBlank(podIP)) Array.empty + else { + val getSparkJobsStateUrl = s"http://$podIP:4040/api/v1/applications/$applicationId" + logger.info(s"get spark job state from kubernetes spark ui, url: $getSparkJobsStateUrl") + val appStateResult = + JsonUtils.jackson.readValue( + get(getSparkJobsStateUrl), + classOf[java.util.Map[String, Object]] + ) + val appAttemptList = appStateResult.get("attempts").asInstanceOf[java.util.List[Object]] + if (appAttemptList == null || appAttemptList.size() == 0) return Array.empty + val appLastAttempt = + appAttemptList.get(appAttemptList.size() - 1).asInstanceOf[util.Map[String, 
Object]] + val isLastAttemptCompleted = appLastAttempt.get("completed").asInstanceOf[Boolean] + if (isLastAttemptCompleted) return Array.empty + val getSparkJobsInfoUrl = s"http://$podIP:4040/api/v1/applications/$applicationId/jobs" + logger.info(s"get spark job info from kubernetes spark ui: $getSparkJobsInfoUrl") + val jobs = get(getSparkJobsInfoUrl) + if (StringUtils.isBlank(jobs)) { + return Array.empty + } + JsonUtils.jackson.readValue( + get(getSparkJobsInfoUrl), + classOf[Array[java.util.Map[String, Object]]] + ) + } + def get(url: String): String = { val httpGet = new HttpGet(url) val client = HttpClients.createDefault From 8f149e1800a57be2d18fb8e0592191bc3aa8ba69 Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Mon, 18 Sep 2023 15:09:04 +0800 Subject: [PATCH 08/15] Spark once task supports engingeConnRuntimeMode label (#4896) * Spark once task supports engingeConnRuntimeMode label * isYarnClusterMode extracts to LabelUtil * Modify SparkEngineConnFactory --- .../manager/label/utils/LabelUtil.scala | 10 ++++++++++ .../factory/SparkEngineConnFactory.scala | 20 +++++++++++-------- 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala index 3965a5ea11..986f130686 100644 --- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala @@ -17,6 +17,7 @@ package org.apache.linkis.manager.label.utils +import org.apache.linkis.manager.label.constant.LabelValueConstant import org.apache.linkis.manager.label.entity.Label import 
org.apache.linkis.manager.label.entity.engine.{ CodeLanguageLabel, @@ -135,4 +136,13 @@ object LabelUtil { null.asInstanceOf[A] } + def isYarnClusterMode(labels: util.List[Label[_]]): Boolean = { + val label = LabelUtil.getEngingeConnRuntimeModeLabel(labels) + val isYarnClusterMode: Boolean = { + if (null != label && label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) true + else false + } + isYarnClusterMode + } + } diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala index e8f2cd22d3..fbd38bcc68 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala @@ -39,7 +39,6 @@ import org.apache.linkis.manager.engineplugin.common.creation.{ } import org.apache.linkis.manager.engineplugin.common.launch.process.Environment import org.apache.linkis.manager.engineplugin.common.launch.process.Environment.variable -import org.apache.linkis.manager.label.constant.LabelValueConstant import org.apache.linkis.manager.label.entity.engine.EngineType import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType import org.apache.linkis.manager.label.utils.LabelUtil @@ -86,12 +85,13 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging val hadoopConfDir = EnvConfiguration.HADOOP_CONF_DIR.getValue(options) val sparkHome = SPARK_HOME.getValue(options) val sparkConfDir = SPARK_CONF_DIR.getValue(options) - val sparkConfig: SparkConfig = getSparkConfig(options) + val sparkConfig: SparkConfig = + getSparkConfig(options, LabelUtil.isYarnClusterMode(engineCreationContext.getLabels())) val context = new EnvironmentContext(sparkConfig, 
hadoopConfDir, sparkConfDir, sparkHome, null) context } - def getSparkConfig(options: util.Map[String, String]): SparkConfig = { + def getSparkConfig(options: util.Map[String, String], isYarnClusterMode: Boolean): SparkConfig = { logger.info("options: " + JsonUtils.jackson.writeValueAsString(options)) val sparkConfig: SparkConfig = new SparkConfig() sparkConfig.setJavaHome(variable(Environment.JAVA_HOME)) @@ -114,7 +114,14 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging sparkConfig.setK8sDriverRequestCores(SPARK_K8S_DRIVER_REQUEST_CORES.getValue(options)) sparkConfig.setK8sExecutorRequestCores(SPARK_K8S_EXECUTOR_REQUEST_CORES.getValue(options)) } - sparkConfig.setDeployMode(SPARK_DEPLOY_MODE.getValue(options)) + + if (master.startsWith("yarn")) { + if (isYarnClusterMode) { + sparkConfig.setDeployMode(SparkConfiguration.SPARK_YARN_CLUSTER) + } else { + sparkConfig.setDeployMode(SparkConfiguration.SPARK_YARN_CLIENT) + } + } sparkConfig.setAppResource(SPARK_APP_RESOURCE.getValue(options)) sparkConfig.setAppName(SPARK_APP_NAME.getValue(options)) sparkConfig.setJars(SPARK_EXTRA_JARS.getValue(options)) // todo @@ -149,10 +156,7 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging sparkConf.getOption("spark.master").getOrElse(CommonVars("spark.master", "yarn").getValue) logger.info(s"------ Create new SparkContext {$master} -------") - val label = LabelUtil.getEngingeConnRuntimeModeLabel(engineCreationContext.getLabels()) - val isYarnClusterMode: Boolean = - if (null != label && label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) true - else false + val isYarnClusterMode = LabelUtil.isYarnClusterMode(engineCreationContext.getLabels()) if (isYarnClusterMode) { sparkConf.set("spark.submit.deployMode", "cluster") From 4ecb22e373d7407226acc7ac7c63d698f11b394f Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Wed, 20 Sep 2023 20:32:46 +0800 Subject: [PATCH 09/15] [Feature] 
Add nebula engine to linkis (#4903) * Add nebula engine to linkis * Reuse nebula session * Code optimization and remove wds prefix --- .../ujes/jdbc/LinkisSQLConnection.scala | 1 + .../manager/am/conf/AMConfiguration.java | 7 +- .../manager/label/conf/LabelCommonConfig.java | 3 + .../label/entity/engine/EngineType.scala | 3 + .../manager/label/entity/engine/RunType.scala | 1 + .../label/utils/EngineTypeLabelCreator.java | 2 + linkis-engineconn-plugins/nebula/pom.xml | 110 +++++ .../nebula/src/main/assembly/distribution.xml | 71 ++++ .../nebula/NebulaEngineConnPlugin.java | 72 ++++ .../NebulaProcessEngineConnLaunchBuilder.java | 22 + .../nebula/conf/NebulaConfiguration.java | 50 +++ .../nebula/conf/NebulaEngineConf.java | 53 +++ .../errorcode/NebulaErrorCodeSummary.java | 47 +++ .../exception/NebulaClientException.java | 27 ++ .../nebula/exception/NebulaExecuteError.java | 27 ++ .../NebulaStateInvalidException.java | 27 ++ .../executor/NebulaEngineConnExecutor.java | 388 ++++++++++++++++++ .../resources/linkis-engineconn.properties | 23 ++ .../nebula/src/main/resources/log4j2.xml | 91 ++++ .../factory/NebulaEngineConnFactory.scala | 44 ++ pom.xml | 1 + 21 files changed, 1067 insertions(+), 3 deletions(-) create mode 100644 linkis-engineconn-plugins/nebula/pom.xml create mode 100644 linkis-engineconn-plugins/nebula/src/main/assembly/distribution.xml create mode 100644 linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/NebulaEngineConnPlugin.java create mode 100644 linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/builder/NebulaProcessEngineConnLaunchBuilder.java create mode 100644 linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaConfiguration.java create mode 100644 linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaEngineConf.java create mode 100644 
linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/errorcode/NebulaErrorCodeSummary.java create mode 100644 linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaClientException.java create mode 100644 linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaExecuteError.java create mode 100644 linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaStateInvalidException.java create mode 100644 linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java create mode 100644 linkis-engineconn-plugins/nebula/src/main/resources/linkis-engineconn.properties create mode 100644 linkis-engineconn-plugins/nebula/src/main/resources/log4j2.xml create mode 100644 linkis-engineconn-plugins/nebula/src/main/scala/org/apache/linkis/engineplugin/nebula/factory/NebulaEngineConnFactory.scala diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala index b800698766..e111615cee 100644 --- a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala +++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala @@ -431,6 +431,7 @@ class LinkisSQLConnection(private[jdbc] val ujesClient: UJESClient, props: Prope case EngineType.HIVE => RunType.HIVE case EngineType.TRINO => RunType.TRINO_SQL case EngineType.PRESTO => RunType.PRESTO_SQL + case EngineType.NEBULA => RunType.NEBULA_SQL case EngineType.ELASTICSEARCH => RunType.ES_SQL case EngineType.JDBC => RunType.JDBC case EngineType.PYTHON => RunType.SHELL diff --git 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java index d916387d29..8aba142670 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java @@ -68,7 +68,8 @@ public class AMConfiguration { public static final CommonVars MULTI_USER_ENGINE_TYPES = CommonVars.apply( - "wds.linkis.multi.user.engine.types", "jdbc,es,presto,io_file,appconn,openlookeng,trino"); + "wds.linkis.multi.user.engine.types", + "jdbc,es,presto,io_file,appconn,openlookeng,trino,nebula"); public static final CommonVars ALLOW_BATCH_KILL_ENGINE_TYPES = CommonVars.apply("wds.linkis.allow.batch.kill.engine.types", "spark,hive,python"); @@ -104,8 +105,8 @@ public class AMConfiguration { public static String getDefaultMultiEngineUser() { String jvmUser = Utils.getJvmUser(); return String.format( - "{jdbc:\"%s\", es: \"%s\", presto:\"%s\", appconn:\"%s\", openlookeng:\"%s\", trino:\"%s\", io_file:\"root\"}", - jvmUser, jvmUser, jvmUser, jvmUser, jvmUser, jvmUser); + "{jdbc:\"%s\", es: \"%s\", presto:\"%s\", appconn:\"%s\", openlookeng:\"%s\", trino:\"%s\", nebula:\"%s\",io_file:\"root\"}", + jvmUser, jvmUser, jvmUser, jvmUser, jvmUser, jvmUser, jvmUser); } public static boolean isMultiUserEngine(String engineType) { diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/conf/LabelCommonConfig.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/conf/LabelCommonConfig.java index d0854186a5..f4b52a156b 100644 
--- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/conf/LabelCommonConfig.java +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/conf/LabelCommonConfig.java @@ -69,6 +69,9 @@ public class LabelCommonConfig { public static final CommonVars DATAX_ENGINE_VERSION = CommonVars.apply("wds.linkis.datax.engine.version", "3.0.0"); + public static final CommonVars NEBULA_ENGINE_VERSION = + CommonVars.apply("wds.linkis.nebula.engine.version", "3.0.0"); + public static final CommonVars PRESTO_ENGINE_VERSION = CommonVars.apply("wds.linkis.presto.engine.version", "0.234"); diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineType.scala b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineType.scala index d47bb8ec39..77e7204a73 100644 --- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineType.scala +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineType.scala @@ -45,6 +45,8 @@ object EngineType extends Enumeration with Logging { val PRESTO = Value("presto") + val NEBULA = Value("nebula") + val FLINK = Value("flink") val APPCONN = Value("appconn") @@ -89,6 +91,7 @@ object EngineType extends Enumeration with Logging { case _ if IO_ENGINE_HDFS.toString.equalsIgnoreCase(str) => IO_ENGINE_HDFS case _ if PIPELINE.toString.equalsIgnoreCase(str) => PIPELINE case _ if PRESTO.toString.equalsIgnoreCase(str) => PRESTO + case _ if NEBULA.toString.equalsIgnoreCase(str) => NEBULA case _ if FLINK.toString.equalsIgnoreCase(str) => FLINK case _ if APPCONN.toString.equals(str) => APPCONN case _ if SQOOP.toString.equalsIgnoreCase(str) 
=> SQOOP diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala index 21a067ed45..abb3e010f8 100644 --- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala @@ -35,6 +35,7 @@ object RunType extends Enumeration { val PIPELINE = Value("pipeline") val JDBC = Value("jdbc") val PRESTO_SQL = Value("psql") + val NEBULA_SQL = Value("ngql") val JAR = Value("jar") val APPCONN = Value("appconn") val FUNCTION_MDQ_TYPE = Value("function.mdq") diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/utils/EngineTypeLabelCreator.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/utils/EngineTypeLabelCreator.java index 0d6ae3c5c0..e90f282aaf 100644 --- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/utils/EngineTypeLabelCreator.java +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/utils/EngineTypeLabelCreator.java @@ -69,6 +69,8 @@ private static void init() { EngineType.FLINK().toString(), LabelCommonConfig.FLINK_ENGINE_VERSION.getValue()); defaultVersion.put( EngineType.PRESTO().toString(), LabelCommonConfig.PRESTO_ENGINE_VERSION.getValue()); + defaultVersion.put( + EngineType.NEBULA().toString(), LabelCommonConfig.NEBULA_ENGINE_VERSION.getValue()); defaultVersion.put( EngineType.SQOOP().toString(), LabelCommonConfig.SQOOP_ENGINE_VERSION.getValue()); 
defaultVersion.put( diff --git a/linkis-engineconn-plugins/nebula/pom.xml b/linkis-engineconn-plugins/nebula/pom.xml new file mode 100644 index 0000000000..bfe9714569 --- /dev/null +++ b/linkis-engineconn-plugins/nebula/pom.xml @@ -0,0 +1,110 @@ + + + + 4.0.0 + + org.apache.linkis + linkis + ${revision} + ../../pom.xml + + + linkis-engineplugin-nebula + + + + org.apache.linkis + linkis-engineconn-plugin-core + ${project.version} + + + + org.apache.linkis + linkis-computation-engineconn + ${project.version} + + + + org.apache.linkis + linkis-storage + ${project.version} + provided + + + + org.apache.linkis + linkis-rpc + ${project.version} + provided + + + + org.apache.linkis + linkis-common + ${project.version} + provided + + + + + com.vesoft + client + ${nebula.version} + + + + + + + + net.alchim31.maven + scala-maven-plugin + + + + org.apache.maven.plugins + maven-assembly-plugin + false + + false + out + false + false + + src/main/assembly/distribution.xml + + + + + make-assembly + + single + + package + + + src/main/assembly/distribution.xml + + + + + + + + + diff --git a/linkis-engineconn-plugins/nebula/src/main/assembly/distribution.xml b/linkis-engineconn-plugins/nebula/src/main/assembly/distribution.xml new file mode 100644 index 0000000000..eaa9c296f1 --- /dev/null +++ b/linkis-engineconn-plugins/nebula/src/main/assembly/distribution.xml @@ -0,0 +1,71 @@ + + + + + linkis-engineplugin-nebula + + dir + zip + + true + nebula + + + + + + /dist/${nebula.version}/lib + true + true + false + false + true + + + + + + + + ${basedir}/src/main/resources + + linkis-engineconn.properties + log4j2.xml + + 0777 + dist/${nebula.version}/conf + unix + + + + ${basedir}/target + + *.jar + + + *doc.jar + + 0777 + plugin/${nebula.version} + + + + + + diff --git a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/NebulaEngineConnPlugin.java 
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/NebulaEngineConnPlugin.java new file mode 100644 index 0000000000..a22d2c8a84 --- /dev/null +++ b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/NebulaEngineConnPlugin.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.linkis.engineplugin.nebula; + +import org.apache.linkis.engineplugin.nebula.builder.NebulaProcessEngineConnLaunchBuilder; +import org.apache.linkis.engineplugin.nebula.factory.NebulaEngineConnFactory; +import org.apache.linkis.manager.engineplugin.common.EngineConnPlugin; +import org.apache.linkis.manager.engineplugin.common.creation.EngineConnFactory; +import org.apache.linkis.manager.engineplugin.common.launch.EngineConnLaunchBuilder; +import org.apache.linkis.manager.engineplugin.common.resource.EngineResourceFactory; +import org.apache.linkis.manager.engineplugin.common.resource.GenericEngineResourceFactory; +import org.apache.linkis.manager.label.entity.Label; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class NebulaEngineConnPlugin implements EngineConnPlugin { + private Object resourceLocker = new Object(); + private Object engineFactoryLocker = new Object(); + private volatile EngineResourceFactory engineResourceFactory; + private volatile EngineConnFactory engineFactory; + private List> defaultLabels = new ArrayList<>(); + + @Override + public void init(Map params) {} + + @Override + public EngineResourceFactory getEngineResourceFactory() { + if (null == engineResourceFactory) { + synchronized (resourceLocker) { + engineResourceFactory = new GenericEngineResourceFactory(); + } + } + return engineResourceFactory; + } + + @Override + public EngineConnLaunchBuilder getEngineConnLaunchBuilder() { + return new NebulaProcessEngineConnLaunchBuilder(); + } + + @Override + public EngineConnFactory getEngineConnFactory() { + if (null == engineFactory) { + synchronized (engineFactoryLocker) { + engineFactory = new NebulaEngineConnFactory(); + } + } + return engineFactory; + } + + @Override + public List> getDefaultLabels() { + return defaultLabels; + } +} diff --git 
a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/builder/NebulaProcessEngineConnLaunchBuilder.java b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/builder/NebulaProcessEngineConnLaunchBuilder.java new file mode 100644 index 0000000000..fb95910cf5 --- /dev/null +++ b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/builder/NebulaProcessEngineConnLaunchBuilder.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.linkis.engineplugin.nebula.builder; + +import org.apache.linkis.manager.engineplugin.common.launch.process.JavaProcessEngineConnLaunchBuilder; + +public class NebulaProcessEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder {} diff --git a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaConfiguration.java b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaConfiguration.java new file mode 100644 index 0000000000..dfbb7a8b13 --- /dev/null +++ b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaConfiguration.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.linkis.engineplugin.nebula.conf; + +import org.apache.linkis.common.conf.CommonVars; + +public class NebulaConfiguration { + + public static final CommonVars ENGINE_CONCURRENT_LIMIT = + CommonVars.apply("linkis.engineconn.concurrent.limit", 100); + + public static final CommonVars ENGINE_DEFAULT_LIMIT = + CommonVars.apply("linkis.nebula.default.limit", 5000); + + public static final CommonVars NEBULA_HOST = + CommonVars.apply("linkis.nebula.host", "127.0.0.1"); + + public static final CommonVars NEBULA_PORT = + CommonVars.apply("linkis.nebula.port", 9669); + + public static final CommonVars NEBULA_MAX_CONN_SIZE = + CommonVars.apply("linkis.nebula.max.conn.size", 100); + + public static final CommonVars NEBULA_USER_NAME = + CommonVars.apply("linkis.nebula.username", "root"); + + public static final CommonVars NEBULA_PASSWORD = + CommonVars.apply("linkis.nebula.password", "nebula"); + + public static final CommonVars NEBULA_RECONNECT_ENABLED = + CommonVars.apply( + "linkis.nebula.reconnect.enabled", + false, + "whether to retry after the connection is disconnected"); +} diff --git a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaEngineConf.java b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaEngineConf.java new file mode 100644 index 0000000000..92cc32ca01 --- /dev/null +++ b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaEngineConf.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineplugin.nebula.conf; + +import org.apache.linkis.common.conf.Configuration; +import org.apache.linkis.governance.common.protocol.conf.RequestQueryEngineConfigWithGlobalConfig; +import org.apache.linkis.governance.common.protocol.conf.ResponseQueryConfig; +import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel; +import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel; +import org.apache.linkis.protocol.CacheableProtocol; +import org.apache.linkis.rpc.RPCMapCache; + +import java.util.Map; + +import scala.Tuple2; + +public class NebulaEngineConf + extends RPCMapCache, String, String> { + + public NebulaEngineConf() { + super(Configuration.CLOUD_CONSOLE_CONFIGURATION_SPRING_APPLICATION_NAME().getValue()); + } + + @Override + public CacheableProtocol createRequest(Tuple2 labelTuple) { + return new RequestQueryEngineConfigWithGlobalConfig(labelTuple._1(), labelTuple._2(), null); + } + + @Override + public Map createMap(Object obj) { + if (obj instanceof ResponseQueryConfig) { + ResponseQueryConfig response = (ResponseQueryConfig) obj; + return response.getKeyAndValue(); + } else { + return null; + } + } +} diff --git a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/errorcode/NebulaErrorCodeSummary.java b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/errorcode/NebulaErrorCodeSummary.java new file mode 100644 index 0000000000..80aa2e197e --- /dev/null +++ 
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/errorcode/NebulaErrorCodeSummary.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineplugin.nebula.errorcode; + +import org.apache.linkis.common.errorcode.ErrorCodeUtils; +import org.apache.linkis.common.errorcode.LinkisErrorCode; + +public enum NebulaErrorCodeSummary implements LinkisErrorCode { + NEBULA_CLIENT_INITIALIZATION_FAILED(28001, "Nebula client initialization failed(Nebula客户端初始化失败)"), + NEBULA_EXECUTOR_ERROR(28002, "Nebula executor error(Nebula执行异常)"), + NEBULA_CLIENT_ERROR(28003, "Nebula client error(Nebula客户端异常)"); + + private final int errorCode; + + private final String errorDesc; + + NebulaErrorCodeSummary(int errorCode, String errorDesc) { + ErrorCodeUtils.validateErrorCode(errorCode, 26000, 29999); + this.errorCode = errorCode; + this.errorDesc = errorDesc; + } + + @Override + public int getErrorCode() { + return errorCode; + } + + @Override + public String getErrorDesc() { + return errorDesc; + } +} diff --git a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaClientException.java 
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaClientException.java new file mode 100644 index 0000000000..59b3620b03 --- /dev/null +++ b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaClientException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.linkis.engineplugin.nebula.exception; + +import org.apache.linkis.common.exception.ErrorException; + +public class NebulaClientException extends ErrorException { + + public NebulaClientException(int errorCode, String message) { + super(errorCode, message); + } +} diff --git a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaExecuteError.java b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaExecuteError.java new file mode 100644 index 0000000000..f2c164d5a2 --- /dev/null +++ b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaExecuteError.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.linkis.engineplugin.nebula.exception; + +import org.apache.linkis.common.exception.ErrorException; + +public class NebulaExecuteError extends ErrorException { + + public NebulaExecuteError(int errorCode, String message) { + super(errorCode, message); + } +} diff --git a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaStateInvalidException.java b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaStateInvalidException.java new file mode 100644 index 0000000000..202d478b76 --- /dev/null +++ b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaStateInvalidException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.linkis.engineplugin.nebula.exception; + +import org.apache.linkis.common.exception.ErrorException; + +public class NebulaStateInvalidException extends ErrorException { + + public NebulaStateInvalidException(int errorCode, String message) { + super(errorCode, message); + } +} diff --git a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java new file mode 100644 index 0000000000..188ea60ec4 --- /dev/null +++ b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.linkis.engineplugin.nebula.executor; + +import org.apache.linkis.common.exception.ErrorException; +import org.apache.linkis.common.io.resultset.ResultSetWriter; +import org.apache.linkis.common.log.LogUtils; +import org.apache.linkis.common.utils.OverloadUtils; +import org.apache.linkis.engineconn.common.conf.EngineConnConf; +import org.apache.linkis.engineconn.common.conf.EngineConnConstant; +import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask; +import org.apache.linkis.engineconn.computation.executor.execute.ConcurrentComputationExecutor; +import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext; +import org.apache.linkis.engineconn.core.EngineConnObject; +import org.apache.linkis.engineplugin.nebula.conf.NebulaConfiguration; +import org.apache.linkis.engineplugin.nebula.conf.NebulaEngineConf; +import org.apache.linkis.engineplugin.nebula.errorcode.NebulaErrorCodeSummary; +import org.apache.linkis.engineplugin.nebula.exception.NebulaClientException; +import org.apache.linkis.engineplugin.nebula.exception.NebulaExecuteError; +import org.apache.linkis.governance.common.paser.SQLCodeParser; +import org.apache.linkis.manager.common.entity.resource.CommonNodeResource; +import org.apache.linkis.manager.common.entity.resource.LoadResource; +import org.apache.linkis.manager.common.entity.resource.NodeResource; +import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils; +import org.apache.linkis.manager.label.entity.Label; +import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel; +import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel; +import org.apache.linkis.protocol.engine.JobProgressInfo; +import org.apache.linkis.rpc.Sender; +import org.apache.linkis.scheduler.executer.ErrorExecuteResponse; +import org.apache.linkis.scheduler.executer.ExecuteResponse; +import org.apache.linkis.scheduler.executer.SuccessExecuteResponse; +import 
org.apache.linkis.storage.domain.Column; +import org.apache.linkis.storage.domain.DataType; +import org.apache.linkis.storage.resultset.ResultSetFactory; +import org.apache.linkis.storage.resultset.table.TableMetaData; +import org.apache.linkis.storage.resultset.table.TableRecord; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; + +import org.springframework.util.CollectionUtils; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import scala.Tuple2; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.vesoft.nebula.ErrorCode; +import com.vesoft.nebula.client.graph.NebulaPoolConfig; +import com.vesoft.nebula.client.graph.data.HostAddress; +import com.vesoft.nebula.client.graph.data.ResultSet; +import com.vesoft.nebula.client.graph.net.NebulaPool; +import com.vesoft.nebula.client.graph.net.Session; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NebulaEngineConnExecutor extends ConcurrentComputationExecutor { + + private static final Logger logger = LoggerFactory.getLogger(NebulaEngineConnExecutor.class); + private int id; + private List> executorLabels = new ArrayList<>(2); + private Map sessionCache = new ConcurrentHashMap<>(); + + private Map configMap = new HashMap<>(); + + private Cache nebulaPoolCache = + CacheBuilder.newBuilder() + .expireAfterAccess( + Long.valueOf(EngineConnConf.ENGINE_TASK_EXPIRE_TIME().getValue().toString()), + TimeUnit.MILLISECONDS) + .maximumSize(EngineConnConstant.MAX_TASK_NUM()) + .build(); + + public NebulaEngineConnExecutor(int outputPrintLimit, int id) { + super(outputPrintLimit); + this.id = id; + } + + @Override + public void init() { + setCodeParser(new SQLCodeParser()); + super.init(); + } + + @Override + public ExecuteResponse execute(EngineConnTask engineConnTask) { + Optional> userCreatorLabelOp = + 
Arrays.stream(engineConnTask.getLables()) + .filter(label -> label instanceof UserCreatorLabel) + .findFirst(); + Optional> engineTypeLabelOp = + Arrays.stream(engineConnTask.getLables()) + .filter(label -> label instanceof EngineTypeLabel) + .findFirst(); + + Map configMap = null; + if (userCreatorLabelOp.isPresent() && engineTypeLabelOp.isPresent()) { + UserCreatorLabel userCreatorLabel = (UserCreatorLabel) userCreatorLabelOp.get(); + EngineTypeLabel engineTypeLabel = (EngineTypeLabel) engineTypeLabelOp.get(); + + configMap = + new NebulaEngineConf().getCacheMap(new Tuple2<>(userCreatorLabel, engineTypeLabel)); + } + + nebulaPoolCache.put( + engineConnTask.getTaskId(), getNebulaPool(engineConnTask.getProperties(), configMap)); + return super.execute(engineConnTask); + } + + @Override + public ExecuteResponse executeLine(EngineExecutionContext engineExecutorContext, String code) { + String realCode; + if (StringUtils.isBlank(code)) { + realCode = "SHOW SPACES"; + } else { + realCode = code.trim(); + } + logger.info("Nebula client begins to run ngql code:\n {}", realCode); + + String taskId = engineExecutorContext.getJobId().get(); + NebulaPool nebulaPool = nebulaPoolCache.getIfPresent(taskId); + Session session = getSession(taskId, nebulaPool); + + initialStatusUpdates(taskId, engineExecutorContext, session); + ResultSet resultSet = null; + + try { + resultSet = session.execute(code); + } catch (Exception e) { + logger.error("Nebula executor error."); + throw new NebulaExecuteError( + NebulaErrorCodeSummary.NEBULA_EXECUTOR_ERROR.getErrorCode(), + NebulaErrorCodeSummary.NEBULA_EXECUTOR_ERROR.getErrorDesc()); + } + + if (resultSet.isSucceeded() && !resultSet.isEmpty()) { + queryOutput(taskId, engineExecutorContext, resultSet); + } + ErrorExecuteResponse errorResponse = null; + try { + errorResponse = verifyServerError(taskId, engineExecutorContext, resultSet); + } catch (ErrorException e) { + logger.error("Nebula execute failed (#{}): {}", e.getErrCode(), 
e.getMessage()); + } + if (errorResponse == null) { + return new SuccessExecuteResponse(); + } else { + return errorResponse; + } + } + + @Override + public ExecuteResponse executeCompletely( + EngineExecutionContext engineExecutorContext, String code, String completedLine) { + return null; + } + + @Override + public float progress(String taskID) { + return 0.0f; + } + + @Override + public JobProgressInfo[] getProgressInfo(String taskID) { + return new JobProgressInfo[0]; + } + + @Override + public void killTask(String taskId) { + Session session = sessionCache.remove(taskId); + if (null != session) { + session.release(); + } + super.killTask(taskId); + } + + @Override + public List> getExecutorLabels() { + return executorLabels; + } + + @Override + public void setExecutorLabels(List> labels) { + if (!CollectionUtils.isEmpty(labels)) { + executorLabels.clear(); + executorLabels.addAll(labels); + } + } + + @Override + public boolean supportCallBackLogs() { + return false; + } + + @Override + public NodeResource requestExpectedResource(NodeResource expectedResource) { + return null; + } + + @Override + public NodeResource getCurrentNodeResource() { + NodeResourceUtils.appendMemoryUnitIfMissing( + EngineConnObject.getEngineCreationContext().getOptions()); + + CommonNodeResource resource = new CommonNodeResource(); + LoadResource usedResource = new LoadResource(OverloadUtils.getProcessMaxMemory(), 1); + resource.setUsedResource(usedResource); + return resource; + } + + @Override + public String getId() { + return Sender.getThisServiceInstance().getInstance() + "_" + id; + } + + @Override + public int getConcurrentLimit() { + return NebulaConfiguration.ENGINE_CONCURRENT_LIMIT.getValue(); + } + + private NebulaPool getNebulaPool(Map taskParams, Map cacheMap) { + if (!CollectionUtils.isEmpty(cacheMap)) { + configMap.putAll(cacheMap); + } + taskParams.entrySet().stream() + .filter(entry -> entry.getValue() != null) + .forEach(entry -> configMap.put(entry.getKey(), 
String.valueOf(entry.getValue()))); + + String host = NebulaConfiguration.NEBULA_HOST.getValue(configMap); + Integer port = NebulaConfiguration.NEBULA_PORT.getValue(configMap); + Integer maxConnSize = NebulaConfiguration.NEBULA_MAX_CONN_SIZE.getValue(configMap); + + NebulaPool nebulaPool = new NebulaPool(); + Boolean initResult = false; + try { + + NebulaPoolConfig nebulaPoolConfig = new NebulaPoolConfig(); + nebulaPoolConfig.setMaxConnSize(maxConnSize); + List addresses = Arrays.asList(new HostAddress(host, port)); + initResult = nebulaPool.init(addresses, nebulaPoolConfig); + } catch (Exception e) { + logger.error("NebulaPool initialization failed."); + throw new NebulaClientException( + NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorCode(), + NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorDesc()); + } + if (!initResult) { + logger.error("NebulaPool initialization failed."); + throw new NebulaClientException( + NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorCode(), + NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorDesc()); + } + return nebulaPool; + } + + private Session getSession(String taskId, NebulaPool nebulaPool) { + if (sessionCache.containsKey(taskId) + && sessionCache.get(taskId) != null + && sessionCache.get(taskId).ping()) { + return sessionCache.get(taskId); + } else { + Session session; + String username = NebulaConfiguration.NEBULA_USER_NAME.getValue(configMap); + String password = NebulaConfiguration.NEBULA_PASSWORD.getValue(configMap); + Boolean reconnect = NebulaConfiguration.NEBULA_RECONNECT_ENABLED.getValue(configMap); + + try { + session = nebulaPool.getSession(username, password, reconnect); + } catch (Exception e) { + logger.error("Nebula Session initialization failed."); + throw new NebulaClientException( + NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorCode(), + NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorDesc()); + } + 
+ sessionCache.put(taskId, session); + return session; + } + } + + private void initialStatusUpdates( + String taskId, EngineExecutionContext engineExecutorContext, Session session) { + if (session.ping()) { + engineExecutorContext.pushProgress(progress(taskId), getProgressInfo(taskId)); + } + } + + private void queryOutput( + String taskId, EngineExecutionContext engineExecutorContext, ResultSet resultSet) { + int columnCount = 0; + ResultSetWriter resultSetWriter = + engineExecutorContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE); + + try { + List colNames = resultSet.keys(); + + if (CollectionUtils.isEmpty(colNames)) { + throw new RuntimeException("Nebula columns is null."); + } + + List columns = + colNames.stream() + .map(column -> new Column(column, DataType.toDataType("string"), "")) + .collect(Collectors.toList()); + columnCount = columns.size(); + resultSetWriter.addMetaData(new TableMetaData(columns.toArray(new Column[0]))); + if (!resultSet.isEmpty()) { + for (int i = 0; i < resultSet.rowsSize(); i++) { + ResultSet.Record record = resultSet.rowValues(i); + if (record != null) { + String[] rowArray = + record.values().stream() + .map( + x -> { + try { + return x.asString(); + } catch (Exception e) { + return ""; + } + }) + .toArray(String[]::new); + resultSetWriter.addRecord(new TableRecord(rowArray)); + } + } + engineExecutorContext.pushProgress(progress(taskId), getProgressInfo(taskId)); + } + } catch (Exception e) { + IOUtils.closeQuietly(resultSetWriter); + } + String message = + String.format("Fetched %d col(s) : %d row(s) in Nebula", columnCount, resultSet.rowsSize()); + logger.info(message); + engineExecutorContext.appendStdout(LogUtils.generateInfo(message)); + engineExecutorContext.sendResultSet(resultSetWriter); + } + + private ErrorExecuteResponse verifyServerError( + String taskId, EngineExecutionContext engineExecutorContext, ResultSet resultSet) + throws ErrorException { + engineExecutorContext.pushProgress(progress(taskId), 
getProgressInfo(taskId)); + + if (!resultSet.isSucceeded() || resultSet.getErrorCode() != ErrorCode.SUCCEEDED.getValue()) { + logger.error( + "Nebula execute failed (#{}): {}", resultSet.getErrorCode(), resultSet.getErrorMessage()); + engineExecutorContext.appendStdout(LogUtils.generateERROR(resultSet.getErrorMessage())); + return new ErrorExecuteResponse(resultSet.getErrorMessage(), null); + } + return null; + } + + @Override + public void killAll() { + Iterator iterator = sessionCache.values().iterator(); + while (iterator.hasNext()) { + Session session = iterator.next(); + if (session != null) { + session.release(); + } + } + sessionCache.clear(); + } + + @Override + public void close() { + killAll(); + super.close(); + } +} diff --git a/linkis-engineconn-plugins/nebula/src/main/resources/linkis-engineconn.properties b/linkis-engineconn-plugins/nebula/src/main/resources/linkis-engineconn.properties new file mode 100644 index 0000000000..059eccb793 --- /dev/null +++ b/linkis-engineconn-plugins/nebula/src/main/resources/linkis-engineconn.properties @@ -0,0 +1,23 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +wds.linkis.server.version=v1 +#wds.linkis.engineconn.debug.enable=true +#wds.linkis.keytab.enable=true +wds.linkis.engineconn.plugin.default.class=org.apache.linkis.engineplugin.nebula.NebulaEngineConnPlugin + +wds.linkis.engineconn.support.parallelism=true + +wds.linkis.engineconn.max.free.time=0 \ No newline at end of file diff --git a/linkis-engineconn-plugins/nebula/src/main/resources/log4j2.xml b/linkis-engineconn-plugins/nebula/src/main/resources/log4j2.xml new file mode 100644 index 0000000000..2cd3e264c3 --- /dev/null +++ b/linkis-engineconn-plugins/nebula/src/main/resources/log4j2.xml @@ -0,0 +1,91 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/linkis-engineconn-plugins/nebula/src/main/scala/org/apache/linkis/engineplugin/nebula/factory/NebulaEngineConnFactory.scala b/linkis-engineconn-plugins/nebula/src/main/scala/org/apache/linkis/engineplugin/nebula/factory/NebulaEngineConnFactory.scala new file mode 100644 index 0000000000..2f7c3c8fb8 --- /dev/null +++ b/linkis-engineconn-plugins/nebula/src/main/scala/org/apache/linkis/engineplugin/nebula/factory/NebulaEngineConnFactory.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineplugin.nebula.factory + +import org.apache.linkis.engineconn.common.creation.EngineCreationContext +import org.apache.linkis.engineconn.common.engineconn.EngineConn +import org.apache.linkis.engineconn.computation.executor.creation.ComputationSingleExecutorEngineConnFactory +import org.apache.linkis.engineconn.executor.entity.LabelExecutor +import org.apache.linkis.engineplugin.nebula.conf.NebulaConfiguration +import org.apache.linkis.engineplugin.nebula.executor.NebulaEngineConnExecutor +import org.apache.linkis.manager.label.entity.engine.{EngineType, RunType} +import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType +import org.apache.linkis.manager.label.entity.engine.RunType.RunType + +class NebulaEngineConnFactory extends ComputationSingleExecutorEngineConnFactory { + + override def newExecutor( + id: Int, + engineCreationContext: EngineCreationContext, + engineConn: EngineConn + ): LabelExecutor = { + new NebulaEngineConnExecutor(NebulaConfiguration.ENGINE_DEFAULT_LIMIT.getValue, id) + } + + override protected def getEngineConnType: EngineType = EngineType.NEBULA + + override protected def getRunType: RunType = RunType.NEBULA_SQL + +} diff --git a/pom.xml b/pom.xml index 001e8189d6..f9930b12b8 100644 --- a/pom.xml +++ b/pom.xml @@ -128,6 +128,7 @@ 1.5.0 1 0.234 + 3.0.0 python2 2.1.2 1 From d6a86a1dbcf25669379a4c562740c8310485f50b Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Wed, 20 Sep 2023 20:33:18 +0800 Subject: [PATCH 10/15] Modify spark.md (#4875) * linkis-cli add the engingeConnRuntimeModeOP --- docs/configuration/spark.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/docs/configuration/spark.md b/docs/configuration/spark.md index f40c76b43d..ed070e2ac4 100644 --- a/docs/configuration/spark.md +++ b/docs/configuration/spark.md @@ -29,7 
+29,10 @@ |spark|wds.linkis.spark.engine.scala.replace_package_header.enable| true |spark.engine.scala.replace_package_header.enable| Use spark yarn cluster mode,need to set label "engingeConnRuntimeMode": "yarnCluster",and need to upload the dependence of the spark to 'linkis.spark.yarn.cluster.jar'(the default value is 'hdfs:///spark/cluster') -spark dependencies include jars and configuration files,For example: '/appcom/Install/linkis/lib/linkis-engineconn-plugins/spark/dist/3.2.1/lib/*.jar','/appcom/Install/linkis/conf/*'' +spark dependencies include jars and configuration files,For example: '/appcom/Install/linkis/lib/linkis-engineconn-plugins/spark/dist/3.2.1/lib/*.jar','/appcom/Install/linkis/conf/*' + +Precautions for using yarnCluster: +If the Eureka URL uses 127.0.0.1, it should be changed to the real hostname; for example, "127.0.0.1:20303/eureka/" should be changed to "wds001:20303/eureka/" The spark-excel package may cause class conflicts,need to download separately,put it in spark lib wget https://repo1.maven.org/maven2/com/crealytics/spark-excel-2.12.17-3.2.2_2.12/3.2.2_0.18.1/spark-excel-2.12.17-3.2.2_2.12-3.2.2_0.18.1.jar From 486892036e48c8e04a45e8e4ae27f486c2884c0e Mon Sep 17 00:00:00 2001 From: sjgllgh <129264181+sjgllgh@users.noreply.github.com> Date: Wed, 20 Sep 2023 20:34:09 +0800 Subject: [PATCH 11/15] #4907 Incorrect adjustment of log printing resource parameters (#4908) --- .../manager/common/entity/resource/LoadInstanceResource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/resource/LoadInstanceResource.java b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/resource/LoadInstanceResource.java index 57ccc4f313..7fb332f351 --- 
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/resource/LoadInstanceResource.java +++ b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/resource/LoadInstanceResource.java @@ -149,7 +149,7 @@ public String toJson() { public String toString() { return String.format( "Number of instances(实例数):%d,(RAM)内存:%s ,cpu: %s", - this.getInstances(), this.getCores(), this.getMemory()); + this.getInstances(), this.getMemory(), this.getCores()); } public long getMemory() { From 89a653d75359b912bffade16426b0ab4b850d01a Mon Sep 17 00:00:00 2001 From: zlucelia <66543456+Zhao-LX2000@users.noreply.github.com> Date: Thu, 21 Sep 2023 21:45:40 +0800 Subject: [PATCH 12/15] feat: support submit pyspark once job on k8s and add clusterlabel to combinedlabel (#4906) * feat: support submit pyspark once job on k8s * feat: modify variable name * feat: add method to build k8s client from kubeConfig * feat: add Spark UI port configuration for spark on k8s once job * feat: rename userCreatorEngineTypeLabel * feat: merge podIP and port into url * fix: replace 'empty' with 'blank' --- .../manager/label/conf/LabelManagerConf.java | 3 + .../manager/rm/domain/RMLabelContainer.java | 49 +++++++++++++--- .../KubernetesResourceRequester.java | 57 ++++++++++++++----- .../manager/rm/message/RMMessageService.java | 4 +- .../rm/service/RequestResourceService.java | 6 +- .../service/impl/DefaultResourceManager.java | 32 ++++------- .../rm/service/impl/ResourceLogService.java | 11 +--- .../spark/client/context/SparkConfig.java | 24 ++++++++ ...esApplicationClusterDescriptorAdapter.java | 6 +- .../spark/config/SparkConfiguration.scala | 3 + .../SparkOnKubernetesSubmitOnceExecutor.scala | 13 +++-- .../factory/SparkEngineConnFactory.scala | 2 + .../factory/SparkOnceExecutorFactory.scala | 3 + .../spark/utils/SparkJobProgressUtil.scala | 26 +++++---- 14 files changed, 
161 insertions(+), 78 deletions(-) diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/conf/LabelManagerConf.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/conf/LabelManagerConf.java index f436254911..9aa5ff797f 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/conf/LabelManagerConf.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/conf/LabelManagerConf.java @@ -23,4 +23,7 @@ public class LabelManagerConf { public static final String LONG_LIVED_LABEL = CommonVars.apply("wds.linkis.label.node.long.lived.label.keys", "tenant").getValue(); + + public static final boolean COMBINED_WITHOUT_YARN_DEFAULT = + CommonVars.apply("linkis.combined.without.yarn.default", true).getValue(); } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/domain/RMLabelContainer.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/domain/RMLabelContainer.java index 5bda339194..9d3140267b 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/domain/RMLabelContainer.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/domain/RMLabelContainer.java @@ -18,10 +18,13 @@ package org.apache.linkis.manager.rm.domain; import org.apache.linkis.governance.common.conf.GovernanceCommonConf; +import org.apache.linkis.manager.common.conf.RMConfiguration; import org.apache.linkis.manager.label.builder.CombinedLabelBuilder; +import org.apache.linkis.manager.label.conf.LabelManagerConf; import 
org.apache.linkis.manager.label.entity.CombinedLabel; import org.apache.linkis.manager.label.entity.Label; import org.apache.linkis.manager.label.entity.ResourceLabel; +import org.apache.linkis.manager.label.entity.cluster.ClusterLabel; import org.apache.linkis.manager.label.entity.em.EMInstanceLabel; import org.apache.linkis.manager.label.entity.engine.EngineInstanceLabel; import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel; @@ -49,7 +52,8 @@ public class RMLabelContainer { private EngineTypeLabel engineTypeLabel; private UserCreatorLabel userCreatorLabel; private EngineInstanceLabel engineInstanceLabel; - private CombinedLabel combinedUserCreatorEngineTypeLabel; + private ClusterLabel clusterLabel; + private CombinedLabel combinedResourceLabel; private Label currentLabel; public RMLabelContainer(List> labels) { @@ -57,14 +61,16 @@ public RMLabelContainer(List> labels) { this.lockedLabels = Lists.newArrayList(); try { if (getUserCreatorLabel() != null && getEngineTypeLabel() != null) { - this.combinedUserCreatorEngineTypeLabel = - (CombinedLabel) - combinedLabelBuilder.build( - "", Lists.newArrayList(getUserCreatorLabel(), getEngineTypeLabel())); - this.labels.add(combinedUserCreatorEngineTypeLabel); + List