From 5ca67262ad93ab09c3338393155f3f5fa8717078 Mon Sep 17 00:00:00 2001 From: zlucelia <66543456+Zhao-LX2000@users.noreply.github.com> Date: Fri, 8 Sep 2023 14:38:54 +0800 Subject: [PATCH] feat: support spark submit jar on k8s (#4867) * feat: support spark submit jar on k8s * feat: add spark cores setting priority * feat: use UUID to generate driverPodName * feat: optimize code of executor creation --- .../spark/client/context/SparkConfig.java | 25 ++ .../ClusterDescriptorAdapterFactory.java | 9 +- ...esApplicationClusterDescriptorAdapter.java | 231 ++++++++++++++++++ .../SparkOnKubernetesSubmitOnceExecutor.scala | 163 ++++++++++++ .../factory/SparkEngineConnFactory.scala | 2 + .../SparkEngineConnResourceFactory.scala | 8 +- .../factory/SparkOnceExecutorFactory.scala | 22 +- .../spark/utils/SparkJobProgressUtil.scala | 45 +++- 8 files changed, 494 insertions(+), 11 deletions(-) create mode 100644 linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java create mode 100644 linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnKubernetesSubmitOnceExecutor.scala diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java index 3d0fc0ff3b..37a0e2c980 100644 --- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java +++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java @@ -50,6 +50,9 @@ public class SparkConfig { private String k8sNamespace; private String k8sFileUploadPath; + + private String k8sDriverRequestCores; + private String k8sExecutorRequestCores; private String deployMode = "client"; // ("client") // todo cluster private String appResource; // ("") private String appName; // ("") @@ -176,6 +179,22 @@ public void setK8sImage(String k8sImage) { this.k8sImage = k8sImage; } + public String getK8sDriverRequestCores() { + return k8sDriverRequestCores; + } + + public void setK8sDriverRequestCores(String k8sDriverRequestCores) { + this.k8sDriverRequestCores = k8sDriverRequestCores; + } + + public String getK8sExecutorRequestCores() { + return k8sExecutorRequestCores; + } + + public void setK8sExecutorRequestCores(String k8sExecutorRequestCores) { + this.k8sExecutorRequestCores = k8sExecutorRequestCores; + } + public String getJavaHome() { return javaHome; } @@ -442,6 +461,12 @@ public String toString() { + ", k8sNamespace='" + k8sNamespace + '\'' + + ", k8sDriverRequestCores='" + + k8sDriverRequestCores + + '\'' + + ", k8sExecutorRequestCores='" + + k8sExecutorRequestCores + + '\'' + ", deployMode='" + deployMode + '\'' diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java index 91d3eafb6f..bc67a33e9f 100644 --- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java +++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java @@ -29,8 +29,13 @@ public static ClusterDescriptorAdapter create(ExecutionContext executionContext) ClusterDescriptorAdapter clusterDescriptorAdapter = new YarnApplicationClusterDescriptorAdapter(executionContext); - if (StringUtils.isNotBlank(master) && master.equalsIgnoreCase("k8s-operator")) { - clusterDescriptorAdapter = new KubernetesOperatorClusterDescriptorAdapter(executionContext); + if (StringUtils.isNotBlank(master)) { + if (master.equalsIgnoreCase("k8s-operator")) { + clusterDescriptorAdapter = new KubernetesOperatorClusterDescriptorAdapter(executionContext); + } else if (master.equalsIgnoreCase("k8s-native")) { + clusterDescriptorAdapter = + new KubernetesApplicationClusterDescriptorAdapter(executionContext); + } } return clusterDescriptorAdapter; diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java new file mode 100644 index 0000000000..0ee0380fb8 --- /dev/null +++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineplugin.spark.client.deployment; + +import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext; +import org.apache.linkis.engineplugin.spark.client.context.SparkConfig; +import org.apache.linkis.engineplugin.spark.client.deployment.util.KubernetesHelper; + +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.util.Strings; +import org.apache.spark.launcher.CustomSparkSubmitLauncher; +import org.apache.spark.launcher.SparkAppHandle; +import org.apache.spark.launcher.SparkLauncher; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import java.util.UUID; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.dsl.PodResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KubernetesApplicationClusterDescriptorAdapter extends ClusterDescriptorAdapter { + private static final Logger logger = + LoggerFactory.getLogger(KubernetesOperatorClusterDescriptorAdapter.class); + + protected SparkConfig sparkConfig; + protected KubernetesClient client; + protected String driverPodName; + protected String namespace; + + public KubernetesApplicationClusterDescriptorAdapter(ExecutionContext executionContext) { + super(executionContext); + this.sparkConfig = executionContext.getSparkConfig(); + this.client = + KubernetesHelper.getKubernetesClient( + this.sparkConfig.getK8sConfigFile(), + this.sparkConfig.getK8sMasterUrl(), + this.sparkConfig.getK8sUsername(), + this.sparkConfig.getK8sPassword()); + } + + public void deployCluster(String mainClass, String args, Map confMap) + throws IOException { + SparkConfig sparkConfig = executionContext.getSparkConfig(); + sparkLauncher = new CustomSparkSubmitLauncher(); + sparkLauncher + .setJavaHome(sparkConfig.getJavaHome()) + .setSparkHome(sparkConfig.getSparkHome()) + .setMaster(sparkConfig.getK8sMasterUrl()) + .setDeployMode(sparkConfig.getDeployMode()) + .setAppName(sparkConfig.getAppName()) + .setVerbose(true); + this.driverPodName = generateDriverPodName(sparkConfig.getAppName()); + this.namespace = sparkConfig.getK8sNamespace(); + setConf(sparkLauncher, "spark.app.name", sparkConfig.getAppName()); + setConf(sparkLauncher, "spark.kubernetes.namespace", this.namespace); + setConf(sparkLauncher, "spark.kubernetes.container.image", sparkConfig.getK8sImage()); + setConf(sparkLauncher, "spark.kubernetes.driver.pod.name", this.driverPodName); + setConf( + sparkLauncher, + "spark.kubernetes.driver.request.cores", + sparkConfig.getK8sDriverRequestCores()); + setConf( + sparkLauncher, + "spark.kubernetes.executor.request.cores", + sparkConfig.getK8sExecutorRequestCores()); + setConf( + sparkLauncher, + "spark.kubernetes.container.image.pullPolicy", + sparkConfig.getK8sImagePullPolicy()); + setConf( + sparkLauncher, + "spark.kubernetes.authenticate.driver.serviceAccountName", + sparkConfig.getK8sServiceAccount()); + if (confMap != null) confMap.forEach((k, v) -> sparkLauncher.setConf(k, v)); + + addSparkArg(sparkLauncher, "--jars", sparkConfig.getJars()); + addSparkArg(sparkLauncher, "--packages", sparkConfig.getPackages()); + addSparkArg(sparkLauncher, "--exclude-packages", sparkConfig.getExcludePackages()); + addSparkArg(sparkLauncher, "--repositories", sparkConfig.getRepositories()); + addSparkArg(sparkLauncher, "--files", sparkConfig.getFiles()); + addSparkArg(sparkLauncher, "--archives", sparkConfig.getArchives()); + addSparkArg(sparkLauncher, "--driver-memory", sparkConfig.getDriverMemory()); + addSparkArg(sparkLauncher, "--driver-java-options", sparkConfig.getDriverJavaOptions()); + addSparkArg(sparkLauncher, "--driver-library-path", sparkConfig.getDriverLibraryPath()); + addSparkArg(sparkLauncher, "--driver-class-path", sparkConfig.getDriverClassPath()); + addSparkArg(sparkLauncher, "--executor-memory", sparkConfig.getExecutorMemory()); + addSparkArg(sparkLauncher, "--proxy-user", sparkConfig.getProxyUser()); + addSparkArg(sparkLauncher, "--driver-cores", sparkConfig.getDriverCores().toString()); + addSparkArg(sparkLauncher, "--total-executor-cores", sparkConfig.getTotalExecutorCores()); + addSparkArg(sparkLauncher, "--executor-cores", sparkConfig.getExecutorCores().toString()); + addSparkArg(sparkLauncher, "--num-executors", sparkConfig.getNumExecutors().toString()); + addSparkArg(sparkLauncher, "--principal", sparkConfig.getPrincipal()); + addSparkArg(sparkLauncher, "--keytab", sparkConfig.getKeytab()); + sparkLauncher.setAppResource(sparkConfig.getAppResource()); + sparkLauncher.setMainClass(mainClass); + Arrays.stream(args.split("\\s+")) + .filter(StringUtils::isNotBlank) + .forEach(arg -> sparkLauncher.addAppArgs(arg)); + sparkAppHandle = + sparkLauncher.startApplication( + new SparkAppHandle.Listener() { + @Override + public void stateChanged(SparkAppHandle sparkAppHandle) {} + + @Override + public void infoChanged(SparkAppHandle sparkAppHandle) {} + }); + sparkLauncher.setSparkAppHandle(sparkAppHandle); + } + + private void addSparkArg(SparkLauncher sparkLauncher, String key, String value) { + if (StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value)) { + sparkLauncher.addSparkArg(key, value); + } + } + + private void setConf(SparkLauncher sparkLauncher, String key, String value) { + if (StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value)) { + sparkLauncher.setConf(key, value); + } + } + + public boolean initJobId() { + Pod sparkDriverPod = getSparkDriverPod(); + if (null == sparkDriverPod) { + return false; + } + String sparkDriverPodPhase = sparkDriverPod.getStatus().getPhase(); + String sparkApplicationId = sparkDriverPod.getMetadata().getLabels().get("spark-app-selector"); + + if (Strings.isNotBlank(sparkApplicationId)) { + this.applicationId = sparkApplicationId; + } + if (Strings.isNotBlank(sparkDriverPodPhase)) { + this.jobState = kubernetesPodStateConvertSparkState(sparkDriverPodPhase); + } + + // When the job is not finished, the appId is monitored; otherwise, the status is + // monitored(当任务没结束时,监控appId,反之,则监控状态,这里主要防止任务过早结束,导致一直等待) + return null != getApplicationId() || (jobState != null && jobState.isFinal()); + } + + protected Pod getSparkDriverPod() { + return client.pods().inNamespace(namespace).withName(driverPodName).get(); + } + + public String getSparkDriverPodIP() { + Pod sparkDriverPod = getSparkDriverPod(); + if (null != sparkDriverPod) { + String sparkDriverPodIP = sparkDriverPod.getStatus().getPodIP(); + if (StringUtils.isNotBlank(sparkDriverPodIP)) { + return sparkDriverPodIP; + } else { + logger.info("spark driver pod IP is null, the application may be pending"); + } + } else { + logger.info("spark driver pod is not exist"); + } + return ""; + } + + @Override + public SparkAppHandle.State getJobState() { + Pod sparkDriverPod = getSparkDriverPod(); + if (null != sparkDriverPod) { + String sparkDriverPodPhase = sparkDriverPod.getStatus().getPhase(); + this.jobState = kubernetesPodStateConvertSparkState(sparkDriverPodPhase); + logger.info("Job {} state is {}.", getApplicationId(), this.jobState); + return this.jobState; + } + return null; + } + + @Override + public void close() { + logger.info("Start to close job {}.", getApplicationId()); + PodResource sparkDriverPodResource = + client.pods().inNamespace(namespace).withName(driverPodName); + if (null != sparkDriverPodResource.get()) { + sparkDriverPodResource.delete(); + } + client.close(); + } + + @Override + public boolean isDisposed() { + return this.jobState.isFinal(); + } + + public SparkAppHandle.State kubernetesPodStateConvertSparkState(String kubernetesState) { + if (StringUtils.isBlank(kubernetesState)) { + return SparkAppHandle.State.UNKNOWN; + } + switch (kubernetesState.toUpperCase()) { + case "PENDING": + return SparkAppHandle.State.CONNECTED; + case "RUNNING": + return SparkAppHandle.State.RUNNING; + case "SUCCEEDED": + return SparkAppHandle.State.FINISHED; + case "FAILED": + return SparkAppHandle.State.FAILED; + default: + return SparkAppHandle.State.UNKNOWN; + } + } + + public String generateDriverPodName(String appName) { + return appName + "-" + UUID.randomUUID().toString().replace("-", "") + "-driver"; + } +} diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnKubernetesSubmitOnceExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnKubernetesSubmitOnceExecutor.scala new file mode 100644 index 0000000000..1c3873942d --- /dev/null +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnKubernetesSubmitOnceExecutor.scala @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineplugin.spark.executor + +import org.apache.linkis.common.utils.{ByteTimeUtils, Utils} +import org.apache.linkis.engineconn.once.executor.{ + OnceExecutorExecutionContext, + OperableOnceExecutor +} +import org.apache.linkis.engineplugin.spark.client.deployment.{ + KubernetesApplicationClusterDescriptorAdapter, + YarnApplicationClusterDescriptorAdapter +} +import org.apache.linkis.engineplugin.spark.config.SparkConfiguration.{ + SPARK_APP_CONF, + SPARK_APPLICATION_ARGS, + SPARK_APPLICATION_MAIN_CLASS +} +import org.apache.linkis.engineplugin.spark.context.SparkEngineConnContext +import org.apache.linkis.engineplugin.spark.utils.SparkJobProgressUtil +import org.apache.linkis.manager.common.entity.resource._ +import org.apache.linkis.manager.common.utils.ResourceUtils +import org.apache.linkis.protocol.engine.JobProgressInfo + +import org.apache.commons.lang3.StringUtils + +import java.util + +import scala.concurrent.duration.Duration + +import io.fabric8.kubernetes.api.model.Quantity + +class SparkOnKubernetesSubmitOnceExecutor( + override val id: Long, + override protected val sparkEngineConnContext: SparkEngineConnContext +) extends SparkOnceExecutor[KubernetesApplicationClusterDescriptorAdapter] + with OperableOnceExecutor { + + private var oldProgress: Float = 0f + + override def doSubmit( + onceExecutorExecutionContext: OnceExecutorExecutionContext, + options: Map[String, String] + ): Unit = { + val args = SPARK_APPLICATION_ARGS.getValue(options) + val mainClass = SPARK_APPLICATION_MAIN_CLASS.getValue(options) + val extConf = SPARK_APP_CONF.getValue(options) + val confMap = new util.HashMap[String, String]() + if (StringUtils.isNotBlank(extConf)) { + for (conf <- extConf.split("\n")) { + if (StringUtils.isNotBlank(conf)) { + val pair = conf.trim.split("=") + if (pair.length == 2) { + confMap.put(pair(0), pair(1)) + } else { + logger.warn(s"ignore spark conf: $conf") + } + } + } + } + logger.info( + s"Ready to submit spark application to kubernetes, mainClass: $mainClass, args: $args." + ) + clusterDescriptorAdapter.deployCluster(mainClass, args, confMap) + } + + override protected def waitToRunning(): Unit = { + // Wait until the task return applicationId (等待返回applicationId) + Utils.waitUntil(() => clusterDescriptorAdapter.initJobId(), Duration.Inf) + // Synchronize applicationId to EC SparkOnceExecutor to facilitate user operations, + // such as obtaining progress and killing jobs(将applicationId同步给EC执行器,方便用户操作,如获取进度,kill任务等) + setApplicationId(clusterDescriptorAdapter.getApplicationId) + super.waitToRunning() + } + + override def getApplicationURL: String = "" + + override def getCurrentNodeResource(): NodeResource = { + logger.info("Begin to get actual used resources!") + Utils.tryCatch({ + val sparkConf = sparkEngineConnContext.getExecutionContext.getSparkConfig + val sparkNamespace = sparkConf.getK8sNamespace + + val executorNum: Int = sparkConf.getNumExecutors + val executorMem: Long = + ByteTimeUtils.byteStringAsBytes(sparkConf.getExecutorMemory) * executorNum + val driverMem: Long = ByteTimeUtils.byteStringAsBytes(sparkConf.getDriverMemory) + + val executorCoresQuantity = Quantity.parse(sparkConf.getK8sExecutorRequestCores) + val executorCores: Long = + (Quantity.getAmountInBytes(executorCoresQuantity).doubleValue() * 1000).toLong * executorNum + val driverCoresQuantity = Quantity.parse(sparkConf.getK8sDriverRequestCores) + val driverCores: Long = + (Quantity.getAmountInBytes(driverCoresQuantity).doubleValue() * 1000).toLong + + logger.info( + "Current actual used resources is driverMem:" + driverMem + ",driverCores:" + driverCores + ",executorMem:" + executorMem + ",executorCores:" + executorCores + ",namespace:" + sparkNamespace + ) + val usedResource = new DriverAndKubernetesResource( + new LoadInstanceResource(0, 0, 0), + new KubernetesResource(executorMem + driverMem, executorCores + driverCores, sparkNamespace) + ) + val nodeResource = new CommonNodeResource + nodeResource.setUsedResource(usedResource) + nodeResource.setResourceType(ResourceUtils.getResourceTypeByResource(usedResource)) + nodeResource + })(t => { + logger.warn("Get actual used resource exception", t) + null + }) + } + + override def getProgress: Float = { + val jobIsFinal = clusterDescriptorAdapter != null && + clusterDescriptorAdapter.getJobState != null && + clusterDescriptorAdapter.getJobState.isFinal + if (oldProgress >= 1 || jobIsFinal) { + 1 + } else { + val sparkDriverPodIP = this.clusterDescriptorAdapter.getSparkDriverPodIP + if (StringUtils.isNotBlank(sparkDriverPodIP)) { + val newProgress = SparkJobProgressUtil.getProgress(this.getApplicationId, sparkDriverPodIP) + if (newProgress > oldProgress) { + oldProgress = newProgress + } + } + oldProgress + } + } + + override def getProgressInfo: Array[JobProgressInfo] = { + val sparkDriverPodIP = this.clusterDescriptorAdapter.getSparkDriverPodIP + if (StringUtils.isNotBlank(sparkDriverPodIP)) { + SparkJobProgressUtil.getSparkJobProgressInfo(this.getApplicationId, sparkDriverPodIP) + } else { + Array.empty + } + } + + override def getMetrics: util.Map[String, Any] = { + new util.HashMap[String, Any]() + } + + override def getDiagnosis: util.Map[String, Any] = { + new util.HashMap[String, Any]() + } + +} diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala index bc18e2badf..e8f2cd22d3 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala @@ -111,6 +111,8 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging sparkConfig.setK8sRestartPolicy(SPARK_K8S_RESTART_POLICY.getValue(options)) sparkConfig.setK8sLanguageType(SPARK_K8S_LANGUAGE_TYPE.getValue(options)) sparkConfig.setK8sImagePullPolicy(SPARK_K8S_IMAGE_PULL_POLICY.getValue(options)) + sparkConfig.setK8sDriverRequestCores(SPARK_K8S_DRIVER_REQUEST_CORES.getValue(options)) + sparkConfig.setK8sExecutorRequestCores(SPARK_K8S_EXECUTOR_REQUEST_CORES.getValue(options)) } sparkConfig.setDeployMode(SPARK_DEPLOY_MODE.getValue(options)) sparkConfig.setAppResource(SPARK_APP_RESOURCE.getValue(options)) diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnResourceFactory.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnResourceFactory.scala index 922826c2ab..640476a589 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnResourceFactory.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnResourceFactory.scala @@ -113,7 +113,9 @@ class SparkEngineConnResourceFactory extends AbstractEngineResourceFactory with Quantity.parse(SPARK_K8S_EXECUTOR_REQUEST_CORES.getValue(properties)) (Quantity.getAmountInBytes(executorCoresQuantity).doubleValue() * 1000).toLong } else { - LINKIS_SPARK_EXECUTOR_CORES.getValue(properties) * 1000L + val sparkDefaultExecutorCores: Int = LINKIS_SPARK_EXECUTOR_CORES.getValue(properties) + properties.put(SPARK_K8S_EXECUTOR_REQUEST_CORES.key, sparkDefaultExecutorCores.toString) + sparkDefaultExecutorCores * 1000L } val executorMemory = LINKIS_SPARK_EXECUTOR_MEMORY.getValue(properties) val executorMemoryWithUnit = if (StringUtils.isNumeric(executorMemory)) { @@ -126,7 +128,9 @@ class SparkEngineConnResourceFactory extends AbstractEngineResourceFactory with Quantity.parse(SPARK_K8S_DRIVER_REQUEST_CORES.getValue(properties)) (Quantity.getAmountInBytes(executorCoresQuantity).doubleValue() * 1000).toLong } else { - LINKIS_SPARK_DRIVER_CORES.getValue(properties) * 1000L + val sparkDefaultDriverCores: Int = LINKIS_SPARK_DRIVER_CORES.getValue(properties) + properties.put(SPARK_K8S_DRIVER_REQUEST_CORES.key, sparkDefaultDriverCores.toString) + sparkDefaultDriverCores * 1000L } val driverMemory = LINKIS_SPARK_DRIVER_MEMORY.getValue(properties) val driverMemoryWithUnit = if (StringUtils.isNumeric(driverMemory)) { diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkOnceExecutorFactory.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkOnceExecutorFactory.scala index 25e2649441..12a87e22f9 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkOnceExecutorFactory.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkOnceExecutorFactory.scala @@ -22,10 +22,16 @@ import org.apache.linkis.engineconn.common.engineconn.EngineConn import org.apache.linkis.engineconn.once.executor.OnceExecutor import org.apache.linkis.engineconn.once.executor.creation.OnceExecutorFactory import org.apache.linkis.engineplugin.spark.context.SparkEngineConnContext -import org.apache.linkis.engineplugin.spark.executor.SparkSubmitOnceExecutor +import org.apache.linkis.engineplugin.spark.executor.{ + SparkOnKubernetesSubmitOnceExecutor, + SparkSubmitOnceExecutor +} +import org.apache.linkis.manager.common.conf.RMConfiguration.DEFAULT_KUBERNETES_TYPE import org.apache.linkis.manager.label.entity.Label +import org.apache.linkis.manager.label.entity.cluster.ClusterLabel import org.apache.linkis.manager.label.entity.engine.RunType import org.apache.linkis.manager.label.entity.engine.RunType.RunType +import org.apache.linkis.manager.label.utils.LabelUtil class SparkOnceExecutorFactory extends OnceExecutorFactory { @@ -34,11 +40,21 @@ class SparkOnceExecutorFactory extends OnceExecutorFactory { engineCreationContext: EngineCreationContext, engineConn: EngineConn, labels: Array[Label[_]] - ): OnceExecutor = + ): OnceExecutor = { + val clusterLabel = LabelUtil.getLabelFromArray[ClusterLabel](labels) engineConn.getEngineConnSession match { case context: SparkEngineConnContext => - new SparkSubmitOnceExecutor(id, context) + if ( + null != clusterLabel && clusterLabel.getClusterType.equalsIgnoreCase( + DEFAULT_KUBERNETES_TYPE.getValue + ) + ) { + new SparkOnKubernetesSubmitOnceExecutor(id, context) + } else { + new SparkSubmitOnceExecutor(id, context) + } } + } override protected def getRunType: RunType = RunType.JAR } diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala index 196414420a..6968ffb61f 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala @@ -27,11 +27,15 @@ import org.apache.http.client.methods.HttpGet import org.apache.http.impl.client.HttpClients import org.apache.http.util.EntityUtils +import java.util + object SparkJobProgressUtil extends Logging { - def getProgress(applicationId: String): Float = { + def getProgress(applicationId: String, podIP: String = ""): Float = { if (StringUtils.isBlank(applicationId)) return 0f - val sparkJobsResult = getSparkJobInfo(applicationId) + val sparkJobsResult = + if (StringUtils.isBlank(podIP)) getSparkJobInfo(applicationId) + else getKubernetesSparkJobInfo(applicationId, podIP) if (sparkJobsResult.isEmpty) return 0f val tuple = sparkJobsResult .filter(sparkJobResult => { @@ -48,8 +52,10 @@ object SparkJobProgressUtil extends Logging { tuple._2.toFloat / tuple._1 } - def getSparkJobProgressInfo(applicationId: String): Array[JobProgressInfo] = { - val sparkJobsResult = getSparkJobInfo(applicationId) + def getSparkJobProgressInfo(applicationId: String, podIP: String = ""): Array[JobProgressInfo] = { + val sparkJobsResult = + if (StringUtils.isBlank(podIP)) getSparkJobInfo(applicationId) + else getKubernetesSparkJobInfo(applicationId, podIP) if (sparkJobsResult.isEmpty) { Array.empty } else { @@ -96,6 +102,37 @@ object SparkJobProgressUtil extends Logging { ) } + def getKubernetesSparkJobInfo( + applicationId: String, + podIP: String + ): Array[java.util.Map[String, Object]] = + if (StringUtils.isBlank(applicationId) || StringUtils.isBlank(podIP)) Array.empty + else { + val getSparkJobsStateUrl = s"http://$podIP:4040/api/v1/applications/$applicationId" + logger.info(s"get spark job state from kubernetes spark ui, url: $getSparkJobsStateUrl") + val appStateResult = + JsonUtils.jackson.readValue( + get(getSparkJobsStateUrl), + classOf[java.util.Map[String, Object]] + ) + val appAttemptList = appStateResult.get("attempts").asInstanceOf[java.util.List[Object]] + if (appAttemptList == null || appAttemptList.size() == 0) return Array.empty + val appLastAttempt = + appAttemptList.get(appAttemptList.size() - 1).asInstanceOf[util.Map[String, Object]] + val isLastAttemptCompleted = appLastAttempt.get("completed").asInstanceOf[Boolean] + if (isLastAttemptCompleted) return Array.empty + val getSparkJobsInfoUrl = s"http://$podIP:4040/api/v1/applications/$applicationId/jobs" + logger.info(s"get spark job info from kubernetes spark ui: $getSparkJobsInfoUrl") + val jobs = get(getSparkJobsInfoUrl) + if (StringUtils.isBlank(jobs)) { + return Array.empty + } + JsonUtils.jackson.readValue( + get(getSparkJobsInfoUrl), + classOf[Array[java.util.Map[String, Object]]] + ) + } + def get(url: String): String = { val httpGet = new HttpGet(url) val client = HttpClients.createDefault