Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade version of fabric k8s client library #23

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
<version>4.13.3</version>
<version>6.5.0</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/apple/spark/api/SubmissionStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import static com.apple.spark.core.SparkConstants.SUBMITTED_STATE;

import com.apple.spark.core.Constants;
import com.apple.spark.operator.SparkApplicationResource;
import com.apple.spark.operator.SparkApplication;
import com.apple.spark.operator.SparkApplicationStatus;
import com.apple.spark.util.DateTimeUtils;
import com.fasterxml.jackson.annotation.JsonInclude;
Expand All @@ -39,7 +39,7 @@ public class SubmissionStatus {
private String applicationState;
private String applicationErrorMessage;

public void copyFrom(SparkApplicationResource sparkApplicationResource) {
public void copyFrom(SparkApplication sparkApplicationResource) {
Long creationTime =
DateTimeUtils.parseOrNull(sparkApplicationResource.getMetadata().getCreationTimestamp());
this.setCreationTime(creationTime);
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/com/apple/spark/api/SubmissionSummary.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import com.apple.spark.AppConfig;
import com.apple.spark.core.Constants;
import com.apple.spark.core.SparkConstants;
import com.apple.spark.operator.SparkApplicationResource;
import com.apple.spark.operator.SparkApplication;
import com.apple.spark.operator.SparkApplicationSpec;
import com.apple.spark.util.ConfigUtil;
import com.fasterxml.jackson.annotation.JsonInclude;
Expand Down Expand Up @@ -86,7 +86,7 @@ public void setApplicationName(String applicationName) {
}

public void copyFrom(
SparkApplicationResource sparkApplicationResource,
SparkApplication sparkApplicationResource,
AppConfig.SparkCluster sparkCluster,
AppConfig appConfig) {
this.copyFrom(sparkApplicationResource);
Expand All @@ -103,7 +103,7 @@ public void copyFrom(
}

@Override
public void copyFrom(SparkApplicationResource sparkApplicationResource) {
public void copyFrom(SparkApplication sparkApplicationResource) {
super.copyFrom(sparkApplicationResource);
setSubmissionId(sparkApplicationResource.getMetadata().getName());
if (sparkApplicationResource.getMetadata().getLabels() != null) {
Expand Down
35 changes: 13 additions & 22 deletions src/main/java/com/apple/spark/core/ApplicationMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,14 @@
import com.apple.spark.AppConfig;
import com.apple.spark.operator.DriverSpec;
import com.apple.spark.operator.ExecutorSpec;
import com.apple.spark.operator.SparkApplicationResource;
import com.apple.spark.operator.SparkApplicationResourceList;
import com.apple.spark.operator.SparkApplication;
import com.apple.spark.operator.SparkApplicationSpec;
import com.apple.spark.util.CounterMetricContainer;
import com.apple.spark.util.DateTimeUtils;
import com.apple.spark.util.GaugeMetricContainer;
import com.apple.spark.util.KubernetesClusterAndNamespace;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
import io.fabric8.kubernetes.client.dsl.base.OperationContext;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
Expand Down Expand Up @@ -157,7 +153,7 @@ public void start() {
String.format(
"%s/%s", SparkConstants.SPARK_APPLICATION_CRD_GROUP, SparkConstants.CRD_VERSION),
SparkConstants.SPARK_APPLICATION_KIND,
SparkApplicationResource.class);
SparkApplication.class);

if (appConfig.getSparkClusters() != null) {
Map<KubernetesClusterAndNamespace, AppConfig.SparkCluster> uniqueClusters = new HashMap<>();
Expand Down Expand Up @@ -195,32 +191,27 @@ private void start(AppConfig.SparkCluster sparkCluster, Timer timer) {
sparkCluster.getEksCluster(),
sparkCluster.getSparkApplicationNamespace());

DefaultKubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster);
KubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster);

clients.add(client);

SharedInformerFactory sharedInformerFactory = client.informers();
informerFactories.add(sharedInformerFactory);

CustomResourceDefinitionContext crdContext = KubernetesHelper.getSparkApplicationCrdContext();
SharedIndexInformer<SparkApplicationResource> informer =
sharedInformerFactory.sharedIndexInformerForCustomResource(
crdContext,
SparkApplicationResource.class,
SparkApplicationResourceList.class,
new OperationContext().withNamespace(sparkCluster.getSparkApplicationNamespace()),
RESYNC_MILLIS);
SharedIndexInformer<SparkApplication> informer =
sharedInformerFactory.sharedIndexInformerFor(SparkApplication.class, RESYNC_MILLIS);

RunningApplicationMonitor runningApplicationMonitor =
new RunningApplicationMonitor(sparkCluster, timer, meterRegistry);

informer.addEventHandler(
new ResourceEventHandler<SparkApplicationResource>() {
new ResourceEventHandler<SparkApplication>() {
@Override
public void onAdd(SparkApplicationResource sparkApplicationResource) {}
public void onAdd(SparkApplication sparkApplicationResource) {}

@Override
public void onUpdate(
SparkApplicationResource prevCRDState, SparkApplicationResource newCRDState) {
SparkApplication prevCRDState, SparkApplication newCRDState) {
int timeoutMillis = 100;
try {
boolean added =
Expand All @@ -245,7 +236,7 @@ public void onUpdate(

@Override
public void onDelete(
SparkApplicationResource sparkApplicationResource,
SparkApplication sparkApplicationResource,
boolean deletedFinalStateUnknown) {}
});

Expand Down Expand Up @@ -285,8 +276,8 @@ public void close() {
*/
private void onUpdateImpl_logApplication(
AppConfig.SparkCluster sparkCluster,
SparkApplicationResource prevCRDState,
SparkApplicationResource currCRDState) {
SparkApplication prevCRDState,
SparkApplication currCRDState) {
String submissionId = currCRDState.getMetadata().getName();
String newState = SparkApplicationResourceHelper.getState(currCRDState);
String oldState = SparkApplicationResourceHelper.getState(prevCRDState);
Expand All @@ -307,7 +298,7 @@ private void onUpdateImpl_logApplication(
Timestamp startTime = null;
if (!driverInfoCollectedSubIds.contains(submissionId)
&& newState.equals(SparkConstants.RUNNING_STATE)) {
try (DefaultKubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster)) {
try (KubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster)) {
String driverStartTime;
SparkApplicationSpec spec = currCRDState.getSpec();
DriverSpec driverSpec = null;
Expand Down
14 changes: 7 additions & 7 deletions src/main/java/com/apple/spark/core/ApplicationUpdateEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
package com.apple.spark.core;

import com.apple.spark.AppConfig;
import com.apple.spark.operator.SparkApplicationResource;
import com.apple.spark.operator.SparkApplication;

/** The event generated whenever there's CRD update from an application. */
public class ApplicationUpdateEvent {

private final SparkApplicationResource prevCRDState;
private final SparkApplicationResource newCRDState;
private final SparkApplication prevCRDState;
private final SparkApplication newCRDState;

// The runningApplicationMonitor instance corresponding to the specific Spark cluster
private final RunningApplicationMonitor runningApplicationMonitor;
Expand All @@ -36,20 +36,20 @@ public class ApplicationUpdateEvent {

public ApplicationUpdateEvent(
AppConfig.SparkCluster sparkCluster,
SparkApplicationResource prevCRDState,
SparkApplicationResource newCRDState,
SparkApplication prevCRDState,
SparkApplication newCRDState,
RunningApplicationMonitor runningApplicationMonitor) {
this.prevCRDState = prevCRDState;
this.newCRDState = newCRDState;
this.sparkCluster = sparkCluster;
this.runningApplicationMonitor = runningApplicationMonitor;
}

public SparkApplicationResource getPrevCRDState() {
public SparkApplication getPrevCRDState() {
return prevCRDState;
}

public SparkApplicationResource getNewCRDState() {
public SparkApplication getNewCRDState() {
return newCRDState;
}

Expand Down
8 changes: 2 additions & 6 deletions src/main/java/com/apple/spark/core/KubernetesHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,8 @@

import com.apple.spark.AppConfig;
import com.apple.spark.util.EndAwareInputStream;
import io.fabric8.kubernetes.api.model.DoneablePod;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.*;
import io.fabric8.kubernetes.client.dsl.LogWatch;
import io.fabric8.kubernetes.client.dsl.PodResource;
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
Expand Down Expand Up @@ -135,8 +132,7 @@ public static CustomResourceDefinitionContext getSparkApplicationCrdContext() {

public static InputStream tryGetLogStream(
DefaultKubernetesClient client, String namespace, String podName) {
PodResource<Pod, DoneablePod> podResource =
client.inNamespace(namespace).pods().withName(podName);
PodResource podResource = client.pods().inNamespace(namespace).withName(podName);
if (podResource == null) {
logger.info("Cannot get pod resource {}", podName);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import com.apple.spark.AppConfig;
import com.apple.spark.api.SubmissionSummary;
import com.apple.spark.operator.SparkApplicationResource;
import com.apple.spark.operator.SparkApplication;
import com.apple.spark.operator.SparkApplicationResourceList;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
Expand All @@ -39,12 +39,12 @@ protected void writeSubmissions(
if (list == null) {
return;
}
List<SparkApplicationResource> sparkApplicationResources = list.getItems();
List<SparkApplication> sparkApplicationResources = list.getItems();
if (sparkApplicationResources == null) {
return;
}
ObjectMapper objectMapper = new ObjectMapper();
for (SparkApplicationResource sparkApplicationResource : sparkApplicationResources) {
for (SparkApplication sparkApplicationResource : sparkApplicationResources) {
SubmissionSummary submission = new SubmissionSummary();
submission.copyFrom(sparkApplicationResource, sparkCluster, appConfig);
String str = objectMapper.writeValueAsString(submission);
Expand Down
40 changes: 18 additions & 22 deletions src/main/java/com/apple/spark/core/RunningApplicationMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@

import com.apple.spark.AppConfig;
import com.apple.spark.operator.DriverInfo;
import com.apple.spark.operator.SparkApplicationResource;
import com.apple.spark.operator.SparkApplicationResourceDoneable;
import com.apple.spark.operator.SparkApplication;
import com.apple.spark.operator.SparkApplicationResourceList;
import com.apple.spark.util.CounterMetricContainer;
import com.apple.spark.util.DateTimeUtils;
import com.apple.spark.util.GaugeMetricContainer;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import java.util.List;
Expand Down Expand Up @@ -111,13 +111,13 @@ public void run() {
deleteInterval);
}

public static long getMaxRunningMillis(SparkApplicationResource sparkApplicationResource) {
if (sparkApplicationResource.getMetadata() == null
|| sparkApplicationResource.getMetadata().getLabels() == null) {
public static long getMaxRunningMillis(SparkApplication sparkApplication) {
if (sparkApplication.getMetadata() == null
|| sparkApplication.getMetadata().getLabels() == null) {
return Constants.DEFAULT_MAX_RUNNING_MILLIS;
}
String labelValue =
sparkApplicationResource.getMetadata().getLabels().get(Constants.MAX_RUNNING_MILLIS_LABEL);
sparkApplication.getMetadata().getLabels().get(Constants.MAX_RUNNING_MILLIS_LABEL);
if (labelValue == null || labelValue.isEmpty()) {
return Constants.DEFAULT_MAX_RUNNING_MILLIS;
}
Expand All @@ -129,7 +129,7 @@ public static long getMaxRunningMillis(SparkApplicationResource sparkApplication
"Failed to parse value %s for label %s on %s",
labelValue,
Constants.MAX_RUNNING_MILLIS_LABEL,
sparkApplicationResource.getMetadata().getName()),
sparkApplication.getMetadata().getName()),
ex);
return Constants.DEFAULT_MAX_RUNNING_MILLIS;
}
Expand All @@ -143,7 +143,7 @@ public static long getMaxRunningMillis(SparkApplicationResource sparkApplication
* @param currCRDState the current CRD state
*/
public void onUpdate(
SparkApplicationResource prevCRDState, SparkApplicationResource currCRDState) {
SparkApplication prevCRDState, SparkApplication currCRDState) {
String newState = SparkApplicationResourceHelper.getState(currCRDState);
if (SparkConstants.RUNNING_STATE.equalsIgnoreCase(newState)) {
String name = currCRDState.getMetadata().getName();
Expand Down Expand Up @@ -214,18 +214,14 @@ public int getApplicationCount() {
* @param appName the name of the app (typically submission ID)
*/
protected void killApplication(String namespace, String appName) {
try (DefaultKubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster)) {
CustomResourceDefinitionContext crdContext = KubernetesHelper.getSparkApplicationCrdContext();
SparkApplicationResource sparkApplicationResource =
client
.customResources(
crdContext,
SparkApplicationResource.class,
SparkApplicationResourceList.class,
SparkApplicationResourceDoneable.class)
.inNamespace(namespace)
.withName(appName)
.get();
try (KubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster)) {
MixedOperation<SparkApplication, SparkApplicationResourceList, Resource<SparkApplication>>
sparkApplicationClient =
client.resources(SparkApplication.class, SparkApplicationResourceList.class);

SparkApplication sparkApplicationResource =
sparkApplicationClient.inNamespace(namespace).withName(appName).get();

if (sparkApplicationResource == null) {
logger.warn(
"Failed to kill application {}/{} due to application not found", namespace, appName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@

package com.apple.spark.core;

import com.apple.spark.operator.SparkApplicationResource;
import com.apple.spark.operator.SparkApplication;

public class SparkApplicationResourceHelper {

public static String getState(SparkApplicationResource sparkApp) {
public static String getState(SparkApplication sparkApp) {
if (sparkApp.getStatus() != null && sparkApp.getStatus().getApplicationState() != null) {
return sparkApp.getStatus().getApplicationState().getState();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.model.annotation.Group;
import io.fabric8.kubernetes.model.annotation.Version;

@Version("v1beta2")
@Group("sparkoperator.k8s.io")
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class SparkApplicationResource extends CustomResource {
public class SparkApplication extends CustomResource {

private SparkApplicationSpec spec;
private SparkApplicationStatus status;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@

import io.fabric8.kubernetes.client.CustomResourceList;

public class SparkApplicationResourceList extends CustomResourceList<SparkApplicationResource> {}
public class SparkApplicationResourceList extends CustomResourceList<SparkApplication> {}
Loading