feat: optimize process of closing spark job on k8s (#4957)
lenoxzhao authored Nov 8, 2023
1 parent 7b99579 commit 5170426
Showing 1 changed file with 7 additions and 3 deletions.
```diff
@@ -40,7 +40,7 @@
 
 public class KubernetesApplicationClusterDescriptorAdapter extends ClusterDescriptorAdapter {
   private static final Logger logger =
-      LoggerFactory.getLogger(KubernetesOperatorClusterDescriptorAdapter.class);
+      LoggerFactory.getLogger(KubernetesApplicationClusterDescriptorAdapter.class);
 
   protected SparkConfig sparkConfig;
   protected KubernetesClient client;
@@ -66,7 +66,7 @@ public void deployCluster(String mainClass, String args, Map<String, String> con
         .setJavaHome(sparkConfig.getJavaHome())
         .setSparkHome(sparkConfig.getSparkHome())
         .setMaster(sparkConfig.getK8sMasterUrl())
-        .setDeployMode(sparkConfig.getDeployMode())
+        .setDeployMode("cluster")
         .setAppName(sparkConfig.getAppName())
         .setVerbose(true);
     this.driverPodName = generateDriverPodName(sparkConfig.getAppName());
@@ -196,12 +196,16 @@ public SparkAppHandle.State getJobState() {
   @Override
   public void close() {
     logger.info("Start to close job {}.", getApplicationId());
-    client.close();
+    if (isDisposed()) {
+      logger.info("Job has finished, close action return.");
+      return;
+    }
     PodResource<Pod> sparkDriverPodResource =
         client.pods().inNamespace(namespace).withName(driverPodName);
     if (null != sparkDriverPodResource.get()) {
       sparkDriverPodResource.delete();
     }
+    client.close();
   }
 
   @Override
```
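For reference, below is a minimal, self-contained sketch of the close() flow after this commit — not the actual Linkis class. The field names (client, namespace, driverPodName), the isDisposed() guard, and the log messages come from the diff above; the class name CloseSketch, its constructor, and the stub bodies of isDisposed() and getApplicationId() are placeholders added purely for illustration. The PodResource<Pod> generic follows the diff and corresponds to the fabric8 kubernetes-client 5.x API.

```java
// Sketch only: illustrates the close() ordering introduced by this commit.
// CloseSketch, its constructor, and the stubbed isDisposed()/getApplicationId()
// are assumptions; the real logic lives in KubernetesApplicationClusterDescriptorAdapter.
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.PodResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CloseSketch {
  private static final Logger logger = LoggerFactory.getLogger(CloseSketch.class);

  private final KubernetesClient client;
  private final String namespace;
  private final String driverPodName;

  CloseSketch(KubernetesClient client, String namespace, String driverPodName) {
    this.client = client;
    this.namespace = namespace;
    this.driverPodName = driverPodName;
  }

  // Stand-in for the adapter's real job-state check.
  private boolean isDisposed() {
    return false;
  }

  // Stand-in for the adapter's real application id.
  private String getApplicationId() {
    return driverPodName;
  }

  public void close() {
    logger.info("Start to close job {}.", getApplicationId());
    // New: skip cleanup entirely when the job has already finished.
    if (isDisposed()) {
      logger.info("Job has finished, close action return.");
      return;
    }
    // Delete the Spark driver pod while the Kubernetes client is still open.
    PodResource<Pod> sparkDriverPodResource =
        client.pods().inNamespace(namespace).withName(driverPodName);
    if (null != sparkDriverPodResource.get()) {
      sparkDriverPodResource.delete();
    }
    // Only then release the client.
    client.close();
  }
}
```

The reordering is the point of the change: the early return avoids cleanup work for jobs that are already disposed, and client.close() now runs after the driver pod is looked up and deleted, rather than before it as in the previous version.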
