From b57f7ccaed23f2ee1fb844c455882196e2a175bc Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Tue, 7 Nov 2023 20:02:05 -0800 Subject: [PATCH] Add more metrics and handle emr exception message (#2422) (#2426) (cherry picked from commit 53fd85e6696062a5a2f2ab52204bd3f06b60f4d6) Signed-off-by: Vamsi Manohar Signed-off-by: github-actions[bot] Co-authored-by: github-actions[bot] --- .../rest/RestDataSourceQueryAction.java | 8 ++- .../TransportCreateDataSourceAction.java | 3 -- .../TransportDeleteDataSourceAction.java | 3 -- .../TransportGetDataSourceAction.java | 3 -- .../TransportPatchDataSourceAction.java | 3 -- .../TransportUpdateDataSourceAction.java | 3 -- .../sql/legacy/metrics/MetricName.java | 18 +++++++ .../spark/client/EmrServerlessClientImpl.java | 43 ++++++++-------- .../rest/RestAsyncQueryManagementAction.java | 49 +++++++++++++++++-- .../client/EmrServerlessClientImplTest.java | 49 ++++++++++--------- 10 files changed, 116 insertions(+), 66 deletions(-) diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java b/datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java index 5693df3486..02f87a69f2 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java @@ -135,6 +135,7 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient private RestChannelConsumer executePostRequest(RestRequest restRequest, NodeClient nodeClient) throws IOException { + MetricUtils.incrementNumericalMetric(MetricName.DATASOURCE_CREATION_REQ_COUNT); DataSourceMetadata dataSourceMetadata = XContentParserUtils.toDataSourceMetadata(restRequest.contentParser()); return restChannel -> @@ -163,6 +164,7 @@ public void onFailure(Exception e) { } private RestChannelConsumer executeGetRequest(RestRequest restRequest, NodeClient nodeClient) { + MetricUtils.incrementNumericalMetric(MetricName.DATASOURCE_GET_REQ_COUNT); String dataSourceName = restRequest.param("dataSourceName"); return restChannel -> Scheduler.schedule( @@ -191,6 +193,7 @@ public void onFailure(Exception e) { private RestChannelConsumer executeUpdateRequest(RestRequest restRequest, NodeClient nodeClient) throws IOException { + MetricUtils.incrementNumericalMetric(MetricName.DATASOURCE_PUT_REQ_COUNT); DataSourceMetadata dataSourceMetadata = XContentParserUtils.toDataSourceMetadata(restRequest.contentParser()); return restChannel -> @@ -220,6 +223,7 @@ public void onFailure(Exception e) { private RestChannelConsumer executePatchRequest(RestRequest restRequest, NodeClient nodeClient) throws IOException { + MetricUtils.incrementNumericalMetric(MetricName.DATASOURCE_PATCH_REQ_COUNT); Map dataSourceData = XContentParserUtils.toMap(restRequest.contentParser()); return restChannel -> Scheduler.schedule( @@ -247,7 +251,7 @@ public void onFailure(Exception e) { } private RestChannelConsumer executeDeleteRequest(RestRequest restRequest, NodeClient nodeClient) { - + MetricUtils.incrementNumericalMetric(MetricName.DATASOURCE_DELETE_REQ_COUNT); String dataSourceName = restRequest.param("dataSourceName"); return restChannel -> Scheduler.schedule( @@ -276,8 +280,10 @@ public void onFailure(Exception e) { private void handleException(Exception e, RestChannel restChannel) { if (e instanceof DataSourceNotFoundException) { + MetricUtils.incrementNumericalMetric(MetricName.DATASOURCE_FAILED_REQ_COUNT_CUS); reportError(restChannel, e, NOT_FOUND); } else if (e instanceof OpenSearchException) { + MetricUtils.incrementNumericalMetric(MetricName.DATASOURCE_FAILED_REQ_COUNT_SYS); OpenSearchException exception = (OpenSearchException) e; reportError(restChannel, exception, exception.status()); } else { diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportCreateDataSourceAction.java b/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportCreateDataSourceAction.java index 1b3e678f5d..95e6493e05 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportCreateDataSourceAction.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportCreateDataSourceAction.java @@ -20,8 +20,6 @@ import org.opensearch.sql.datasources.model.transport.CreateDataSourceActionRequest; import org.opensearch.sql.datasources.model.transport.CreateDataSourceActionResponse; import org.opensearch.sql.datasources.service.DataSourceServiceImpl; -import org.opensearch.sql.legacy.metrics.MetricName; -import org.opensearch.sql.legacy.utils.MetricUtils; import org.opensearch.sql.protocol.response.format.JsonResponseFormatter; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; @@ -62,7 +60,6 @@ protected void doExecute( Task task, CreateDataSourceActionRequest request, ActionListener actionListener) { - MetricUtils.incrementNumericalMetric(MetricName.DATASOURCE_CREATION_REQ_COUNT); int dataSourceLimit = settings.getSettingValue(DATASOURCES_LIMIT); if (dataSourceService.getDataSourceMetadata(false).size() >= dataSourceLimit) { actionListener.onFailure( diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportDeleteDataSourceAction.java b/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportDeleteDataSourceAction.java index bcc5ef650f..5578d40651 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportDeleteDataSourceAction.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportDeleteDataSourceAction.java @@ -16,8 +16,6 @@ import org.opensearch.sql.datasources.model.transport.DeleteDataSourceActionRequest; import org.opensearch.sql.datasources.model.transport.DeleteDataSourceActionResponse; import org.opensearch.sql.datasources.service.DataSourceServiceImpl; -import org.opensearch.sql.legacy.metrics.MetricName; -import org.opensearch.sql.legacy.utils.MetricUtils; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; @@ -55,7 +53,6 @@ protected void doExecute( Task task, DeleteDataSourceActionRequest request, ActionListener actionListener) { - MetricUtils.incrementNumericalMetric(MetricName.DATASOURCE_DELETE_REQ_COUNT); try { dataSourceService.deleteDataSource(request.getDataSourceName()); actionListener.onResponse( diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportGetDataSourceAction.java b/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportGetDataSourceAction.java index c8d77dd2e7..34ad59c80f 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportGetDataSourceAction.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportGetDataSourceAction.java @@ -18,8 +18,6 @@ import org.opensearch.sql.datasources.model.transport.GetDataSourceActionRequest; import org.opensearch.sql.datasources.model.transport.GetDataSourceActionResponse; import org.opensearch.sql.datasources.service.DataSourceServiceImpl; -import org.opensearch.sql.legacy.metrics.MetricName; -import org.opensearch.sql.legacy.utils.MetricUtils; import org.opensearch.sql.protocol.response.format.JsonResponseFormatter; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; @@ -58,7 +56,6 @@ protected void doExecute( Task task, GetDataSourceActionRequest request, ActionListener actionListener) { - MetricUtils.incrementNumericalMetric(MetricName.DATASOURCE_GET_REQ_COUNT); try { String responseContent; if (request.getDataSourceName() == null) { diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportPatchDataSourceAction.java b/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportPatchDataSourceAction.java index 8c9334f3a6..303e905cec 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportPatchDataSourceAction.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportPatchDataSourceAction.java @@ -19,8 +19,6 @@ import org.opensearch.sql.datasources.model.transport.PatchDataSourceActionRequest; import org.opensearch.sql.datasources.model.transport.PatchDataSourceActionResponse; import org.opensearch.sql.datasources.service.DataSourceServiceImpl; -import org.opensearch.sql.legacy.metrics.MetricName; -import org.opensearch.sql.legacy.utils.MetricUtils; import org.opensearch.sql.protocol.response.format.JsonResponseFormatter; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; @@ -59,7 +57,6 @@ protected void doExecute( Task task, PatchDataSourceActionRequest request, ActionListener actionListener) { - MetricUtils.incrementNumericalMetric(MetricName.DATASOURCE_PATCH_REQ_COUNT); try { dataSourceService.patchDataSource(request.getDataSourceData()); String responseContent = diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportUpdateDataSourceAction.java b/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportUpdateDataSourceAction.java index 32394ab64c..fefd0f3a01 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportUpdateDataSourceAction.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/transport/TransportUpdateDataSourceAction.java @@ -18,8 +18,6 @@ import org.opensearch.sql.datasources.model.transport.UpdateDataSourceActionRequest; import org.opensearch.sql.datasources.model.transport.UpdateDataSourceActionResponse; import org.opensearch.sql.datasources.service.DataSourceServiceImpl; -import org.opensearch.sql.legacy.metrics.MetricName; -import org.opensearch.sql.legacy.utils.MetricUtils; import org.opensearch.sql.protocol.response.format.JsonResponseFormatter; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; @@ -58,7 +56,6 @@ protected void doExecute( Task task, UpdateDataSourceActionRequest request, ActionListener actionListener) { - MetricUtils.incrementNumericalMetric(MetricName.DATASOURCE_PUT_REQ_COUNT); try { dataSourceService.updateDataSource(request.getDataSourceMetadata()); String responseContent = diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/metrics/MetricName.java b/legacy/src/main/java/org/opensearch/sql/legacy/metrics/MetricName.java index 0098008e57..91ade7b038 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/metrics/MetricName.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/metrics/MetricName.java @@ -33,6 +33,15 @@ public enum MetricName { DATASOURCE_DELETE_REQ_COUNT("datasource_delete_request_count"), DATASOURCE_FAILED_REQ_COUNT_SYS("datasource_failed_request_count_syserr"), DATASOURCE_FAILED_REQ_COUNT_CUS("datasource_failed_request_count_cuserr"), + ASYNC_QUERY_CREATE_API_REQUEST_COUNT("async_query_create_api_request_count"), + ASYNC_QUERY_GET_API_REQUEST_COUNT("async_query_get_api_request_count"), + ASYNC_QUERY_CANCEL_API_REQUEST_COUNT("async_query_cancel_api_request_count"), + ASYNC_QUERY_GET_API_FAILED_REQ_COUNT_SYS("async_query_get_api_failed_request_count_syserr"), + ASYNC_QUERY_GET_API_FAILED_REQ_COUNT_CUS("async_query_get_api_failed_request_count_cuserr"), + ASYNC_QUERY_CREATE_API_FAILED_REQ_COUNT_SYS("async_query_create_api_failed_request_count_syserr"), + ASYNC_QUERY_CREATE_API_FAILED_REQ_COUNT_CUS("async_query_create_api_failed_request_count_cuserr"), + ASYNC_QUERY_CANCEL_API_FAILED_REQ_COUNT_SYS("async_query_cancel_api_failed_request_count_syserr"), + ASYNC_QUERY_CANCEL_API_FAILED_REQ_COUNT_CUS("async_query_cancel_api_failed_request_count_cuserr"), EMR_START_JOB_REQUEST_FAILURE_COUNT("emr_start_job_request_failure_count"), EMR_GET_JOB_RESULT_FAILURE_COUNT("emr_get_job_request_failure_count"), EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT("emr_cancel_job_request_failure_count"), @@ -73,6 +82,15 @@ public static List getNames() { .add(EMR_INTERACTIVE_QUERY_JOBS_CREATION_COUNT) .add(EMR_STREAMING_QUERY_JOBS_CREATION_COUNT) .add(EMR_BATCH_QUERY_JOBS_CREATION_COUNT) + .add(ASYNC_QUERY_CREATE_API_FAILED_REQ_COUNT_CUS) + .add(ASYNC_QUERY_CREATE_API_FAILED_REQ_COUNT_SYS) + .add(ASYNC_QUERY_CANCEL_API_FAILED_REQ_COUNT_CUS) + .add(ASYNC_QUERY_CANCEL_API_FAILED_REQ_COUNT_SYS) + .add(ASYNC_QUERY_GET_API_FAILED_REQ_COUNT_CUS) + .add(ASYNC_QUERY_GET_API_FAILED_REQ_COUNT_SYS) + .add(ASYNC_QUERY_CREATE_API_REQUEST_COUNT) + .add(ASYNC_QUERY_GET_API_REQUEST_COUNT) + .add(ASYNC_QUERY_CANCEL_API_REQUEST_COUNT) .build(); public boolean isNumerical() { diff --git a/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java b/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java index d7f558a020..913e1ac378 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java +++ b/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java @@ -17,7 +17,6 @@ import com.amazonaws.services.emrserverless.model.SparkSubmit; import com.amazonaws.services.emrserverless.model.StartJobRunRequest; import com.amazonaws.services.emrserverless.model.StartJobRunResult; -import com.amazonaws.services.emrserverless.model.ValidationException; import java.security.AccessController; import java.security.PrivilegedAction; import org.apache.logging.log4j.LogManager; @@ -30,6 +29,8 @@ public class EmrServerlessClientImpl implements EMRServerlessClient { private final AWSEMRServerless emrServerless; private static final Logger logger = LogManager.getLogger(EmrServerlessClientImpl.class); + private static final String GENERIC_INTERNAL_SERVER_ERROR_MESSAGE = "Internal Server Error."; + public EmrServerlessClientImpl(AWSEMRServerless emrServerless) { this.emrServerless = emrServerless; } @@ -62,9 +63,10 @@ public String startJobRun(StartJobRequest startJobRequest) { try { return emrServerless.startJobRun(request); } catch (Throwable t) { + logger.error("Error while making start job request to emr:", t); MetricUtils.incrementNumericalMetric( MetricName.EMR_START_JOB_REQUEST_FAILURE_COUNT); - throw t; + throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE); } }); logger.info("Job Run ID: " + startJobRunResult.getJobRunId()); @@ -82,9 +84,10 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { try { return emrServerless.getJobRun(request); } catch (Throwable t) { + logger.error("Error while making get job run request to emr:", t); MetricUtils.incrementNumericalMetric( MetricName.EMR_GET_JOB_RESULT_FAILURE_COUNT); - throw t; + throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE); } }); logger.info("Job Run state: " + getJobRunResult.getJobRun().getState()); @@ -95,24 +98,20 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { CancelJobRunRequest cancelJobRunRequest = new CancelJobRunRequest().withJobRunId(jobId).withApplicationId(applicationId); - try { - CancelJobRunResult cancelJobRunResult = - AccessController.doPrivileged( - (PrivilegedAction) - () -> { - try { - return emrServerless.cancelJobRun(cancelJobRunRequest); - } catch (Throwable t) { - MetricUtils.incrementNumericalMetric( - MetricName.EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT); - throw t; - } - }); - logger.info(String.format("Job : %s cancelled", cancelJobRunResult.getJobRunId())); - return cancelJobRunResult; - } catch (ValidationException e) { - throw new IllegalArgumentException( - String.format("Couldn't cancel the queryId: %s due to %s", jobId, e.getMessage())); - } + CancelJobRunResult cancelJobRunResult = + AccessController.doPrivileged( + (PrivilegedAction) + () -> { + try { + return emrServerless.cancelJobRun(cancelJobRunRequest); + } catch (Throwable t) { + logger.error("Error while making cancel job request to emr:", t); + MetricUtils.incrementNumericalMetric( + MetricName.EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT); + throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE); + } + }); + logger.info(String.format("Job : %s cancelled", cancelJobRunResult.getJobRunId())); + return cancelJobRunResult; } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/rest/RestAsyncQueryManagementAction.java b/spark/src/main/java/org/opensearch/sql/spark/rest/RestAsyncQueryManagementAction.java index 741501cd18..4aed3439c9 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/rest/RestAsyncQueryManagementAction.java +++ b/spark/src/main/java/org/opensearch/sql/spark/rest/RestAsyncQueryManagementAction.java @@ -27,6 +27,8 @@ import org.opensearch.rest.RestRequest; import org.opensearch.sql.datasources.exceptions.ErrorMessage; import org.opensearch.sql.datasources.utils.Scheduler; +import org.opensearch.sql.legacy.metrics.MetricName; +import org.opensearch.sql.legacy.utils.MetricUtils; import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest; import org.opensearch.sql.spark.transport.TransportCancelAsyncQueryRequestAction; import org.opensearch.sql.spark.transport.TransportCreateAsyncQueryRequestAction; @@ -110,6 +112,7 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient private RestChannelConsumer executePostRequest(RestRequest restRequest, NodeClient nodeClient) throws IOException { + MetricUtils.incrementNumericalMetric(MetricName.ASYNC_QUERY_CREATE_API_REQUEST_COUNT); CreateAsyncQueryRequest submitJobRequest = CreateAsyncQueryRequest.fromXContentParser(restRequest.contentParser()); return restChannel -> @@ -132,13 +135,14 @@ public void onResponse( @Override public void onFailure(Exception e) { - handleException(e, restChannel); + handleException(e, restChannel, restRequest.method()); } })); } private RestChannelConsumer executeGetAsyncQueryResultRequest( RestRequest restRequest, NodeClient nodeClient) { + MetricUtils.incrementNumericalMetric(MetricName.ASYNC_QUERY_GET_API_REQUEST_COUNT); String queryId = restRequest.param("queryId"); return restChannel -> Scheduler.schedule( @@ -160,26 +164,31 @@ public void onResponse( @Override public void onFailure(Exception e) { - handleException(e, restChannel); + handleException(e, restChannel, restRequest.method()); } })); } - private void handleException(Exception e, RestChannel restChannel) { + private void handleException( + Exception e, RestChannel restChannel, RestRequest.Method requestMethod) { if (e instanceof OpenSearchException) { OpenSearchException exception = (OpenSearchException) e; reportError(restChannel, exception, exception.status()); + addCustomerErrorMetric(requestMethod); } else { LOG.error("Error happened during request handling", e); if (isClientError(e)) { reportError(restChannel, e, BAD_REQUEST); + addCustomerErrorMetric(requestMethod); } else { reportError(restChannel, e, SERVICE_UNAVAILABLE); + addSystemErrorMetric(requestMethod); } } } private RestChannelConsumer executeDeleteRequest(RestRequest restRequest, NodeClient nodeClient) { + MetricUtils.incrementNumericalMetric(MetricName.ASYNC_QUERY_CANCEL_API_REQUEST_COUNT); String queryId = restRequest.param("queryId"); return restChannel -> Scheduler.schedule( @@ -201,7 +210,7 @@ public void onResponse( @Override public void onFailure(Exception e) { - handleException(e, restChannel); + handleException(e, restChannel, restRequest.method()); } })); } @@ -214,4 +223,36 @@ private void reportError(final RestChannel channel, final Exception e, final Res private static boolean isClientError(Exception e) { return e instanceof IllegalArgumentException || e instanceof IllegalStateException; } + + private void addSystemErrorMetric(RestRequest.Method requestMethod) { + switch (requestMethod) { + case POST: + MetricUtils.incrementNumericalMetric( + MetricName.ASYNC_QUERY_CREATE_API_FAILED_REQ_COUNT_SYS); + break; + case GET: + MetricUtils.incrementNumericalMetric(MetricName.ASYNC_QUERY_GET_API_FAILED_REQ_COUNT_SYS); + break; + case DELETE: + MetricUtils.incrementNumericalMetric( + MetricName.ASYNC_QUERY_CANCEL_API_FAILED_REQ_COUNT_SYS); + break; + } + } + + private void addCustomerErrorMetric(RestRequest.Method requestMethod) { + switch (requestMethod) { + case POST: + MetricUtils.incrementNumericalMetric( + MetricName.ASYNC_QUERY_CREATE_API_FAILED_REQ_COUNT_CUS); + break; + case GET: + MetricUtils.incrementNumericalMetric(MetricName.ASYNC_QUERY_GET_API_FAILED_REQ_COUNT_CUS); + break; + case DELETE: + MetricUtils.incrementNumericalMetric( + MetricName.ASYNC_QUERY_CANCEL_API_FAILED_REQ_COUNT_CUS); + break; + } + } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java b/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java index 8129c3b0e0..67f4d9eb40 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java @@ -88,21 +88,23 @@ void testStartJobRun() { @Test void testStartJobRunWithErrorMetric() { - doThrow(new RuntimeException()).when(emrServerless).startJobRun(any()); + doThrow(new ValidationException("Couldn't start job")).when(emrServerless).startJobRun(any()); EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless); - Assertions.assertThrows( - RuntimeException.class, - () -> - emrServerlessClient.startJobRun( - new StartJobRequest( - QUERY, - EMRS_JOB_NAME, - EMRS_APPLICATION_ID, - EMRS_EXECUTION_ROLE, - SPARK_SUBMIT_PARAMETERS, - new HashMap<>(), - false, - null))); + RuntimeException runtimeException = + Assertions.assertThrows( + RuntimeException.class, + () -> + emrServerlessClient.startJobRun( + new StartJobRequest( + QUERY, + EMRS_JOB_NAME, + EMRS_APPLICATION_ID, + EMRS_EXECUTION_ROLE, + SPARK_SUBMIT_PARAMETERS, + new HashMap<>(), + false, + null))); + Assertions.assertEquals("Internal Server Error.", runtimeException.getMessage()); } @Test @@ -136,11 +138,13 @@ void testGetJobRunState() { @Test void testGetJobRunStateWithErrorMetric() { - doThrow(new RuntimeException()).when(emrServerless).getJobRun(any()); + doThrow(new ValidationException("Not a good job")).when(emrServerless).getJobRun(any()); EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless); - Assertions.assertThrows( - RuntimeException.class, - () -> emrServerlessClient.getJobRunResult(EMRS_APPLICATION_ID, "123")); + RuntimeException runtimeException = + Assertions.assertThrows( + RuntimeException.class, + () -> emrServerlessClient.getJobRunResult(EMRS_APPLICATION_ID, "123")); + Assertions.assertEquals("Internal Server Error.", runtimeException.getMessage()); } @Test @@ -165,13 +169,10 @@ void testCancelJobRunWithErrorMetric() { void testCancelJobRunWithValidationException() { doThrow(new ValidationException("Error")).when(emrServerless).cancelJobRun(any()); EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless); - IllegalArgumentException illegalArgumentException = + RuntimeException runtimeException = Assertions.assertThrows( - IllegalArgumentException.class, + RuntimeException.class, () -> emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID)); - Assertions.assertEquals( - "Couldn't cancel the queryId: job-123xxx due to Error (Service: null; Status Code: 0; Error" - + " Code: null; Request ID: null; Proxy: null)", - illegalArgumentException.getMessage()); + Assertions.assertEquals("Internal Server Error.", runtimeException.getMessage()); } }