diff --git a/pom.xml b/pom.xml index 5ca9526..f36ab49 100644 --- a/pom.xml +++ b/pom.xml @@ -102,7 +102,7 @@ io.fabric8 kubernetes-client - 4.13.3 + 6.5.0 io.micrometer diff --git a/src/main/java/com/apple/spark/api/SubmissionStatus.java b/src/main/java/com/apple/spark/api/SubmissionStatus.java index ea78594..ef29f70 100644 --- a/src/main/java/com/apple/spark/api/SubmissionStatus.java +++ b/src/main/java/com/apple/spark/api/SubmissionStatus.java @@ -23,7 +23,7 @@ import static com.apple.spark.core.SparkConstants.SUBMITTED_STATE; import com.apple.spark.core.Constants; -import com.apple.spark.operator.SparkApplicationResource; +import com.apple.spark.operator.SparkApplication; import com.apple.spark.operator.SparkApplicationStatus; import com.apple.spark.util.DateTimeUtils; import com.fasterxml.jackson.annotation.JsonInclude; @@ -39,7 +39,7 @@ public class SubmissionStatus { private String applicationState; private String applicationErrorMessage; - public void copyFrom(SparkApplicationResource sparkApplicationResource) { + public void copyFrom(SparkApplication sparkApplicationResource) { Long creationTime = DateTimeUtils.parseOrNull(sparkApplicationResource.getMetadata().getCreationTimestamp()); this.setCreationTime(creationTime); diff --git a/src/main/java/com/apple/spark/api/SubmissionSummary.java b/src/main/java/com/apple/spark/api/SubmissionSummary.java index 1981922..89d8a3f 100644 --- a/src/main/java/com/apple/spark/api/SubmissionSummary.java +++ b/src/main/java/com/apple/spark/api/SubmissionSummary.java @@ -24,7 +24,7 @@ import com.apple.spark.AppConfig; import com.apple.spark.core.Constants; import com.apple.spark.core.SparkConstants; -import com.apple.spark.operator.SparkApplicationResource; +import com.apple.spark.operator.SparkApplication; import com.apple.spark.operator.SparkApplicationSpec; import com.apple.spark.util.ConfigUtil; import com.fasterxml.jackson.annotation.JsonInclude; @@ -86,7 +86,7 @@ public void setApplicationName(String applicationName) { } public void copyFrom( - SparkApplicationResource sparkApplicationResource, + SparkApplication sparkApplicationResource, AppConfig.SparkCluster sparkCluster, AppConfig appConfig) { this.copyFrom(sparkApplicationResource); @@ -103,7 +103,7 @@ public void copyFrom( } @Override - public void copyFrom(SparkApplicationResource sparkApplicationResource) { + public void copyFrom(SparkApplication sparkApplicationResource) { super.copyFrom(sparkApplicationResource); setSubmissionId(sparkApplicationResource.getMetadata().getName()); if (sparkApplicationResource.getMetadata().getLabels() != null) { diff --git a/src/main/java/com/apple/spark/core/ApplicationMonitor.java b/src/main/java/com/apple/spark/core/ApplicationMonitor.java index 2297500..b707334 100644 --- a/src/main/java/com/apple/spark/core/ApplicationMonitor.java +++ b/src/main/java/com/apple/spark/core/ApplicationMonitor.java @@ -24,18 +24,14 @@ import com.apple.spark.AppConfig; import com.apple.spark.operator.DriverSpec; import com.apple.spark.operator.ExecutorSpec; -import com.apple.spark.operator.SparkApplicationResource; -import com.apple.spark.operator.SparkApplicationResourceList; +import com.apple.spark.operator.SparkApplication; import com.apple.spark.operator.SparkApplicationSpec; import com.apple.spark.util.CounterMetricContainer; import com.apple.spark.util.DateTimeUtils; import com.apple.spark.util.GaugeMetricContainer; import com.apple.spark.util.KubernetesClusterAndNamespace; import io.fabric8.kubernetes.api.model.Pod; -import io.fabric8.kubernetes.client.DefaultKubernetesClient; import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext; -import io.fabric8.kubernetes.client.dsl.base.OperationContext; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.fabric8.kubernetes.client.informers.SharedInformerFactory; @@ -157,7 +153,7 @@ public void start() { String.format( "%s/%s", SparkConstants.SPARK_APPLICATION_CRD_GROUP, SparkConstants.CRD_VERSION), SparkConstants.SPARK_APPLICATION_KIND, - SparkApplicationResource.class); + SparkApplication.class); if (appConfig.getSparkClusters() != null) { Map uniqueClusters = new HashMap<>(); @@ -195,32 +191,27 @@ private void start(AppConfig.SparkCluster sparkCluster, Timer timer) { sparkCluster.getEksCluster(), sparkCluster.getSparkApplicationNamespace()); - DefaultKubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster); + KubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster); + clients.add(client); SharedInformerFactory sharedInformerFactory = client.informers(); informerFactories.add(sharedInformerFactory); - CustomResourceDefinitionContext crdContext = KubernetesHelper.getSparkApplicationCrdContext(); - SharedIndexInformer informer = - sharedInformerFactory.sharedIndexInformerForCustomResource( - crdContext, - SparkApplicationResource.class, - SparkApplicationResourceList.class, - new OperationContext().withNamespace(sparkCluster.getSparkApplicationNamespace()), - RESYNC_MILLIS); + SharedIndexInformer informer = + sharedInformerFactory.sharedIndexInformerFor(SparkApplication.class, RESYNC_MILLIS); RunningApplicationMonitor runningApplicationMonitor = new RunningApplicationMonitor(sparkCluster, timer, meterRegistry); informer.addEventHandler( - new ResourceEventHandler() { + new ResourceEventHandler() { @Override - public void onAdd(SparkApplicationResource sparkApplicationResource) {} + public void onAdd(SparkApplication sparkApplicationResource) {} @Override public void onUpdate( - SparkApplicationResource prevCRDState, SparkApplicationResource newCRDState) { + SparkApplication prevCRDState, SparkApplication newCRDState) { int timeoutMillis = 100; try { boolean added = @@ -245,7 +236,7 @@ public void onUpdate( @Override public void onDelete( - SparkApplicationResource sparkApplicationResource, + SparkApplication sparkApplicationResource, boolean deletedFinalStateUnknown) {} }); @@ -285,8 +276,8 @@ public void close() { */ private void onUpdateImpl_logApplication( AppConfig.SparkCluster sparkCluster, - SparkApplicationResource prevCRDState, - SparkApplicationResource currCRDState) { + SparkApplication prevCRDState, + SparkApplication currCRDState) { String submissionId = currCRDState.getMetadata().getName(); String newState = SparkApplicationResourceHelper.getState(currCRDState); String oldState = SparkApplicationResourceHelper.getState(prevCRDState); @@ -307,7 +298,7 @@ private void onUpdateImpl_logApplication( Timestamp startTime = null; if (!driverInfoCollectedSubIds.contains(submissionId) && newState.equals(SparkConstants.RUNNING_STATE)) { - try (DefaultKubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster)) { + try (KubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster)) { String driverStartTime; SparkApplicationSpec spec = currCRDState.getSpec(); DriverSpec driverSpec = null; diff --git a/src/main/java/com/apple/spark/core/ApplicationUpdateEvent.java b/src/main/java/com/apple/spark/core/ApplicationUpdateEvent.java index 9d4e36f..61ba68e 100644 --- a/src/main/java/com/apple/spark/core/ApplicationUpdateEvent.java +++ b/src/main/java/com/apple/spark/core/ApplicationUpdateEvent.java @@ -20,13 +20,13 @@ package com.apple.spark.core; import com.apple.spark.AppConfig; -import com.apple.spark.operator.SparkApplicationResource; +import com.apple.spark.operator.SparkApplication; /** The event generated whenever there's CRD update from an application. */ public class ApplicationUpdateEvent { - private final SparkApplicationResource prevCRDState; - private final SparkApplicationResource newCRDState; + private final SparkApplication prevCRDState; + private final SparkApplication newCRDState; // The runningApplicationMonitor instance corresponding to the specific Spark cluster private final RunningApplicationMonitor runningApplicationMonitor; @@ -36,8 +36,8 @@ public class ApplicationUpdateEvent { public ApplicationUpdateEvent( AppConfig.SparkCluster sparkCluster, - SparkApplicationResource prevCRDState, - SparkApplicationResource newCRDState, + SparkApplication prevCRDState, + SparkApplication newCRDState, RunningApplicationMonitor runningApplicationMonitor) { this.prevCRDState = prevCRDState; this.newCRDState = newCRDState; @@ -45,11 +45,11 @@ public ApplicationUpdateEvent( this.runningApplicationMonitor = runningApplicationMonitor; } - public SparkApplicationResource getPrevCRDState() { + public SparkApplication getPrevCRDState() { return prevCRDState; } - public SparkApplicationResource getNewCRDState() { + public SparkApplication getNewCRDState() { return newCRDState; } diff --git a/src/main/java/com/apple/spark/core/KubernetesHelper.java b/src/main/java/com/apple/spark/core/KubernetesHelper.java index a4bff80..47f118c 100644 --- a/src/main/java/com/apple/spark/core/KubernetesHelper.java +++ b/src/main/java/com/apple/spark/core/KubernetesHelper.java @@ -21,11 +21,8 @@ import com.apple.spark.AppConfig; import com.apple.spark.util.EndAwareInputStream; -import io.fabric8.kubernetes.api.model.DoneablePod; import io.fabric8.kubernetes.api.model.Pod; -import io.fabric8.kubernetes.client.Config; -import io.fabric8.kubernetes.client.ConfigBuilder; -import io.fabric8.kubernetes.client.DefaultKubernetesClient; +import io.fabric8.kubernetes.client.*; import io.fabric8.kubernetes.client.dsl.LogWatch; import io.fabric8.kubernetes.client.dsl.PodResource; import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext; @@ -135,8 +132,7 @@ public static CustomResourceDefinitionContext getSparkApplicationCrdContext() { public static InputStream tryGetLogStream( DefaultKubernetesClient client, String namespace, String podName) { - PodResource podResource = - client.inNamespace(namespace).pods().withName(podName); + PodResource podResource = client.pods().inNamespace(namespace).withName(podName); if (podResource == null) { logger.info("Cannot get pod resource {}", podName); return null; diff --git a/src/main/java/com/apple/spark/core/RestSubmissionsStreamingOutput.java b/src/main/java/com/apple/spark/core/RestSubmissionsStreamingOutput.java index 3ac1f3f..51b3dac 100644 --- a/src/main/java/com/apple/spark/core/RestSubmissionsStreamingOutput.java +++ b/src/main/java/com/apple/spark/core/RestSubmissionsStreamingOutput.java @@ -21,7 +21,7 @@ import com.apple.spark.AppConfig; import com.apple.spark.api.SubmissionSummary; -import com.apple.spark.operator.SparkApplicationResource; +import com.apple.spark.operator.SparkApplication; import com.apple.spark.operator.SparkApplicationResourceList; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; @@ -39,12 +39,12 @@ protected void writeSubmissions( if (list == null) { return; } - List sparkApplicationResources = list.getItems(); + List sparkApplicationResources = list.getItems(); if (sparkApplicationResources == null) { return; } ObjectMapper objectMapper = new ObjectMapper(); - for (SparkApplicationResource sparkApplicationResource : sparkApplicationResources) { + for (SparkApplication sparkApplicationResource : sparkApplicationResources) { SubmissionSummary submission = new SubmissionSummary(); submission.copyFrom(sparkApplicationResource, sparkCluster, appConfig); String str = objectMapper.writeValueAsString(submission); diff --git a/src/main/java/com/apple/spark/core/RunningApplicationMonitor.java b/src/main/java/com/apple/spark/core/RunningApplicationMonitor.java index 1e9a258..0bf60b4 100644 --- a/src/main/java/com/apple/spark/core/RunningApplicationMonitor.java +++ b/src/main/java/com/apple/spark/core/RunningApplicationMonitor.java @@ -24,14 +24,14 @@ import com.apple.spark.AppConfig; import com.apple.spark.operator.DriverInfo; -import com.apple.spark.operator.SparkApplicationResource; -import com.apple.spark.operator.SparkApplicationResourceDoneable; +import com.apple.spark.operator.SparkApplication; import com.apple.spark.operator.SparkApplicationResourceList; import com.apple.spark.util.CounterMetricContainer; import com.apple.spark.util.DateTimeUtils; import com.apple.spark.util.GaugeMetricContainer; -import io.fabric8.kubernetes.client.DefaultKubernetesClient; -import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.dsl.MixedOperation; +import io.fabric8.kubernetes.client.dsl.Resource; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tag; import java.util.List; @@ -111,13 +111,13 @@ public void run() { deleteInterval); } - public static long getMaxRunningMillis(SparkApplicationResource sparkApplicationResource) { - if (sparkApplicationResource.getMetadata() == null - || sparkApplicationResource.getMetadata().getLabels() == null) { + public static long getMaxRunningMillis(SparkApplication sparkApplication) { + if (sparkApplication.getMetadata() == null + || sparkApplication.getMetadata().getLabels() == null) { return Constants.DEFAULT_MAX_RUNNING_MILLIS; } String labelValue = - sparkApplicationResource.getMetadata().getLabels().get(Constants.MAX_RUNNING_MILLIS_LABEL); + sparkApplication.getMetadata().getLabels().get(Constants.MAX_RUNNING_MILLIS_LABEL); if (labelValue == null || labelValue.isEmpty()) { return Constants.DEFAULT_MAX_RUNNING_MILLIS; } @@ -129,7 +129,7 @@ public static long getMaxRunningMillis(SparkApplicationResource sparkApplication "Failed to parse value %s for label %s on %s", labelValue, Constants.MAX_RUNNING_MILLIS_LABEL, - sparkApplicationResource.getMetadata().getName()), + sparkApplication.getMetadata().getName()), ex); return Constants.DEFAULT_MAX_RUNNING_MILLIS; } @@ -143,7 +143,7 @@ public static long getMaxRunningMillis(SparkApplicationResource sparkApplication * @param currCRDState the current CRD state */ public void onUpdate( - SparkApplicationResource prevCRDState, SparkApplicationResource currCRDState) { + SparkApplication prevCRDState, SparkApplication currCRDState) { String newState = SparkApplicationResourceHelper.getState(currCRDState); if (SparkConstants.RUNNING_STATE.equalsIgnoreCase(newState)) { String name = currCRDState.getMetadata().getName(); @@ -214,18 +214,14 @@ public int getApplicationCount() { * @param appName the name of the app (typically submission ID) */ protected void killApplication(String namespace, String appName) { - try (DefaultKubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster)) { - CustomResourceDefinitionContext crdContext = KubernetesHelper.getSparkApplicationCrdContext(); - SparkApplicationResource sparkApplicationResource = - client - .customResources( - crdContext, - SparkApplicationResource.class, - SparkApplicationResourceList.class, - SparkApplicationResourceDoneable.class) - .inNamespace(namespace) - .withName(appName) - .get(); + try (KubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster)) { + MixedOperation> + sparkApplicationClient = + client.resources(SparkApplication.class, SparkApplicationResourceList.class); + + SparkApplication sparkApplicationResource = + sparkApplicationClient.inNamespace(namespace).withName(appName).get(); + if (sparkApplicationResource == null) { logger.warn( "Failed to kill application {}/{} due to application not found", namespace, appName); diff --git a/src/main/java/com/apple/spark/core/SparkApplicationResourceHelper.java b/src/main/java/com/apple/spark/core/SparkApplicationResourceHelper.java index 222808c..2ccf303 100644 --- a/src/main/java/com/apple/spark/core/SparkApplicationResourceHelper.java +++ b/src/main/java/com/apple/spark/core/SparkApplicationResourceHelper.java @@ -19,11 +19,11 @@ package com.apple.spark.core; -import com.apple.spark.operator.SparkApplicationResource; +import com.apple.spark.operator.SparkApplication; public class SparkApplicationResourceHelper { - public static String getState(SparkApplicationResource sparkApp) { + public static String getState(SparkApplication sparkApp) { if (sparkApp.getStatus() != null && sparkApp.getStatus().getApplicationState() != null) { return sparkApp.getStatus().getApplicationState().getState(); } diff --git a/src/main/java/com/apple/spark/operator/SparkApplicationResource.java b/src/main/java/com/apple/spark/operator/SparkApplication.java similarity index 86% rename from src/main/java/com/apple/spark/operator/SparkApplicationResource.java rename to src/main/java/com/apple/spark/operator/SparkApplication.java index 460fb33..bbe0146 100644 --- a/src/main/java/com/apple/spark/operator/SparkApplicationResource.java +++ b/src/main/java/com/apple/spark/operator/SparkApplication.java @@ -22,10 +22,14 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonInclude; import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.model.annotation.Group; +import io.fabric8.kubernetes.model.annotation.Version; +@Version("v1beta2") +@Group("sparkoperator.k8s.io") @JsonInclude(JsonInclude.Include.NON_NULL) @JsonIgnoreProperties(ignoreUnknown = true) -public class SparkApplicationResource extends CustomResource { +public class SparkApplication extends CustomResource { private SparkApplicationSpec spec; private SparkApplicationStatus status; diff --git a/src/main/java/com/apple/spark/operator/SparkApplicationResourceDoneable.java b/src/main/java/com/apple/spark/operator/SparkApplicationResourceDoneable.java deleted file mode 100644 index f3c20c8..0000000 --- a/src/main/java/com/apple/spark/operator/SparkApplicationResourceDoneable.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * - * This source file is part of the Batch Processing Gateway open source project - * - * Copyright 2022 Apple Inc. and the Batch Processing Gateway project authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.apple.spark.operator; - -import io.fabric8.kubernetes.api.builder.Function; -import io.fabric8.kubernetes.client.CustomResourceDoneable; - -public class SparkApplicationResourceDoneable - extends CustomResourceDoneable { - - public SparkApplicationResourceDoneable( - SparkApplicationResource resource, - Function function) { - super(resource, function); - } -} diff --git a/src/main/java/com/apple/spark/operator/SparkApplicationResourceList.java b/src/main/java/com/apple/spark/operator/SparkApplicationResourceList.java index dbc133c..06eeaeb 100644 --- a/src/main/java/com/apple/spark/operator/SparkApplicationResourceList.java +++ b/src/main/java/com/apple/spark/operator/SparkApplicationResourceList.java @@ -21,4 +21,4 @@ import io.fabric8.kubernetes.client.CustomResourceList; -public class SparkApplicationResourceList extends CustomResourceList {} +public class SparkApplicationResourceList extends CustomResourceList {} diff --git a/src/main/java/com/apple/spark/rest/ApplicationGetLogRest.java b/src/main/java/com/apple/spark/rest/ApplicationGetLogRest.java index edb4005..6070303 100644 --- a/src/main/java/com/apple/spark/rest/ApplicationGetLogRest.java +++ b/src/main/java/com/apple/spark/rest/ApplicationGetLogRest.java @@ -34,7 +34,7 @@ import com.apple.spark.core.KubernetesHelper; import com.apple.spark.core.LogDao; import com.apple.spark.core.RestStreamingOutput; -import com.apple.spark.operator.SparkApplicationResource; +import com.apple.spark.operator.SparkApplication; import com.apple.spark.security.User; import com.apple.spark.util.ExceptionUtils; import com.codahale.metrics.MetricRegistry; @@ -192,7 +192,7 @@ public Response getLog( // If s3only is true, skip searching EKS. if (s3only.equalsIgnoreCase("false")) { try { - final SparkApplicationResource sparkApplicationResource = getSparkApplicationResource(id); + final SparkApplication sparkApplicationResource = getSparkApplicationResource(id); logStream = getLog(sparkApplicationResource, execId); } catch (Throwable ex) { ExceptionUtils.meterException(); @@ -300,7 +300,7 @@ private AmazonS3 getS3Client() { } @ExceptionMetered(name = "RuntimeException", absolute = true, cause = RuntimeException.class) - private InputStream getLog(SparkApplicationResource sparkApplication, String execId) { + private InputStream getLog(SparkApplication sparkApplication, String execId) { if (sparkApplication == null) { logger.info("Cannot get log from EKS, spark application not found"); return null; diff --git a/src/main/java/com/apple/spark/rest/ApplicationSubmissionRest.java b/src/main/java/com/apple/spark/rest/ApplicationSubmissionRest.java index c65674d..7b48401 100644 --- a/src/main/java/com/apple/spark/rest/ApplicationSubmissionRest.java +++ b/src/main/java/com/apple/spark/rest/ApplicationSubmissionRest.java @@ -43,8 +43,7 @@ import com.apple.spark.api.SubmitApplicationResponse; import com.apple.spark.core.*; import com.apple.spark.operator.DriverInfo; -import com.apple.spark.operator.SparkApplicationResource; -import com.apple.spark.operator.SparkApplicationResourceDoneable; +import com.apple.spark.operator.SparkApplication; import com.apple.spark.operator.SparkApplicationResourceList; import com.apple.spark.operator.SparkApplicationSpec; import com.apple.spark.security.User; @@ -69,6 +68,9 @@ import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodStatus; import io.fabric8.kubernetes.client.DefaultKubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.dsl.MixedOperation; +import io.fabric8.kubernetes.client.dsl.Resource; import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tag; @@ -154,7 +156,7 @@ public GetSubmissionStatusResponseCacheValue load(String submissionId) { AppConfig.SparkCluster sparkCluster = getSparkCluster(submissionId); try { GetSubmissionStatusResponse response = - getStatusImplWithoutCache(submissionId, sparkCluster); + getStatusImplWithoutCache(getSparkApplicationResource(submissionId), submissionId, sparkCluster); return new GetSubmissionStatusResponseCacheValue(response); } catch (Throwable ex) { requestCounters.increment( @@ -343,21 +345,21 @@ private SubmitApplicationResponse submitSparkCRD( com.codahale.metrics.Timer timer = registry.timer(this.getClass().getSimpleName() + ".submitApplication.k8s-time"); - try (DefaultKubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster); - com.codahale.metrics.Timer.Context context = timer.time()) { - SparkApplicationResource sparkApplicationResource = new SparkApplicationResource(); - sparkApplicationResource.setApiVersion(SparkConstants.SPARK_OPERATOR_API_VERSION); - sparkApplicationResource.setKind(SparkConstants.SPARK_APPLICATION_KIND); - sparkApplicationResource.getMetadata().setName(submissionId); - sparkApplicationResource + try (KubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster); + com.codahale.metrics.Timer.Context context = timer.time()) { + SparkApplication sparkApplication = new SparkApplication(); + sparkApplication.setApiVersion(SparkConstants.SPARK_OPERATOR_API_VERSION); + sparkApplication.setKind(SparkConstants.SPARK_APPLICATION_KIND); + sparkApplication.getMetadata().setName(submissionId); + sparkApplication .getMetadata() .setNamespace(sparkCluster.getSparkApplicationNamespace()); - if (sparkApplicationResource.getMetadata().getLabels() == null) { - sparkApplicationResource.getMetadata().setLabels(new HashMap<>()); + if (sparkApplication.getMetadata().getLabels() == null) { + sparkApplication.getMetadata().setLabels(new HashMap<>()); } if (sparkSpec.getProxyUser() != null) { - sparkApplicationResource + sparkApplication .getMetadata() .getLabels() .put(PROXY_USER_LABEL, sparkSpec.getProxyUser()); @@ -365,13 +367,13 @@ private SubmitApplicationResponse submitSparkCRD( if (request.getApplicationName() != null) { String applicationNameLabelValue = KubernetesHelper.normalizeLabelValue(request.getApplicationName()); - sparkApplicationResource + sparkApplication .getMetadata() .getLabels() .put(APPLICATION_NAME_LABEL, applicationNameLabelValue); } - sparkApplicationResource + sparkApplication .getMetadata() .getLabels() .put(QUEUE_LABEL, YUNIKORN_ROOT_QUEUE + "." + queue); @@ -400,7 +402,7 @@ private SubmitApplicationResponse submitSparkCRD( } } } - sparkApplicationResource + sparkApplication .getMetadata() .getLabels() .put(MAX_RUNNING_MILLIS_LABEL, String.valueOf(maxRunningMillis)); @@ -412,16 +414,13 @@ private SubmitApplicationResponse submitSparkCRD( logger.warn("Failed to serialize SparkApplicationSpec and mask sensitive info", ex); } - sparkApplicationResource.setSpec(sparkSpec); - CustomResourceDefinitionContext crdContext = KubernetesHelper.getSparkApplicationCrdContext(); - client - .customResources( - crdContext, - SparkApplicationResource.class, - SparkApplicationResourceList.class, - SparkApplicationResourceDoneable.class) - .create(sparkApplicationResource); + sparkApplication.setSpec(sparkSpec); + + MixedOperation> + sparkApplicationClient = + client.resources(SparkApplication.class, SparkApplicationResourceList.class); + sparkApplicationClient.create(sparkApplication); SubmitApplicationResponse response = new SubmitApplicationResponse(); response.setSubmissionId(submissionId); context.stop(); @@ -465,18 +464,16 @@ public DeleteSubmissionResponse deleteSubmission( AppConfig.SparkCluster sparkCluster = getSparkCluster(submissionId); com.codahale.metrics.Timer timer = registry.timer(this.getClass().getSimpleName() + ".deleteSubmission.k8s-time"); - try (DefaultKubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster); + try (KubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster); com.codahale.metrics.Timer.Context context = timer.time()) { - CustomResourceDefinitionContext crdContext = KubernetesHelper.getSparkApplicationCrdContext(); - client - .customResources( - crdContext, - SparkApplicationResource.class, - SparkApplicationResourceList.class, - SparkApplicationResourceDoneable.class) - .inNamespace(sparkCluster.getSparkApplicationNamespace()) - .withName(submissionId) - .delete(); + MixedOperation> + sparkApplicationClient = + client.resources(SparkApplication.class, SparkApplicationResourceList.class); + + sparkApplicationClient + .inNamespace(sparkCluster.getSparkApplicationNamespace()) + .withName(submissionId) + .delete(); context.stop(); return new DeleteSubmissionResponse(); } @@ -515,9 +512,9 @@ public SparkApplicationSpec getSparkSpec( clientVersion); requestCounters.increment( REQUEST_METRIC_NAME, Tag.of("name", "get_spec"), Tag.of("user", user.getName())); - SparkApplicationResource sparkApplicationResource = getSparkApplicationResource(submissionId); + SparkApplication sparkApplication = getSparkApplicationResource(submissionId); SparkApplicationSpec sparkApplicationSpec = - removeEnvFromSpec(sparkApplicationResource.getSpec()); + removeEnvFromSpec(sparkApplication.getSpec()); return sparkApplicationSpec; } @@ -561,13 +558,13 @@ private GetSubmissionStatusResponse getStatusImpl(String submissionId, User user STATUS_CACHE_GET_FAILURE, Tag.of("exception", ex.getClass().getSimpleName())); logger.warn(String.format("Failed to get status from cache for %s", submissionId), ex); AppConfig.SparkCluster sparkCluster = getSparkCluster(submissionId); - return getStatusImplWithoutCache(submissionId, sparkCluster); + return getStatusImplWithoutCache(getSparkApplicationResource(submissionId), submissionId, sparkCluster); } if (cacheValue == null) { logger.warn("Got null status cache value for {}", submissionId); AppConfig.SparkCluster sparkCluster = getSparkCluster(submissionId); - return getStatusImplWithoutCache(submissionId, sparkCluster); + return getStatusImplWithoutCache(getSparkApplicationResource(submissionId), submissionId, sparkCluster); } long cacheElapsedTime = System.currentTimeMillis() - cacheValue.getCreatedTimeMillis(); @@ -578,7 +575,7 @@ private GetSubmissionStatusResponse getStatusImpl(String submissionId, User user logger.warn( "Got expired status cache value ({} millis) for {}", cacheElapsedTime, submissionId); AppConfig.SparkCluster sparkCluster = getSparkCluster(submissionId); - return getStatusImplWithoutCache(submissionId, sparkCluster); + return getStatusImplWithoutCache(getSparkApplicationResource(submissionId), submissionId, sparkCluster); } if (cacheValue.getResponse() == null) { @@ -603,24 +600,8 @@ private GetSubmissionStatusResponse getStatusImpl(String submissionId, User user } private GetSubmissionStatusResponse getStatusImplWithoutCache( - String submissionId, AppConfig.SparkCluster sparkCluster) { - com.codahale.metrics.Timer timer = - registry.timer(this.getClass().getSimpleName() + ".getStatus.k8s-time"); - try (DefaultKubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster); - com.codahale.metrics.Timer.Context context = timer.time()) { - CustomResourceDefinitionContext crdContext = KubernetesHelper.getSparkApplicationCrdContext(); - - SparkApplicationResource sparkApplication = - client - .customResources( - crdContext, - SparkApplicationResource.class, - SparkApplicationResourceList.class, - SparkApplicationResourceDoneable.class) - .inNamespace(sparkCluster.getSparkApplicationNamespace()) - .withName(submissionId) - .get(); - context.stop(); + SparkApplication sparkApplication, String submissionId, AppConfig.SparkCluster sparkCluster) { + if (sparkApplication == null) { throw new WebApplicationException( String.format("Application submission %s not found", submissionId), @@ -678,7 +659,6 @@ private GetSubmissionStatusResponse getStatusImplWithoutCache( } return response; - } } @GET() @@ -716,7 +696,7 @@ public GetDriverInfoResponse getDriverInfo( REQUEST_METRIC_NAME, Tag.of("name", "get_driver"), Tag.of("user", user.getName())); AppConfig.SparkCluster sparkCluster = getSparkCluster(submissionId); - SparkApplicationResource sparkApplicationResource = getSparkApplicationResource(submissionId); + SparkApplication sparkApplicationResource = getSparkApplicationResource(submissionId); if (sparkApplicationResource.getStatus() == null || sparkApplicationResource.getStatus().getDriverInfo() == null) { return new GetDriverInfoResponse(); @@ -770,7 +750,7 @@ public Response describe( Tag.of("name", "describe_application"), Tag.of("user", user.getName())); - final SparkApplicationResource sparkApplicationResource = + final SparkApplication sparkApplicationResource = getSparkApplicationResource(submissionId); SparkApplicationSpec sparkApplicationSpec = removeEnvFromSpec(sparkApplicationResource.getSpec()); @@ -862,9 +842,9 @@ public GetMySubmissionsResponse getSubmissions( for (AppConfig.SparkCluster sparkCluster : getSparkClusters()) { SparkApplicationResourceList list = getSparkApplicationResourcesByUser(sparkCluster, user.getName()); - List sparkApplicationResources = list.getItems(); + List sparkApplicationResources = list.getItems(); if (sparkApplicationResources != null) { - for (SparkApplicationResource sparkApplicationResource : sparkApplicationResources) { + for (SparkApplication sparkApplicationResource : sparkApplicationResources) { SubmissionSummary submission = new SubmissionSummary(); submission.copyFrom(sparkApplicationResource, sparkCluster, getAppConfig()); submissionList.add(submission); @@ -884,11 +864,12 @@ protected List getEvents(AppConfig.SparkCluster sparkCluster, String obje try (DefaultKubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster); com.codahale.metrics.Timer.Context context = timer.time()) { EventList eventList = - client - .events() - .inNamespace(sparkCluster.getSparkApplicationNamespace()) - .withFields(fields) - .list(); + client + .v1() + .events() + .inNamespace(sparkCluster.getSparkApplicationNamespace()) + .withFields(fields) + .list(); context.stop(); if (eventList == null) { return Collections.EMPTY_LIST; diff --git a/src/main/java/com/apple/spark/rest/RestBase.java b/src/main/java/com/apple/spark/rest/RestBase.java index f6989b6..5ffb2ed 100644 --- a/src/main/java/com/apple/spark/rest/RestBase.java +++ b/src/main/java/com/apple/spark/rest/RestBase.java @@ -23,8 +23,7 @@ import com.apple.spark.core.ApplicationSubmissionHelper; import com.apple.spark.core.Constants; import com.apple.spark.core.KubernetesHelper; -import com.apple.spark.operator.SparkApplicationResource; -import com.apple.spark.operator.SparkApplicationResourceDoneable; +import com.apple.spark.operator.SparkApplication; import com.apple.spark.operator.SparkApplicationResourceList; import com.apple.spark.util.CounterMetricContainer; import com.apple.spark.util.TimerMetricContainer; @@ -33,7 +32,9 @@ import com.google.common.util.concurrent.RateLimiter; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.client.DefaultKubernetesClient; -import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.dsl.MixedOperation; +import io.fabric8.kubernetes.client.dsl.Resource; import io.micrometer.core.instrument.MeterRegistry; import io.swagger.v3.oas.annotations.OpenAPIDefinition; import io.swagger.v3.oas.annotations.info.Info; @@ -114,23 +115,23 @@ protected AppConfig.SparkCluster getSparkCluster(String submissionId) { return sparkClusterOptional.get(); } - protected SparkApplicationResource getSparkApplicationResource(String submissionId) { + protected SparkApplication getSparkApplicationResource(String submissionId) { AppConfig.SparkCluster sparkCluster = getSparkCluster(submissionId); com.codahale.metrics.Timer timer = registry.timer(this.getClass().getSimpleName() + ".getSparkApplicationResource.k8s-time"); - try (DefaultKubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster); - com.codahale.metrics.Timer.Context context = timer.time()) { - CustomResourceDefinitionContext crdContext = KubernetesHelper.getSparkApplicationCrdContext(); - SparkApplicationResource sparkApplication = - client - .customResources( - crdContext, - SparkApplicationResource.class, - SparkApplicationResourceList.class, - SparkApplicationResourceDoneable.class) - .inNamespace(sparkCluster.getSparkApplicationNamespace()) - .withName(submissionId) - .get(); + try (KubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster); + com.codahale.metrics.Timer.Context context = timer.time()) { + + MixedOperation> + sparkApplicationClient = + client.resources(SparkApplication.class, SparkApplicationResourceList.class); + + SparkApplication sparkApplication = + sparkApplicationClient + .inNamespace(sparkCluster.getSparkApplicationNamespace()) + .withName(submissionId) + .get(); + context.stop(); if (sparkApplication == null) { throw new WebApplicationException(Response.Status.NOT_FOUND); @@ -151,17 +152,15 @@ protected SparkApplicationResourceList getSparkApplicationResourcesByLabel( this.getClass().getSimpleName() + ".getSparkApplicationResourcesByUser.k8s-time"); try (DefaultKubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster); com.codahale.metrics.Timer.Context context = timer.time()) { - CustomResourceDefinitionContext crdContext = KubernetesHelper.getSparkApplicationCrdContext(); + MixedOperation> + sparkApplicationClient = + client.resources(SparkApplication.class, SparkApplicationResourceList.class); + SparkApplicationResourceList list = - client - .customResources( - crdContext, - SparkApplicationResource.class, - SparkApplicationResourceList.class, - SparkApplicationResourceDoneable.class) - .inNamespace(sparkCluster.getSparkApplicationNamespace()) - .withLabel(labelName, labelValue) - .list(); + sparkApplicationClient + .inNamespace(sparkCluster.getSparkApplicationNamespace()) + .withLabel(labelName, labelValue) + .list(); context.stop(); if (list == null) { return new SparkApplicationResourceList(); @@ -176,16 +175,13 @@ protected SparkApplicationResourceList getSparkApplicationResources( registry.timer(this.getClass().getSimpleName() + ".getSparkApplicationResources.k8s-time"); try (DefaultKubernetesClient client = KubernetesHelper.getK8sClient(sparkCluster); com.codahale.metrics.Timer.Context context = timer.time()) { - CustomResourceDefinitionContext crdContext = KubernetesHelper.getSparkApplicationCrdContext(); + MixedOperation> + sparkApplicationClient = + client.resources(SparkApplication.class, SparkApplicationResourceList.class); + SparkApplicationResourceList list = - client - .customResources( - crdContext, - SparkApplicationResource.class, - SparkApplicationResourceList.class, - SparkApplicationResourceDoneable.class) - .inNamespace(sparkCluster.getSparkApplicationNamespace()) - .list(); + sparkApplicationClient.inNamespace(sparkCluster.getSparkApplicationNamespace()).list(); + context.stop(); if (list == null) { return new SparkApplicationResourceList(); diff --git a/src/main/java/com/apple/spark/util/HttpUtils.java b/src/main/java/com/apple/spark/util/HttpUtils.java index 2e36d4d..59ab18f 100644 --- a/src/main/java/com/apple/spark/util/HttpUtils.java +++ b/src/main/java/com/apple/spark/util/HttpUtils.java @@ -21,70 +21,63 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.IOException; import java.io.InputStream; -import java.security.KeyManagementException; -import java.security.NoSuchAlgorithmException; -import java.security.cert.CertificateException; -import java.security.cert.X509Certificate; -import java.util.concurrent.TimeUnit; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLSocketFactory; -import javax.net.ssl.TrustManager; -import javax.net.ssl.X509TrustManager; -import okhttp3.MediaType; -import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.RequestBody; -import okhttp3.Response; -import okio.BufferedSink; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; public class HttpUtils { public static T post( - String url, String requestJson, String headerName, String headerValue, Class clazz) { - MediaType mediaType = MediaType.parse("application/json"); - return post(url, requestJson, mediaType, headerName, headerValue, clazz); + String url, String requestJson, String headerName, String headerValue, Class clazz) { + return post(url, requestJson, "application/json", headerName, headerValue, clazz); } public static T post( - String url, - String requestText, - MediaType mediaType, - String headerName, - String headerValue, - Class clazz) { - String str = post(url, requestText, mediaType, headerName, headerValue); + String url, + String requestText, + String contentType, + String headerName, + String headerValue, + Class clazz) { + String str = post(url, requestText, contentType, headerName, headerValue); return parseJson(str, clazz); } public static String post(String url, String requestJson, String headerName, String headerValue) { - MediaType mediaType = MediaType.parse("application/json"); - return post(url, requestJson, mediaType, headerName, headerValue); + return post(url, requestJson, "application/json", headerName, headerValue); } public static String post( - String url, String requestText, MediaType mediaType, String headerName, String headerValue) { - OkHttpClient client = getOkHttpClient(); - RequestBody body = RequestBody.create(mediaType, requestText); - Request request = - new Request.Builder().url(url).header(headerName, headerValue).post(body).build(); - return executeHttpRequest(url, client, request); + String url, String requestText, String contentType, String headerName, String headerValue) { + HttpRequest request = null; + try { + var builder = HttpRequest.newBuilder().uri(new URI(url)).header("Content-Type", contentType); + if (headerName != null && !headerName.isEmpty()) { + builder = builder.header(headerName, headerValue); + } + request = builder.POST(HttpRequest.BodyPublishers.ofString(requestText)).build(); + return executeHttpRequest(url, request, HttpResponse.BodyHandlers.ofString()); + } catch (Throwable ex) { + throw new RuntimeException( + String.format("Failed to execute %s on %s", request.method(), url), ex); + } } public static T post( - String url, InputStream stream, String headerName, String headerValue, Class clazz) { + String url, InputStream stream, String headerName, String headerValue, Class clazz) { String str = post(url, stream, headerName, headerValue); return parseJson(str, clazz); } public static T post( - String url, - InputStream stream, - String headerName, - String headerValue, - long contentLength, - Class clazz) { + String url, + InputStream stream, + String headerName, + String headerValue, + long contentLength, + Class clazz) { String str = post(url, stream, headerName, headerValue, contentLength); return parseJson(str, clazz); } @@ -94,34 +87,19 @@ public static String post(String url, InputStream stream, String headerName, Str } public static String post( - String url, InputStream stream, String headerName, String headerValue, long contentLength) { - RequestBody requestBody = - new RequestBody() { - @Override - public MediaType contentType() { - return MediaType.parse("application/octet-stream"); - } - - @Override - public void writeTo(BufferedSink sink) throws IOException { - byte[] buffer = new byte[31]; - int size = stream.read(buffer); - while (size != -1) { - sink.write(buffer, 0, size); - size = stream.read(buffer); - } - } - - @Override - public long contentLength() throws IOException { - return contentLength; - } - }; - - OkHttpClient client = getOkHttpClient(); - Request request = - new Request.Builder().url(url).header(headerName, headerValue).post(requestBody).build(); - return executeHttpRequest(url, client, request); + String url, InputStream stream, String headerName, String headerValue, long contentLength) { + HttpRequest request = null; + try { + var builder = HttpRequest.newBuilder().uri(new URI(url)); + if (headerName != null && !headerName.isEmpty()) { + builder = builder.header(headerName, headerValue); + } + request = builder.POST(HttpRequest.BodyPublishers.ofInputStream(() -> stream)).build(); + return executeHttpRequest(url, request, HttpResponse.BodyHandlers.ofString()); + } catch (Throwable ex) { + throw new RuntimeException( + String.format("Failed to execute %s on %s", request.method(), url), ex); + } } public static T get(String url, String headerName, String headerValue, Class clazz) { @@ -130,15 +108,22 @@ public static T get(String url, String headerName, String headerValue, Class } public static String get(String url, String headerName, String headerValue) { - OkHttpClient client = getOkHttpClient(); - Request request = new Request.Builder().url(url).header(headerName, headerValue).get().build(); - return executeHttpRequest(url, client, request); + try { + var builder = HttpRequest.newBuilder().uri(new URI(url)); + if (headerName != null && !headerName.isEmpty()) { + builder = builder.header(headerName, headerValue); + } + HttpRequest request = builder.GET().build(); + HttpResponse response = + HttpClient.newBuilder().build().send(request, HttpResponse.BodyHandlers.ofString()); + return response.body(); + } catch (Throwable ex) { + throw new RuntimeException(String.format("Failed to get from %s", url), ex); + } } public static String get(String url) { - OkHttpClient client = getOkHttpClient(); - Request request = new Request.Builder().url(url).get().build(); - return executeHttpRequest(url, client, request); + return get(url, null, null); } public static T delete(String url, String headerName, String headerValue, Class clazz) { @@ -147,90 +132,35 @@ public static T delete(String url, String headerName, String headerValue, Cl } public static String delete(String url, String headerName, String headerValue) { - OkHttpClient client = getOkHttpClient(); - Request request = - new Request.Builder().url(url).header(headerName, headerValue).delete().build(); - return executeHttpRequest(url, client, request); - } - - public static Response getHttpResponse(String url, String headerName, String headerValue) { - OkHttpClient client = getOkHttpClient(); - Request request = new Request.Builder().url(url).header(headerName, headerValue).get().build(); - Response response = null; - try { - response = client.newCall(request).execute(); - } catch (IOException ex) { - throw new RuntimeException(String.format("Failed to get request to %s", url), ex); - } - try { - checkResponseOK(url, response); - } catch (RuntimeException ex) { - response.close(); - throw ex; - } - return response; - } - - private static OkHttpClient getOkHttpClient() { - X509TrustManager trustManager = - new X509TrustManager() { - @Override - public void checkClientTrusted(X509Certificate[] chain, String authType) - throws CertificateException {} - - @Override - public void checkServerTrusted(X509Certificate[] chain, String authType) - throws CertificateException {} - - @Override - public X509Certificate[] getAcceptedIssuers() { - return new X509Certificate[0]; - } - }; - try { - SSLContext sslContext = SSLContext.getInstance("SSL"); - sslContext.init(null, new TrustManager[] {trustManager}, new java.security.SecureRandom()); - SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory(); - - OkHttpClient.Builder builder = new OkHttpClient.Builder(); - builder.sslSocketFactory(sslSocketFactory, trustManager); - return builder - .connectTimeout(90, TimeUnit.SECONDS) - .writeTimeout(90, TimeUnit.SECONDS) - .readTimeout(90, TimeUnit.SECONDS) - .build(); - } catch (NoSuchAlgorithmException | KeyManagementException e) { - throw new RuntimeException("Failed to create OkHttp client", e); - } - } - - private static String executeHttpRequest(String url, OkHttpClient client, Request request) { - try (Response response = client.newCall(request).execute()) { - checkResponseOK(url, response); - String responseStr = response.body().string(); - return responseStr; + var builder = HttpRequest.newBuilder().uri(new URI(url)); + if (headerName != null && !headerName.isEmpty()) { + builder = builder.header(headerName, headerValue); + } + HttpRequest request = builder.DELETE().build(); + HttpResponse response = + HttpClient.newBuilder().build().send(request, HttpResponse.BodyHandlers.ofString()); + return response.body(); } catch (Throwable ex) { - throw new RuntimeException( - String.format("Failed to execute %s on %s", request.method(), url), ex); + throw new RuntimeException(String.format("Failed to delete %s", url), ex); } } - private static void checkResponseOK(String url, Response response) { - if (response.code() < 200 || response.code() >= 300) { + private static void checkResponseOK(String url, HttpResponse response) { + if (response.statusCode() < 200 || response.statusCode() >= 300) { String responseBodyStr; try { - responseBodyStr = response.body().string(); - } catch (IOException e) { + responseBodyStr = response.body().toString(); // TODO modify this + } catch (Throwable e) { responseBodyStr = - String.format( - "(Failed to get response body, exception: %s)", - ExceptionUtils.getExceptionNameAndMessage(e)); + String.format( + "(Failed to get response body, exception: %s)", + ExceptionUtils.getExceptionNameAndMessage(e)); } throw new RuntimeException( - String.format( - "Response for %s is not OK: %s. Response body: %s", - url, response.code(), responseBodyStr)); + String.format( + "Response for %s is not OK: %s. Response body: %s", + url, response.statusCode(), responseBodyStr)); } } @@ -242,4 +172,39 @@ private static T parseJson(String str, Class clazz) { throw new RuntimeException(String.format("Failed to parse json: %s", str), e); } } + + public static HttpResponse getHttpResponse(String url, String headerName, String headerValue) { + HttpResponse response; + try { + var builder = HttpRequest.newBuilder().uri(new URI(url)); + if (headerName != null && !headerName.isEmpty()) { + builder = builder.header(headerName, headerValue); + } + HttpRequest request = builder.GET().build(); + response = + HttpClient.newBuilder().build().send(request, HttpResponse.BodyHandlers.ofString()); + } catch (Throwable ex) { + throw new RuntimeException(String.format("Failed to get request from %s", url), ex); + } + + try { + checkResponseOK(url, response); + } catch (RuntimeException ex) { + throw ex; + } + return response; + } + + private static String executeHttpRequest( + String url, HttpRequest request, HttpResponse.BodyHandler bodyHandler) { + try { + HttpResponse response = HttpClient.newBuilder().build().send(request, bodyHandler); + checkResponseOK(url, response); + String responseStr = response.body(); + return responseStr; + } catch (Throwable ex) { + throw new RuntimeException( + String.format("Failed to execute %s on %s", request.method(), url), ex); + } + } } diff --git a/src/test/java/com/apple/spark/core/RunningApplicationMonitorTest.java b/src/test/java/com/apple/spark/core/RunningApplicationMonitorTest.java index e0d8e9e..1bf289f 100644 --- a/src/test/java/com/apple/spark/core/RunningApplicationMonitorTest.java +++ b/src/test/java/com/apple/spark/core/RunningApplicationMonitorTest.java @@ -21,7 +21,7 @@ import com.apple.spark.AppConfig; import com.apple.spark.operator.ApplicationState; -import com.apple.spark.operator.SparkApplicationResource; +import com.apple.spark.operator.SparkApplication; import com.apple.spark.operator.SparkApplicationStatus; import com.apple.spark.util.DateTimeUtils; import io.micrometer.core.instrument.logging.LoggingMeterRegistry; @@ -42,8 +42,8 @@ public void test() throws InterruptedException { new RunningApplicationMonitor(sparkCluster, timer, interval, new LoggingMeterRegistry()); Assert.assertEquals(monitor.getApplicationCount(), 0); - SparkApplicationResource prevCRDState = new SparkApplicationResource(); - SparkApplicationResource newCRDState = new SparkApplicationResource(); + SparkApplication prevCRDState = new SparkApplication(); + SparkApplication newCRDState = new SparkApplication(); monitor.onUpdate(prevCRDState, newCRDState); Assert.assertEquals(monitor.getApplicationCount(), 0); @@ -80,7 +80,7 @@ public void test() throws InterruptedException { @Test public void getMaxRunningMillis() { - SparkApplicationResource sparkApplicationResource = new SparkApplicationResource(); + SparkApplication sparkApplicationResource = new SparkApplication(); Assert.assertEquals( RunningApplicationMonitor.getMaxRunningMillis(sparkApplicationResource), 12 * 60 * 60 * 1000);