From 186acc0b60df9e08159b12b6a36b167d92b1d76a Mon Sep 17 00:00:00 2001 From: itsujin <1125432361@qq.com> Date: Wed, 20 Sep 2023 13:44:49 +0800 Subject: [PATCH] feat: add Spark UI port configuration for spark on k8s once job --- .../spark/client/context/SparkConfig.java | 12 +++++++++ ...esApplicationClusterDescriptorAdapter.java | 5 ++++ .../spark/config/SparkConfiguration.scala | 1 + .../SparkOnKubernetesSubmitOnceExecutor.scala | 4 ++- .../factory/SparkEngineConnFactory.scala | 1 + .../spark/utils/SparkJobProgressUtil.scala | 26 ++++++++++++------- 6 files changed, 39 insertions(+), 10 deletions(-) diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java index 1ab816e9b3..1768b77d04 100644 --- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java +++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java @@ -53,6 +53,7 @@ public class SparkConfig { private String k8sDriverRequestCores; private String k8sExecutorRequestCores; + private String k8sSparkUIPort; private String deployMode = "client"; // ("client") // todo cluster private String appResource; // ("") private String appName; // ("") @@ -196,6 +197,14 @@ public void setK8sExecutorRequestCores(String k8sExecutorRequestCores) { this.k8sExecutorRequestCores = k8sExecutorRequestCores; } + public String getK8sSparkUIPort() { + return k8sSparkUIPort; + } + + public void setK8sSparkUIPort(String k8sSparkUIPort) { + this.k8sSparkUIPort = k8sSparkUIPort; + } + public String getJavaHome() { return javaHome; } @@ -476,6 +485,9 @@ public String toString() { + ", k8sExecutorRequestCores='" + k8sExecutorRequestCores + '\'' + + ", k8sSparkUIPort='" + + k8sSparkUIPort + + '\'' + ", deployMode='" + deployMode + '\'' diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java index a986eaa1b2..96e8e55323 100644 --- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java +++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java @@ -72,6 +72,7 @@ public void deployCluster(String mainClass, String args, Map con this.driverPodName = generateDriverPodName(sparkConfig.getAppName()); this.namespace = sparkConfig.getK8sNamespace(); setConf(sparkLauncher, "spark.app.name", sparkConfig.getAppName()); + setConf(sparkLauncher, "spark.ui.port", sparkConfig.getK8sSparkUIPort()); setConf(sparkLauncher, "spark.kubernetes.namespace", this.namespace); setConf(sparkLauncher, "spark.kubernetes.container.image", sparkConfig.getK8sImage()); setConf(sparkLauncher, "spark.kubernetes.driver.pod.name", this.driverPodName); @@ -180,6 +181,10 @@ public String getSparkDriverPodIP() { return ""; } + public String getSparkUIPort() { + return this.sparkConfig.getK8sSparkUIPort(); + } + @Override public SparkAppHandle.State getJobState() { Pod sparkDriverPod = getSparkDriverPod(); diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala index 86b84eafa9..bb079b7b56 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala @@ -68,6 +68,7 @@ object SparkConfiguration extends Logging { val SPARK_K8S_RESTART_POLICY = CommonVars[String]("linkis.spark.k8s.restartPolicy", "Never") val SPARK_K8S_SPARK_VERSION = CommonVars[String]("linkis.spark.k8s.sparkVersion", "3.2.1") val SPARK_K8S_NAMESPACE = CommonVars[String]("linkis.spark.k8s.namespace", "default") + val SPARK_K8S_UI_PORT = CommonVars[String]("linkis.spark.k8s.ui.port", "4040") val SPARK_K8S_EXECUTOR_REQUEST_CORES = CommonVars[String]("linkis.spark.k8s.executor.request.cores", "1") diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnKubernetesSubmitOnceExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnKubernetesSubmitOnceExecutor.scala index 1c3873942d..f9ec714d90 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnKubernetesSubmitOnceExecutor.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnKubernetesSubmitOnceExecutor.scala @@ -133,8 +133,10 @@ class SparkOnKubernetesSubmitOnceExecutor( 1 } else { val sparkDriverPodIP = this.clusterDescriptorAdapter.getSparkDriverPodIP + val sparkUIPort = this.clusterDescriptorAdapter.getSparkUIPort if (StringUtils.isNotBlank(sparkDriverPodIP)) { - val newProgress = SparkJobProgressUtil.getProgress(this.getApplicationId, sparkDriverPodIP) + val newProgress = + SparkJobProgressUtil.getProgress(this.getApplicationId, sparkDriverPodIP, sparkUIPort) if (newProgress > oldProgress) { oldProgress = newProgress } diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala index a826a14cff..8cb77a6a9e 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala @@ -113,6 +113,7 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging sparkConfig.setK8sImagePullPolicy(SPARK_K8S_IMAGE_PULL_POLICY.getValue(options)) sparkConfig.setK8sDriverRequestCores(SPARK_K8S_DRIVER_REQUEST_CORES.getValue(options)) sparkConfig.setK8sExecutorRequestCores(SPARK_K8S_EXECUTOR_REQUEST_CORES.getValue(options)) + sparkConfig.setK8sSparkUIPort(SPARK_K8S_UI_PORT.getValue(options)) } sparkConfig.setDeployMode(SPARK_DEPLOY_MODE.getValue(options)) sparkConfig.setAppResource(SPARK_APP_RESOURCE.getValue(options)) diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala index 6968ffb61f..e1a5d079c9 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala @@ -31,11 +31,12 @@ import java.util object SparkJobProgressUtil extends Logging { - def getProgress(applicationId: String, podIP: String = ""): Float = { + def getProgress(applicationId: String, podIP: String = "", sparkUIPort: String = ""): Float = { if (StringUtils.isBlank(applicationId)) return 0f val sparkJobsResult = - if (StringUtils.isBlank(podIP)) getSparkJobInfo(applicationId) - else getKubernetesSparkJobInfo(applicationId, podIP) + if (StringUtils.isBlank(podIP) && StringUtils.isBlank(sparkUIPort)) + getSparkJobInfo(applicationId) + else getKubernetesSparkJobInfo(applicationId, podIP, sparkUIPort) if (sparkJobsResult.isEmpty) return 0f val tuple = sparkJobsResult .filter(sparkJobResult => { @@ -52,10 +53,15 @@ object SparkJobProgressUtil extends Logging { tuple._2.toFloat / tuple._1 } - def getSparkJobProgressInfo(applicationId: String, podIP: String = ""): Array[JobProgressInfo] = { + def getSparkJobProgressInfo( + applicationId: String, + podIP: String = "", + sparkUIPort: String = "" + ): Array[JobProgressInfo] = { val sparkJobsResult = - if (StringUtils.isBlank(podIP)) getSparkJobInfo(applicationId) - else getKubernetesSparkJobInfo(applicationId, podIP) + if (StringUtils.isBlank(podIP) && StringUtils.isBlank(sparkUIPort)) + getSparkJobInfo(applicationId) + else getKubernetesSparkJobInfo(applicationId, podIP, sparkUIPort) if (sparkJobsResult.isEmpty) { Array.empty } else { @@ -104,11 +110,12 @@ object SparkJobProgressUtil extends Logging { def getKubernetesSparkJobInfo( applicationId: String, - podIP: String + podIP: String, + sparkUIPort: String ): Array[java.util.Map[String, Object]] = if (StringUtils.isBlank(applicationId) || StringUtils.isBlank(podIP)) Array.empty else { - val getSparkJobsStateUrl = s"http://$podIP:4040/api/v1/applications/$applicationId" + val getSparkJobsStateUrl = s"http://$podIP:$sparkUIPort/api/v1/applications/$applicationId" logger.info(s"get spark job state from kubernetes spark ui, url: $getSparkJobsStateUrl") val appStateResult = JsonUtils.jackson.readValue( @@ -121,7 +128,8 @@ object SparkJobProgressUtil extends Logging { appAttemptList.get(appAttemptList.size() - 1).asInstanceOf[util.Map[String, Object]] val isLastAttemptCompleted = appLastAttempt.get("completed").asInstanceOf[Boolean] if (isLastAttemptCompleted) return Array.empty - val getSparkJobsInfoUrl = s"http://$podIP:4040/api/v1/applications/$applicationId/jobs" + val getSparkJobsInfoUrl = + s"http://$podIP:$sparkUIPort/api/v1/applications/$applicationId/jobs" logger.info(s"get spark job info from kubernetes spark ui: $getSparkJobsInfoUrl") val jobs = get(getSparkJobsInfoUrl) if (StringUtils.isBlank(jobs)) {