Skip to content

Commit

Permalink
Merge pull request #23 from Claudiazhaoya/claudia_sun/upgrade-k8s-client
Browse files Browse the repository at this point in the history
Upgrade version of fabric8 Kubernetes client library
  • Loading branch information
yuchaoran2011 authored Sep 25, 2023
2 parents aa3e5c8 + 6df5b5b commit 48c2c9c
Show file tree
Hide file tree
Showing 17 changed files with 260 additions and 364 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
<version>4.13.3</version>
<version>6.5.0</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/apple/spark/api/SubmissionStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import static com.apple.spark.core.SparkConstants.SUBMITTED_STATE;

import com.apple.spark.core.Constants;
import com.apple.spark.operator.SparkApplicationResource;
import com.apple.spark.operator.SparkApplication;
import com.apple.spark.operator.SparkApplicationStatus;
import com.apple.spark.util.DateTimeUtils;
import com.fasterxml.jackson.annotation.JsonInclude;
Expand All @@ -39,7 +39,7 @@ public class SubmissionStatus {
private String applicationState;
private String applicationErrorMessage;

public void copyFrom(SparkApplicationResource sparkApplicationResource) {
public void copyFrom(SparkApplication sparkApplicationResource) {
Long creationTime =
DateTimeUtils.parseOrNull(sparkApplicationResource.getMetadata().getCreationTimestamp());
this.setCreationTime(creationTime);
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/com/apple/spark/api/SubmissionSummary.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import com.apple.spark.AppConfig;
import com.apple.spark.core.Constants;
import com.apple.spark.core.SparkConstants;
import com.apple.spark.operator.SparkApplicationResource;
import com.apple.spark.operator.SparkApplication;
import com.apple.spark.operator.SparkApplicationSpec;
import com.apple.spark.util.ConfigUtil;
import com.fasterxml.jackson.annotation.JsonInclude;
Expand Down Expand Up @@ -86,7 +86,7 @@ public void setApplicationName(String applicationName) {
}

public void copyFrom(
SparkApplicationResource sparkApplicationResource,
SparkApplication sparkApplicationResource,
AppConfig.SparkCluster sparkCluster,
AppConfig appConfig) {
this.copyFrom(sparkApplicationResource);
Expand All @@ -103,7 +103,7 @@ public void copyFrom(
}

@Override
public void copyFrom(SparkApplicationResource sparkApplicationResource) {
public void copyFrom(SparkApplication sparkApplicationResource) {
super.copyFrom(sparkApplicationResource);
setSubmissionId(sparkApplicationResource.getMetadata().getName());
if (sparkApplicationResource.getMetadata().getLabels() != null) {
Expand Down
35 changes: 13 additions & 22 deletions src/main/java/com/apple/spark/core/ApplicationMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,14 @@
import com.apple.spark.AppConfig;
import com.apple.spark.operator.DriverSpec;
import com.apple.spark.operator.ExecutorSpec;
import com.apple.spark.operator.SparkApplicationResource;
import com.apple.spark.operator.SparkApplicationResourceList;
import com.apple.spark.operator.SparkApplication;
import com.apple.spark.operator.SparkApplicationSpec;
import com.apple.spark.util.CounterMetricContainer;
import com.apple.spark.util.DateTimeUtils;
import com.apple.spark.util.GaugeMetricContainer;
import com.apple.spark.util.KubernetesClusterAndNamespace;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
import io.fabric8.kubernetes.client.dsl.base.OperationContext;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
Expand Down Expand Up @@ -157,7 +153,7 @@ public void start() {
String.format(
"%s/%s", SparkConstants.SPARK_APPLICATION_CRD_GROUP, SparkConstants.CRD_VERSION),
SparkConstants.SPARK_APPLICATION_KIND,
SparkApplicationResource.class);
SparkApplication.class);

if (appConfig.getSparkClusters() != null) {
Map<KubernetesClusterAndNamespace, AppConfig.SparkCluster> uniqueClusters = new HashMap<>();
Expand Down Expand Up @@ -195,32 +191,27 @@ private void start(AppConfig.SparkCluster sparkCluster, Timer timer) {
sparkCluster.getEksCluster(),
sparkCluster.getSparkApplicationNamespace());

DefaultKubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster);
KubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster);

clients.add(client);

SharedInformerFactory sharedInformerFactory = client.informers();
informerFactories.add(sharedInformerFactory);

CustomResourceDefinitionContext crdContext = KubernetesHelper.getSparkApplicationCrdContext();
SharedIndexInformer<SparkApplicationResource> informer =
sharedInformerFactory.sharedIndexInformerForCustomResource(
crdContext,
SparkApplicationResource.class,
SparkApplicationResourceList.class,
new OperationContext().withNamespace(sparkCluster.getSparkApplicationNamespace()),
RESYNC_MILLIS);
SharedIndexInformer<SparkApplication> informer =
sharedInformerFactory.sharedIndexInformerFor(SparkApplication.class, RESYNC_MILLIS);

RunningApplicationMonitor runningApplicationMonitor =
new RunningApplicationMonitor(sparkCluster, timer, meterRegistry);

informer.addEventHandler(
new ResourceEventHandler<SparkApplicationResource>() {
new ResourceEventHandler<SparkApplication>() {
@Override
public void onAdd(SparkApplicationResource sparkApplicationResource) {}
public void onAdd(SparkApplication sparkApplicationResource) {}

@Override
public void onUpdate(
SparkApplicationResource prevCRDState, SparkApplicationResource newCRDState) {
SparkApplication prevCRDState, SparkApplication newCRDState) {
int timeoutMillis = 100;
try {
boolean added =
Expand All @@ -245,7 +236,7 @@ public void onUpdate(

@Override
public void onDelete(
SparkApplicationResource sparkApplicationResource,
SparkApplication sparkApplicationResource,
boolean deletedFinalStateUnknown) {}
});

Expand Down Expand Up @@ -285,8 +276,8 @@ public void close() {
*/
private void onUpdateImpl_logApplication(
AppConfig.SparkCluster sparkCluster,
SparkApplicationResource prevCRDState,
SparkApplicationResource currCRDState) {
SparkApplication prevCRDState,
SparkApplication currCRDState) {
String submissionId = currCRDState.getMetadata().getName();
String newState = SparkApplicationResourceHelper.getState(currCRDState);
String oldState = SparkApplicationResourceHelper.getState(prevCRDState);
Expand All @@ -307,7 +298,7 @@ private void onUpdateImpl_logApplication(
Timestamp startTime = null;
if (!driverInfoCollectedSubIds.contains(submissionId)
&& newState.equals(SparkConstants.RUNNING_STATE)) {
try (DefaultKubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster)) {
try (KubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster)) {
String driverStartTime;
SparkApplicationSpec spec = currCRDState.getSpec();
DriverSpec driverSpec = null;
Expand Down
14 changes: 7 additions & 7 deletions src/main/java/com/apple/spark/core/ApplicationUpdateEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
package com.apple.spark.core;

import com.apple.spark.AppConfig;
import com.apple.spark.operator.SparkApplicationResource;
import com.apple.spark.operator.SparkApplication;

/** The event generated whenever there's CRD update from an application. */
public class ApplicationUpdateEvent {

private final SparkApplicationResource prevCRDState;
private final SparkApplicationResource newCRDState;
private final SparkApplication prevCRDState;
private final SparkApplication newCRDState;

// The runningApplicationMonitor instance corresponding to the specific Spark cluster
private final RunningApplicationMonitor runningApplicationMonitor;
Expand All @@ -36,20 +36,20 @@ public class ApplicationUpdateEvent {

public ApplicationUpdateEvent(
AppConfig.SparkCluster sparkCluster,
SparkApplicationResource prevCRDState,
SparkApplicationResource newCRDState,
SparkApplication prevCRDState,
SparkApplication newCRDState,
RunningApplicationMonitor runningApplicationMonitor) {
this.prevCRDState = prevCRDState;
this.newCRDState = newCRDState;
this.sparkCluster = sparkCluster;
this.runningApplicationMonitor = runningApplicationMonitor;
}

public SparkApplicationResource getPrevCRDState() {
public SparkApplication getPrevCRDState() {
return prevCRDState;
}

public SparkApplicationResource getNewCRDState() {
public SparkApplication getNewCRDState() {
return newCRDState;
}

Expand Down
8 changes: 2 additions & 6 deletions src/main/java/com/apple/spark/core/KubernetesHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,8 @@

import com.apple.spark.AppConfig;
import com.apple.spark.util.EndAwareInputStream;
import io.fabric8.kubernetes.api.model.DoneablePod;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.*;
import io.fabric8.kubernetes.client.dsl.LogWatch;
import io.fabric8.kubernetes.client.dsl.PodResource;
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
Expand Down Expand Up @@ -135,8 +132,7 @@ public static CustomResourceDefinitionContext getSparkApplicationCrdContext() {

public static InputStream tryGetLogStream(
DefaultKubernetesClient client, String namespace, String podName) {
PodResource<Pod, DoneablePod> podResource =
client.inNamespace(namespace).pods().withName(podName);
PodResource podResource = client.pods().inNamespace(namespace).withName(podName);
if (podResource == null) {
logger.info("Cannot get pod resource {}", podName);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import com.apple.spark.AppConfig;
import com.apple.spark.api.SubmissionSummary;
import com.apple.spark.operator.SparkApplicationResource;
import com.apple.spark.operator.SparkApplication;
import com.apple.spark.operator.SparkApplicationResourceList;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
Expand All @@ -39,12 +39,12 @@ protected void writeSubmissions(
if (list == null) {
return;
}
List<SparkApplicationResource> sparkApplicationResources = list.getItems();
List<SparkApplication> sparkApplicationResources = list.getItems();
if (sparkApplicationResources == null) {
return;
}
ObjectMapper objectMapper = new ObjectMapper();
for (SparkApplicationResource sparkApplicationResource : sparkApplicationResources) {
for (SparkApplication sparkApplicationResource : sparkApplicationResources) {
SubmissionSummary submission = new SubmissionSummary();
submission.copyFrom(sparkApplicationResource, sparkCluster, appConfig);
String str = objectMapper.writeValueAsString(submission);
Expand Down
40 changes: 18 additions & 22 deletions src/main/java/com/apple/spark/core/RunningApplicationMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@

import com.apple.spark.AppConfig;
import com.apple.spark.operator.DriverInfo;
import com.apple.spark.operator.SparkApplicationResource;
import com.apple.spark.operator.SparkApplicationResourceDoneable;
import com.apple.spark.operator.SparkApplication;
import com.apple.spark.operator.SparkApplicationResourceList;
import com.apple.spark.util.CounterMetricContainer;
import com.apple.spark.util.DateTimeUtils;
import com.apple.spark.util.GaugeMetricContainer;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import java.util.List;
Expand Down Expand Up @@ -111,13 +111,13 @@ public void run() {
deleteInterval);
}

public static long getMaxRunningMillis(SparkApplicationResource sparkApplicationResource) {
if (sparkApplicationResource.getMetadata() == null
|| sparkApplicationResource.getMetadata().getLabels() == null) {
public static long getMaxRunningMillis(SparkApplication sparkApplication) {
if (sparkApplication.getMetadata() == null
|| sparkApplication.getMetadata().getLabels() == null) {
return Constants.DEFAULT_MAX_RUNNING_MILLIS;
}
String labelValue =
sparkApplicationResource.getMetadata().getLabels().get(Constants.MAX_RUNNING_MILLIS_LABEL);
sparkApplication.getMetadata().getLabels().get(Constants.MAX_RUNNING_MILLIS_LABEL);
if (labelValue == null || labelValue.isEmpty()) {
return Constants.DEFAULT_MAX_RUNNING_MILLIS;
}
Expand All @@ -129,7 +129,7 @@ public static long getMaxRunningMillis(SparkApplicationResource sparkApplication
"Failed to parse value %s for label %s on %s",
labelValue,
Constants.MAX_RUNNING_MILLIS_LABEL,
sparkApplicationResource.getMetadata().getName()),
sparkApplication.getMetadata().getName()),
ex);
return Constants.DEFAULT_MAX_RUNNING_MILLIS;
}
Expand All @@ -143,7 +143,7 @@ public static long getMaxRunningMillis(SparkApplicationResource sparkApplication
* @param currCRDState the current CRD state
*/
public void onUpdate(
SparkApplicationResource prevCRDState, SparkApplicationResource currCRDState) {
SparkApplication prevCRDState, SparkApplication currCRDState) {
String newState = SparkApplicationResourceHelper.getState(currCRDState);
if (SparkConstants.RUNNING_STATE.equalsIgnoreCase(newState)) {
String name = currCRDState.getMetadata().getName();
Expand Down Expand Up @@ -214,18 +214,14 @@ public int getApplicationCount() {
* @param appName the name of the app (typically submission ID)
*/
protected void killApplication(String namespace, String appName) {
try (DefaultKubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster)) {
CustomResourceDefinitionContext crdContext = KubernetesHelper.getSparkApplicationCrdContext();
SparkApplicationResource sparkApplicationResource =
client
.customResources(
crdContext,
SparkApplicationResource.class,
SparkApplicationResourceList.class,
SparkApplicationResourceDoneable.class)
.inNamespace(namespace)
.withName(appName)
.get();
try (KubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster)) {
MixedOperation<SparkApplication, SparkApplicationResourceList, Resource<SparkApplication>>
sparkApplicationClient =
client.resources(SparkApplication.class, SparkApplicationResourceList.class);

SparkApplication sparkApplicationResource =
sparkApplicationClient.inNamespace(namespace).withName(appName).get();

if (sparkApplicationResource == null) {
logger.warn(
"Failed to kill application {}/{} due to application not found", namespace, appName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@

package com.apple.spark.core;

import com.apple.spark.operator.SparkApplicationResource;
import com.apple.spark.operator.SparkApplication;

public class SparkApplicationResourceHelper {

public static String getState(SparkApplicationResource sparkApp) {
public static String getState(SparkApplication sparkApp) {
if (sparkApp.getStatus() != null && sparkApp.getStatus().getApplicationState() != null) {
return sparkApp.getStatus().getApplicationState().getState();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.model.annotation.Group;
import io.fabric8.kubernetes.model.annotation.Version;

@Version("v1beta2")
@Group("sparkoperator.k8s.io")
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class SparkApplicationResource extends CustomResource {
public class SparkApplication extends CustomResource {

private SparkApplicationSpec spec;
private SparkApplicationStatus status;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@

import io.fabric8.kubernetes.client.CustomResourceList;

public class SparkApplicationResourceList extends CustomResourceList<SparkApplicationResource> {}
public class SparkApplicationResourceList extends CustomResourceList<SparkApplication> {}
Loading

0 comments on commit 48c2c9c

Please sign in to comment.