From cbb4a44ecf402ff1201d1a66e88379462b119648 Mon Sep 17 00:00:00 2001 From: itsujin <1125432361@qq.com> Date: Sun, 5 Nov 2023 16:11:25 +0800 Subject: [PATCH] feat: optimize process of closing spark job on k8s --- .../KubernetesApplicationClusterDescriptorAdapter.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java index ce709b2e7a..73892117ad 100644 --- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java +++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesApplicationClusterDescriptorAdapter.java @@ -40,7 +40,7 @@ public class KubernetesApplicationClusterDescriptorAdapter extends ClusterDescriptorAdapter { private static final Logger logger = - LoggerFactory.getLogger(KubernetesOperatorClusterDescriptorAdapter.class); + LoggerFactory.getLogger(KubernetesApplicationClusterDescriptorAdapter.class); protected SparkConfig sparkConfig; protected KubernetesClient client; @@ -66,7 +66,7 @@ public void deployCluster(String mainClass, String args, Map con .setJavaHome(sparkConfig.getJavaHome()) .setSparkHome(sparkConfig.getSparkHome()) .setMaster(sparkConfig.getK8sMasterUrl()) - .setDeployMode(sparkConfig.getDeployMode()) + .setDeployMode("cluster") .setAppName(sparkConfig.getAppName()) .setVerbose(true); this.driverPodName = generateDriverPodName(sparkConfig.getAppName()); @@ -196,12 +196,16 @@ public SparkAppHandle.State getJobState() { @Override public void close() { logger.info("Start to close job {}.", getApplicationId()); + client.close(); + if (isDisposed()) { + logger.info("Job has finished, close action return."); + return; + } PodResource sparkDriverPodResource = client.pods().inNamespace(namespace).withName(driverPodName); if (null != sparkDriverPodResource.get()) { sparkDriverPodResource.delete(); } - client.close(); } @Override