diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java index 96e8e55323..ce709b2e7a 100644 --- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java +++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java @@ -166,12 +166,12 @@ protected Pod getSparkDriverPod() { return client.pods().inNamespace(namespace).withName(driverPodName).get(); } - public String getSparkDriverPodIP() { + public String getSparkUIUrl() { Pod sparkDriverPod = getSparkDriverPod(); if (null != sparkDriverPod) { String sparkDriverPodIP = sparkDriverPod.getStatus().getPodIP(); if (StringUtils.isNotBlank(sparkDriverPodIP)) { - return sparkDriverPodIP; + return sparkDriverPodIP + ":" + this.sparkConfig.getK8sSparkUIPort(); } else { logger.info("spark driver pod IP is null, the application may be pending"); } @@ -181,10 +181,6 @@ public String getSparkDriverPodIP() { return ""; } - public String getSparkUIPort() { - return this.sparkConfig.getK8sSparkUIPort(); - } - @Override public SparkAppHandle.State getJobState() { Pod sparkDriverPod = getSparkDriverPod(); diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnKubernetesSubmitOnceExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnKubernetesSubmitOnceExecutor.scala index f9ec714d90..e8b7dfb489 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnKubernetesSubmitOnceExecutor.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnKubernetesSubmitOnceExecutor.scala @@ -132,11 +132,10 @@ class SparkOnKubernetesSubmitOnceExecutor( if (oldProgress >= 1 || jobIsFinal) { 1 } else { - val sparkDriverPodIP = this.clusterDescriptorAdapter.getSparkDriverPodIP - val sparkUIPort = this.clusterDescriptorAdapter.getSparkUIPort - if (StringUtils.isNotBlank(sparkDriverPodIP)) { + val sparkUIUrl = this.clusterDescriptorAdapter.getSparkUIUrl + if (StringUtils.isNotBlank(sparkUIUrl)) { val newProgress = - SparkJobProgressUtil.getProgress(this.getApplicationId, sparkDriverPodIP, sparkUIPort) + SparkJobProgressUtil.getProgress(this.getApplicationId, sparkUIUrl) if (newProgress > oldProgress) { oldProgress = newProgress } @@ -146,9 +145,9 @@ class SparkOnKubernetesSubmitOnceExecutor( } override def getProgressInfo: Array[JobProgressInfo] = { - val sparkDriverPodIP = this.clusterDescriptorAdapter.getSparkDriverPodIP - if (StringUtils.isNotBlank(sparkDriverPodIP)) { - SparkJobProgressUtil.getSparkJobProgressInfo(this.getApplicationId, sparkDriverPodIP) + val sparkUIUrl = this.clusterDescriptorAdapter.getSparkUIUrl + if (StringUtils.isNotBlank(sparkUIUrl)) { + SparkJobProgressUtil.getSparkJobProgressInfo(this.getApplicationId, sparkUIUrl) } else { Array.empty } diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala index e1a5d079c9..94614f902b 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala @@ -31,12 +31,12 @@ import java.util object SparkJobProgressUtil extends Logging { - def getProgress(applicationId: String, podIP: String = "", sparkUIPort: String = ""): Float = { + def getProgress(applicationId: String, sparkUIUrl: String = ""): Float = { if (StringUtils.isBlank(applicationId)) return 0f val sparkJobsResult = - if (StringUtils.isBlank(podIP) && StringUtils.isBlank(sparkUIPort)) + if (StringUtils.isBlank(sparkUIUrl)) getSparkJobInfo(applicationId) - else getKubernetesSparkJobInfo(applicationId, podIP, sparkUIPort) + else getKubernetesSparkJobInfo(applicationId, sparkUIUrl) if (sparkJobsResult.isEmpty) return 0f val tuple = sparkJobsResult .filter(sparkJobResult => { @@ -55,13 +55,12 @@ object SparkJobProgressUtil extends Logging { def getSparkJobProgressInfo( applicationId: String, - podIP: String = "", - sparkUIPort: String = "" + sparkUIUrl: String = "" ): Array[JobProgressInfo] = { val sparkJobsResult = - if (StringUtils.isBlank(podIP) && StringUtils.isBlank(sparkUIPort)) + if (StringUtils.isBlank(sparkUIUrl)) getSparkJobInfo(applicationId) - else getKubernetesSparkJobInfo(applicationId, podIP, sparkUIPort) + else getKubernetesSparkJobInfo(applicationId, sparkUIUrl) if (sparkJobsResult.isEmpty) { Array.empty } else { @@ -110,12 +109,11 @@ object SparkJobProgressUtil extends Logging { def getKubernetesSparkJobInfo( applicationId: String, - podIP: String, - sparkUIPort: String + sparkUIUrl: String ): Array[java.util.Map[String, Object]] = - if (StringUtils.isBlank(applicationId) || StringUtils.isBlank(podIP)) Array.empty + if (StringUtils.isBlank(applicationId) || StringUtils.isBlank(sparkUIUrl)) Array.empty else { - val getSparkJobsStateUrl = s"http://$podIP:$sparkUIPort/api/v1/applications/$applicationId" + val getSparkJobsStateUrl = s"http://$sparkUIUrl/api/v1/applications/$applicationId" logger.info(s"get spark job state from kubernetes spark ui, url: $getSparkJobsStateUrl") val appStateResult = JsonUtils.jackson.readValue( @@ -129,7 +127,7 @@ object SparkJobProgressUtil extends Logging { val isLastAttemptCompleted = appLastAttempt.get("completed").asInstanceOf[Boolean] if (isLastAttemptCompleted) return Array.empty val getSparkJobsInfoUrl = - s"http://$podIP:$sparkUIPort/api/v1/applications/$applicationId/jobs" + s"http://$sparkUIUrl/api/v1/applications/$applicationId/jobs" logger.info(s"get spark job info from kubernetes spark ui: $getSparkJobsInfoUrl") val jobs = get(getSparkJobsInfoUrl) if (StringUtils.isBlank(jobs)) {