Skip to content

Commit

Permalink
feat: add Spark UI port configuration for spark on k8s once job
Browse files Browse the repository at this point in the history
  • Loading branch information
lenoxzhao committed Sep 20, 2023
1 parent d69f01c commit 186acc0
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class SparkConfig {

private String k8sDriverRequestCores;
private String k8sExecutorRequestCores;
private String k8sSparkUIPort;
private String deployMode = "client"; // ("client") // todo cluster
private String appResource; // ("")
private String appName; // ("")
Expand Down Expand Up @@ -196,6 +197,14 @@ public void setK8sExecutorRequestCores(String k8sExecutorRequestCores) {
this.k8sExecutorRequestCores = k8sExecutorRequestCores;
}

public String getK8sSparkUIPort() {
return k8sSparkUIPort;
}

public void setK8sSparkUIPort(String k8sSparkUIPort) {
this.k8sSparkUIPort = k8sSparkUIPort;
}

public String getJavaHome() {
return javaHome;
}
Expand Down Expand Up @@ -476,6 +485,9 @@ public String toString() {
+ ", k8sExecutorRequestCores='"
+ k8sExecutorRequestCores
+ '\''
+ ", k8sSparkUIPort='"
+ k8sSparkUIPort
+ '\''
+ ", deployMode='"
+ deployMode
+ '\''
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public void deployCluster(String mainClass, String args, Map<String, String> con
this.driverPodName = generateDriverPodName(sparkConfig.getAppName());
this.namespace = sparkConfig.getK8sNamespace();
setConf(sparkLauncher, "spark.app.name", sparkConfig.getAppName());
setConf(sparkLauncher, "spark.ui.port", sparkConfig.getK8sSparkUIPort());
setConf(sparkLauncher, "spark.kubernetes.namespace", this.namespace);
setConf(sparkLauncher, "spark.kubernetes.container.image", sparkConfig.getK8sImage());
setConf(sparkLauncher, "spark.kubernetes.driver.pod.name", this.driverPodName);
Expand Down Expand Up @@ -180,6 +181,10 @@ public String getSparkDriverPodIP() {
return "";
}

public String getSparkUIPort() {
return this.sparkConfig.getK8sSparkUIPort();
}

@Override
public SparkAppHandle.State getJobState() {
Pod sparkDriverPod = getSparkDriverPod();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ object SparkConfiguration extends Logging {
val SPARK_K8S_RESTART_POLICY = CommonVars[String]("linkis.spark.k8s.restartPolicy", "Never")
val SPARK_K8S_SPARK_VERSION = CommonVars[String]("linkis.spark.k8s.sparkVersion", "3.2.1")
val SPARK_K8S_NAMESPACE = CommonVars[String]("linkis.spark.k8s.namespace", "default")
val SPARK_K8S_UI_PORT = CommonVars[String]("linkis.spark.k8s.ui.port", "4040")

val SPARK_K8S_EXECUTOR_REQUEST_CORES =
CommonVars[String]("linkis.spark.k8s.executor.request.cores", "1")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,10 @@ class SparkOnKubernetesSubmitOnceExecutor(
1
} else {
val sparkDriverPodIP = this.clusterDescriptorAdapter.getSparkDriverPodIP
val sparkUIPort = this.clusterDescriptorAdapter.getSparkUIPort
if (StringUtils.isNotBlank(sparkDriverPodIP)) {
val newProgress = SparkJobProgressUtil.getProgress(this.getApplicationId, sparkDriverPodIP)
val newProgress =
SparkJobProgressUtil.getProgress(this.getApplicationId, sparkDriverPodIP, sparkUIPort)
if (newProgress > oldProgress) {
oldProgress = newProgress
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
sparkConfig.setK8sImagePullPolicy(SPARK_K8S_IMAGE_PULL_POLICY.getValue(options))
sparkConfig.setK8sDriverRequestCores(SPARK_K8S_DRIVER_REQUEST_CORES.getValue(options))
sparkConfig.setK8sExecutorRequestCores(SPARK_K8S_EXECUTOR_REQUEST_CORES.getValue(options))
sparkConfig.setK8sSparkUIPort(SPARK_K8S_UI_PORT.getValue(options))
}
sparkConfig.setDeployMode(SPARK_DEPLOY_MODE.getValue(options))
sparkConfig.setAppResource(SPARK_APP_RESOURCE.getValue(options))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ import java.util

object SparkJobProgressUtil extends Logging {

def getProgress(applicationId: String, podIP: String = ""): Float = {
def getProgress(applicationId: String, podIP: String = "", sparkUIPort: String = ""): Float = {
if (StringUtils.isBlank(applicationId)) return 0f
val sparkJobsResult =
if (StringUtils.isBlank(podIP)) getSparkJobInfo(applicationId)
else getKubernetesSparkJobInfo(applicationId, podIP)
if (StringUtils.isBlank(podIP) && StringUtils.isBlank(sparkUIPort))
getSparkJobInfo(applicationId)
else getKubernetesSparkJobInfo(applicationId, podIP, sparkUIPort)
if (sparkJobsResult.isEmpty) return 0f
val tuple = sparkJobsResult
.filter(sparkJobResult => {
Expand All @@ -52,10 +53,15 @@ object SparkJobProgressUtil extends Logging {
tuple._2.toFloat / tuple._1
}

def getSparkJobProgressInfo(applicationId: String, podIP: String = ""): Array[JobProgressInfo] = {
def getSparkJobProgressInfo(
applicationId: String,
podIP: String = "",
sparkUIPort: String = ""
): Array[JobProgressInfo] = {
val sparkJobsResult =
if (StringUtils.isBlank(podIP)) getSparkJobInfo(applicationId)
else getKubernetesSparkJobInfo(applicationId, podIP)
if (StringUtils.isBlank(podIP) && StringUtils.isBlank(sparkUIPort))
getSparkJobInfo(applicationId)
else getKubernetesSparkJobInfo(applicationId, podIP, sparkUIPort)
if (sparkJobsResult.isEmpty) {
Array.empty
} else {
Expand Down Expand Up @@ -104,11 +110,12 @@ object SparkJobProgressUtil extends Logging {

def getKubernetesSparkJobInfo(
applicationId: String,
podIP: String
podIP: String,
sparkUIPort: String
): Array[java.util.Map[String, Object]] =
if (StringUtils.isBlank(applicationId) || StringUtils.isBlank(podIP)) Array.empty
else {
val getSparkJobsStateUrl = s"http://$podIP:4040/api/v1/applications/$applicationId"
val getSparkJobsStateUrl = s"http://$podIP:$sparkUIPort/api/v1/applications/$applicationId"
logger.info(s"get spark job state from kubernetes spark ui, url: $getSparkJobsStateUrl")
val appStateResult =
JsonUtils.jackson.readValue(
Expand All @@ -121,7 +128,8 @@ object SparkJobProgressUtil extends Logging {
appAttemptList.get(appAttemptList.size() - 1).asInstanceOf[util.Map[String, Object]]
val isLastAttemptCompleted = appLastAttempt.get("completed").asInstanceOf[Boolean]
if (isLastAttemptCompleted) return Array.empty
val getSparkJobsInfoUrl = s"http://$podIP:4040/api/v1/applications/$applicationId/jobs"
val getSparkJobsInfoUrl =
s"http://$podIP:$sparkUIPort/api/v1/applications/$applicationId/jobs"
logger.info(s"get spark job info from kubernetes spark ui: $getSparkJobsInfoUrl")
val jobs = get(getSparkJobsInfoUrl)
if (StringUtils.isBlank(jobs)) {
Expand Down

0 comments on commit 186acc0

Please sign in to comment.