Skip to content

Commit

Permalink
feat: merge podIP and port into url
Browse files Browse the repository at this point in the history
  • Loading branch information
lenoxzhao committed Sep 20, 2023
1 parent 70112a8 commit 44af15b
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,12 @@ protected Pod getSparkDriverPod() {
return client.pods().inNamespace(namespace).withName(driverPodName).get();
}

public String getSparkDriverPodIP() {
public String getSparkUIUrl() {
Pod sparkDriverPod = getSparkDriverPod();
if (null != sparkDriverPod) {
String sparkDriverPodIP = sparkDriverPod.getStatus().getPodIP();
if (StringUtils.isNotBlank(sparkDriverPodIP)) {
return sparkDriverPodIP;
return sparkDriverPodIP + ":" + this.sparkConfig.getK8sSparkUIPort();
} else {
logger.info("spark driver pod IP is null, the application may be pending");
}
Expand All @@ -181,10 +181,6 @@ public String getSparkDriverPodIP() {
return "";
}

public String getSparkUIPort() {
return this.sparkConfig.getK8sSparkUIPort();
}

@Override
public SparkAppHandle.State getJobState() {
Pod sparkDriverPod = getSparkDriverPod();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,10 @@ class SparkOnKubernetesSubmitOnceExecutor(
if (oldProgress >= 1 || jobIsFinal) {
1
} else {
val sparkDriverPodIP = this.clusterDescriptorAdapter.getSparkDriverPodIP
val sparkUIPort = this.clusterDescriptorAdapter.getSparkUIPort
if (StringUtils.isNotBlank(sparkDriverPodIP)) {
val sparkUIUrl = this.clusterDescriptorAdapter.getSparkUIUrl
if (StringUtils.isNotBlank(sparkUIUrl)) {
val newProgress =
SparkJobProgressUtil.getProgress(this.getApplicationId, sparkDriverPodIP, sparkUIPort)
SparkJobProgressUtil.getProgress(this.getApplicationId, sparkUIUrl)
if (newProgress > oldProgress) {
oldProgress = newProgress
}
Expand All @@ -146,9 +145,9 @@ class SparkOnKubernetesSubmitOnceExecutor(
}

override def getProgressInfo: Array[JobProgressInfo] = {
val sparkDriverPodIP = this.clusterDescriptorAdapter.getSparkDriverPodIP
if (StringUtils.isNotBlank(sparkDriverPodIP)) {
SparkJobProgressUtil.getSparkJobProgressInfo(this.getApplicationId, sparkDriverPodIP)
val sparkUIUrl = this.clusterDescriptorAdapter.getSparkUIUrl
if (StringUtils.isNotBlank(sparkUIUrl)) {
SparkJobProgressUtil.getSparkJobProgressInfo(this.getApplicationId, sparkUIUrl)
} else {
Array.empty
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ import java.util

object SparkJobProgressUtil extends Logging {

def getProgress(applicationId: String, podIP: String = "", sparkUIPort: String = ""): Float = {
def getProgress(applicationId: String, sparkUIUrl: String = ""): Float = {
if (StringUtils.isBlank(applicationId)) return 0f
val sparkJobsResult =
if (StringUtils.isBlank(podIP) && StringUtils.isBlank(sparkUIPort))
if (StringUtils.isBlank(sparkUIUrl))
getSparkJobInfo(applicationId)
else getKubernetesSparkJobInfo(applicationId, podIP, sparkUIPort)
else getKubernetesSparkJobInfo(applicationId, sparkUIUrl)
if (sparkJobsResult.isEmpty) return 0f
val tuple = sparkJobsResult
.filter(sparkJobResult => {
Expand All @@ -55,13 +55,12 @@ object SparkJobProgressUtil extends Logging {

def getSparkJobProgressInfo(
applicationId: String,
podIP: String = "",
sparkUIPort: String = ""
sparkUIUrl: String = ""
): Array[JobProgressInfo] = {
val sparkJobsResult =
if (StringUtils.isBlank(podIP) && StringUtils.isBlank(sparkUIPort))
if (StringUtils.isBlank(sparkUIUrl))
getSparkJobInfo(applicationId)
else getKubernetesSparkJobInfo(applicationId, podIP, sparkUIPort)
else getKubernetesSparkJobInfo(applicationId, sparkUIUrl)
if (sparkJobsResult.isEmpty) {
Array.empty
} else {
Expand Down Expand Up @@ -110,12 +109,11 @@ object SparkJobProgressUtil extends Logging {

def getKubernetesSparkJobInfo(
applicationId: String,
podIP: String,
sparkUIPort: String
sparkUIUrl: String
): Array[java.util.Map[String, Object]] =
if (StringUtils.isBlank(applicationId) || StringUtils.isBlank(podIP)) Array.empty
if (StringUtils.isBlank(applicationId) || StringUtils.isBlank(sparkUIUrl)) Array.empty
else {
val getSparkJobsStateUrl = s"http://$podIP:$sparkUIPort/api/v1/applications/$applicationId"
val getSparkJobsStateUrl = s"http://$sparkUIUrl/api/v1/applications/$applicationId"
logger.info(s"get spark job state from kubernetes spark ui, url: $getSparkJobsStateUrl")
val appStateResult =
JsonUtils.jackson.readValue(
Expand All @@ -129,7 +127,7 @@ object SparkJobProgressUtil extends Logging {
val isLastAttemptCompleted = appLastAttempt.get("completed").asInstanceOf[Boolean]
if (isLastAttemptCompleted) return Array.empty
val getSparkJobsInfoUrl =
s"http://$podIP:$sparkUIPort/api/v1/applications/$applicationId/jobs"
s"http://$sparkUIUrl/api/v1/applications/$applicationId/jobs"
logger.info(s"get spark job info from kubernetes spark ui: $getSparkJobsInfoUrl")
val jobs = get(getSparkJobsInfoUrl)
if (StringUtils.isBlank(jobs)) {
Expand Down

0 comments on commit 44af15b

Please sign in to comment.