Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark k8s operator task Added status acquisition #4889

Merged
merged 3 commits into from
Sep 1, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
import io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinitionList;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import org.slf4j.Logger;
Expand Down Expand Up @@ -152,32 +155,44 @@ public void deployCluster(String mainClass, String args, Map<String, String> con
}

// NOTE(review): this span looks like a rendered diff in which the removed and the added lines of
// initJobId() are interleaved without +/- markers — confirm against the real file before editing.
public boolean initJobId() {
SparkApplicationStatus sparkApplicationStatus = getKubernetesOperatorState();

if (Objects.nonNull(sparkApplicationStatus)) {
this.applicationId = sparkApplicationStatus.getSparkApplicationId();
this.jobState =
kubernetesOperatorStateConvertSparkState(
sparkApplicationStatus.getApplicationState().getState());
try {
getKubernetesOperatorState();
} catch (Exception e) {
try {
// The first attempt failed: wait 5 seconds and call getKubernetesOperatorState() once more, so
// that a watch interrupted by a network problem gets restarted.
Thread.sleep(5000);
getKubernetesOperatorState();
} catch (InterruptedException interruptedException) {
// NOTE(review): the exception is not passed to the logger and the thread's interrupt flag is
// not restored here.
logger.error("Use k8s watch obtain the status failed");
}
}

// Report success once the application id is available, or once the job state is final. The
// second condition mainly prevents waiting forever when the job ends too early, i.e. before an
// application id has been observed.
return null != getApplicationId() || (jobState != null && jobState.isFinal());
}

// Lists the SparkApplication items available through getSparkApplicationClient(client) and returns
// the status of the first one whose namespace equals sparkConfig.getK8sNamespace() and whose name
// equals sparkConfig.getAppName(); returns null when the list is empty or nothing matches.
private SparkApplicationStatus getKubernetesOperatorState() {
List<SparkApplication> sparkApplicationList =
getSparkApplicationClient(client).list().getItems();
if (CollectionUtils.isNotEmpty(sparkApplicationList)) {
for (SparkApplication sparkApplication : sparkApplicationList) {
// Both the namespace and the name have to match the configured values.
if (sparkApplication.getMetadata().getNamespace().equals(this.sparkConfig.getK8sNamespace())
&& sparkApplication.getMetadata().getName().equals(this.sparkConfig.getAppName())) {
return sparkApplication.getStatus();
}
}
}
return null;
/**
 * Registers a watch on the SparkApplication named {@code sparkConfig.getAppName()} in namespace
 * {@code sparkConfig.getK8sNamespace()} and copies every received event into {@code applicationId}
 * and {@code jobState}.
 *
 * <p>Events whose status has not been populated yet are skipped, and {@code jobState} is only
 * updated when the status carries an application state, so the callback no longer fails with a
 * {@link NullPointerException} on such events. When the watch is closed because of an error it is
 * registered again; a failure of that restart is logged instead of being thrown out of the
 * watcher callback.
 *
 * <p>NOTE(review): the value returned by {@code watch(...)} is discarded, so this method cannot
 * close the watch itself; presumably the watch ends when {@code client} is closed — confirm.
 */
private void getKubernetesOperatorState() {
  getSparkApplicationClient(client)
      .inNamespace(this.sparkConfig.getK8sNamespace())
      .withName(this.sparkConfig.getAppName())
      .watch(
          new Watcher<SparkApplication>() {
            @Override
            public void eventReceived(Action action, SparkApplication sparkApplication) {
              // An event may arrive before any status has been written to the resource.
              if (sparkApplication == null || sparkApplication.getStatus() == null) {
                return;
              }
              applicationId = sparkApplication.getStatus().getSparkApplicationId();
              // Keep the previous jobState when the event carries no application state.
              if (sparkApplication.getStatus().getApplicationState() != null) {
                jobState =
                    kubernetesOperatorStateConvertSparkState(
                        sparkApplication.getStatus().getApplicationState().getState());
              }
            }

            @Override
            public void onClose(WatcherException e) {
              // Invoked when the watcher closes due to an exception: log it and restart the watch.
              logger.error("Use k8s watch obtain the status failed", e);
              try {
                getKubernetesOperatorState();
              } catch (Exception restartFailure) {
                // Do not let a failed restart escape into the client's callback thread.
                logger.error("Restart of the k8s watch failed", restartFailure);
              }
            }
          });
}

public SparkAppHandle.State kubernetesOperatorStateConvertSparkState(String kubernetesState) {
Expand Down Expand Up @@ -216,8 +231,7 @@ public void close() {
client.close();
}

/**
 * Returns the fabric8 operation handle for SparkApplication custom resources, obtained from
 * {@code client.customResources(SparkApplication.class, SparkApplicationList.class)}.
 *
 * @param client Kubernetes client the handle is created from
 * @return the operation handle for {@code SparkApplication} resources
 */
public static NonNamespaceOperation<
SparkApplication, SparkApplicationList, Resource<SparkApplication>>
public static MixedOperation<SparkApplication, SparkApplicationList, Resource<SparkApplication>>
getSparkApplicationClient(KubernetesClient client) {
return client.customResources(SparkApplication.class, SparkApplicationList.class);
}
Expand Down
Loading