Skip to content

Commit

Permalink
Add the spark k8s operator log
Browse files Browse the repository at this point in the history
  • Loading branch information
ChengJie1053 committed Jul 24, 2023
1 parent 1887d51 commit 887c1aa
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,12 @@
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KubernetesOperatorClusterDescriptorAdapter extends ClusterDescriptorAdapter {
private static final Logger logger =
LoggerFactory.getLogger(KubernetesOperatorClusterDescriptorAdapter.class);

protected SparkConfig sparkConfig;
protected KubernetesClient client;
Expand All @@ -57,6 +61,10 @@ public KubernetesOperatorClusterDescriptorAdapter(ExecutionContext executionCont

public void deployCluster(String mainClass, String args, Map<String, String> confMap) {

logger.info(
"The spark k8s operator task start,k8sNamespace: {},appName: {}",
this.sparkConfig.getK8sNamespace(),
this.sparkConfig.getAppName());
CustomResourceDefinitionList crds =
client.apiextensions().v1().customResourceDefinitions().list();

Expand Down Expand Up @@ -101,8 +109,10 @@ public void deployCluster(String mainClass, String args, Map<String, String> con
.executor(executor)
.build();

logger.info("Spark k8s operator task parameters: {}", sparkApplicationSpec);
sparkApplication.setSpec(sparkApplicationSpec);
SparkApplication created = sparkApplicationClient.createOrReplace(sparkApplication);
logger.info("Preparing to submit the Spark k8s operator Task: {}", created);

// Wait three seconds to get the status
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,41 @@ public void setExecutor(SparkPodSpec executor) {
this.executor = executor;
}

@Override
public String toString() {
return "SparkApplicationSpec{"
+ "type='"
+ type
+ '\''
+ ", mode='"
+ mode
+ '\''
+ ", image='"
+ image
+ '\''
+ ", imagePullPolicy='"
+ imagePullPolicy
+ '\''
+ ", mainClass='"
+ mainClass
+ '\''
+ ", mainApplicationFile='"
+ mainApplicationFile
+ '\''
+ ", sparkVersion='"
+ sparkVersion
+ '\''
+ ", restartPolicy="
+ restartPolicy
+ ", volumes="
+ volumes
+ ", driver="
+ driver
+ ", executor="
+ executor
+ '}';
}

public static SparkApplicationSpecBuilder Builder() {
return new SparkApplicationSpecBuilder();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@
import java.io.IOException;

import io.fabric8.kubernetes.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KubernetesHelper {
private static final Logger logger = LoggerFactory.getLogger(KubernetesHelper.class);

public static KubernetesClient getKubernetesClientByUrl(
String k8sMasterUrl, String k8sUsername, String k8sPassword) {
Expand All @@ -34,16 +37,33 @@ public static KubernetesClient getKubernetesClientByUrl(
.withUsername(k8sUsername)
.withPassword(k8sPassword)
.build();
return new DefaultKubernetesClient(config);
DefaultKubernetesClient kubernetesClient = new DefaultKubernetesClient(config);
logger.info(
"KubernetesClient Create success,kubernetesClient masterUrl: {}",
kubernetesClient.getMasterUrl().toString());
return kubernetesClient;
}

public static KubernetesClient getKubernetesClientByUrl(String k8sMasterUrl) {
Config config = new ConfigBuilder().withMasterUrl(k8sMasterUrl).build();
return new DefaultKubernetesClient(config);
DefaultKubernetesClient kubernetesClient = new DefaultKubernetesClient(config);
logger.info(
"KubernetesClient Create success,kubernetesClient masterUrl: {}",
kubernetesClient.getMasterUrl().toString());
return kubernetesClient;
}

public static KubernetesClient getKubernetesClient(
String kubeConfigFile, String k8sMasterUrl, String k8sUsername, String k8sPassword) {
logger.info(
"Start create KubernetesClient,kubeConfigFile: {},k8sMasterUrl: {}",
kubeConfigFile,
k8sMasterUrl);

if (StringUtils.isBlank(kubeConfigFile) && StringUtils.isBlank(kubeConfigFile)) {
throw new KubernetesClientException(
"Both kubeConfigFile and k8sMasterUrl are empty. Initializing KubernetesClient failed.");
}
// The ConfigFile mode is preferred
if (StringUtils.isNotBlank(kubeConfigFile)) {
return getKubernetesClientByKubeConfigFile(kubeConfigFile);
Expand All @@ -59,8 +79,7 @@ public static KubernetesClient getKubernetesClient(
return getKubernetesClientByUrl(k8sMasterUrl);
}

throw new KubernetesClientException(
"Both kubeConfigFile and k8sMasterUrl are empty. Initializing KubernetesClient failed.");
throw new KubernetesClientException("Initializing KubernetesClient failed.");
}

public static KubernetesClient getKubernetesClientByKubeConfigFile(String kubeConfigFile) {
Expand All @@ -77,6 +96,10 @@ public static KubernetesClient getKubernetesClientByKubeConfigFile(String kubeCo
config = Config.autoConfigure(null);
}

return new DefaultKubernetesClient(config);
DefaultKubernetesClient kubernetesClient = new DefaultKubernetesClient(config);
logger.info(
"KubernetesClient Create success,kubernetesClient masterUrl: {}",
kubernetesClient.getMasterUrl().toString());
return kubernetesClient;
}
}

0 comments on commit 887c1aa

Please sign in to comment.