Skip to content

Commit

Permalink
Handle EMR Exceptions in FlintCancelJob Operation (#2577) (#2589)
Browse files Browse the repository at this point in the history
(cherry picked from commit bfcaedf)

Signed-off-by: Vamsi Manohar <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 5fa8c3c commit 8875a03
Show file tree
Hide file tree
Showing 15 changed files with 1,179 additions and 909 deletions.
12 changes: 6 additions & 6 deletions docs/user/admin/settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ SQL query::


plugins.query.executionengine.spark.session_inactivity_timeout_millis
===============================
=====================================================================

Description
-----------
Expand Down Expand Up @@ -456,7 +456,7 @@ SQL query::


plugins.query.executionengine.spark.auto_index_management.enabled
===============================
=================================================================

Description
-----------
Expand Down Expand Up @@ -492,7 +492,7 @@ SQL query::


plugins.query.executionengine.spark.session.index.ttl
===============================
=====================================================

Description
-----------
Expand Down Expand Up @@ -529,7 +529,7 @@ SQL query::


plugins.query.executionengine.spark.result.index.ttl
===============================
====================================================

Description
-----------
Expand Down Expand Up @@ -565,7 +565,7 @@ SQL query::
}

plugins.query.executionengine.async_query.enabled
===============================
=================================================

Description
-----------
Expand Down Expand Up @@ -596,7 +596,7 @@ Request::
}

plugins.query.executionengine.spark.streamingjobs.housekeeper.interval
===============================
======================================================================

Description
-----------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
import static org.opensearch.sql.datasources.utils.XContentParserUtils.DESCRIPTION_FIELD;
import static org.opensearch.sql.datasources.utils.XContentParserUtils.NAME_FIELD;
import static org.opensearch.sql.datasources.utils.XContentParserUtils.STATUS_FIELD;
import static org.opensearch.sql.legacy.TestUtils.createIndexByRestClient;
import static org.opensearch.sql.legacy.TestUtils.getResponseBody;
import static org.opensearch.sql.legacy.TestUtils.isIndexExist;
import static org.opensearch.sql.legacy.TestUtils.loadDataByRestClient;

import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
Expand All @@ -37,11 +40,6 @@

public class DataSourceAPIsIT extends PPLIntegTestCase {

@Override
protected void init() throws Exception {
loadIndex(Index.DATASOURCES);
}

@After
public void cleanUp() throws IOException {
wipeAllClusterSettings();
Expand Down Expand Up @@ -397,6 +395,16 @@ public void patchDataSourceAPITest() {
@SneakyThrows
@Test
public void testOldDataSourceModelLoadingThroughGetDataSourcesAPI() {
Index index = Index.DATASOURCES;
String indexName = index.getName();
String mapping = index.getMapping();
String dataSet = index.getDataSet();
if (!isIndexExist(client(), indexName)) {
createIndexByRestClient(client(), indexName, mapping);
}
loadDataByRestClient(client(), indexName, dataSet);
// Wait briefly for the loaded index data to become available.
Thread.sleep(1000);
// get datasource to validate the creation.
Request getRequest = getFetchDataSourceRequest(null);
Response getResponse = client().performRequest(getRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,6 @@ public interface EMRServerlessClient {
* @param jobId jobId.
* @return {@link CancelJobRunResult}
*/
CancelJobRunResult cancelJobRun(String applicationId, String jobId);
CancelJobRunResult cancelJobRun(
String applicationId, String jobId, boolean allowExceptionPropagation);
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class EmrServerlessClientImpl implements EMRServerlessClient {

private static final int MAX_JOB_NAME_LENGTH = 255;

private static final String GENERIC_INTERNAL_SERVER_ERROR_MESSAGE = "Internal Server Error.";
public static final String GENERIC_INTERNAL_SERVER_ERROR_MESSAGE = "Internal Server Error.";

public EmrServerlessClientImpl(AWSEMRServerless emrServerless) {
this.emrServerless = emrServerless;
Expand Down Expand Up @@ -98,7 +98,8 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) {
}

@Override
public CancelJobRunResult cancelJobRun(String applicationId, String jobId) {
public CancelJobRunResult cancelJobRun(
String applicationId, String jobId, boolean allowExceptionPropagation) {
CancelJobRunRequest cancelJobRunRequest =
new CancelJobRunRequest().withJobRunId(jobId).withApplicationId(applicationId);
CancelJobRunResult cancelJobRunResult =
Expand All @@ -108,10 +109,14 @@ public CancelJobRunResult cancelJobRun(String applicationId, String jobId) {
try {
return emrServerless.cancelJobRun(cancelJobRunRequest);
} catch (Throwable t) {
logger.error("Error while making cancel job request to emr:", t);
MetricUtils.incrementNumericalMetric(
MetricName.EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT);
throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE);
if (allowExceptionPropagation) {
throw t;
} else {
logger.error("Error while making cancel job request to emr:", t);
MetricUtils.incrementNumericalMetric(
MetricName.EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT);
throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE);
}
}
});
logger.info(String.format("Job : %s cancelled", cancelJobRunResult.getJobRunId()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ protected JSONObject getResponseFromExecutor(AsyncQueryJobMetadata asyncQueryJob
@Override
public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
emrServerlessClient.cancelJobRun(
asyncQueryJobMetadata.getApplicationId(), asyncQueryJobMetadata.getJobId());
asyncQueryJobMetadata.getApplicationId(), asyncQueryJobMetadata.getJobId(), false);
return asyncQueryJobMetadata.getQueryId().getId();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ public void close() {
if (model.isEmpty()) {
throw new IllegalStateException("session does not exist. " + sessionModel.getSessionId());
} else {
serverlessClient.cancelJobRun(sessionModel.getApplicationId(), sessionModel.getJobId());
serverlessClient.cancelJobRun(
sessionModel.getApplicationId(), sessionModel.getJobId(), false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@

package org.opensearch.sql.spark.flint.operation;

import static org.opensearch.sql.spark.client.EmrServerlessClientImpl.GENERIC_INTERNAL_SERVER_ERROR_MESSAGE;
import static org.opensearch.sql.spark.execution.statestore.StateStore.deleteFlintIndexState;
import static org.opensearch.sql.spark.execution.statestore.StateStore.getFlintIndexState;
import static org.opensearch.sql.spark.execution.statestore.StateStore.updateFlintIndexState;

import com.amazonaws.services.emrserverless.model.ValidationException;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -145,11 +147,18 @@ public void cancelStreamingJob(
String jobId = flintIndexStateModel.getJobId();
try {
emrServerlessClient.cancelJobRun(
flintIndexStateModel.getApplicationId(), flintIndexStateModel.getJobId());
} catch (IllegalArgumentException e) {
// handle job does not exist case.
flintIndexStateModel.getApplicationId(), flintIndexStateModel.getJobId(), true);
} catch (ValidationException e) {
// Thrown when the job is already in a terminal state and is therefore no longer cancellable.
if (e.getMessage().contains("Job run is not in a cancellable state")) {
LOG.error(e);
return;
} else {
throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE);
}
} catch (Exception e) {
LOG.error(e);
return;
throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE);
}

// Poll the job state until it is cancelled or the timeout elapses.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) {
}

@Override
public CancelJobRunResult cancelJobRun(String applicationId, String jobId) {
public CancelJobRunResult cancelJobRun(
String applicationId, String jobId, boolean allowExceptionPropagation) {
cancelJobRunCalled++;
return new CancelJobRunResult().withJobRunId(jobId);
}
Expand Down
Loading

0 comments on commit 8875a03

Please sign in to comment.