Skip to content

Commit

Permalink
Handle EMR Exceptions in FlintCancelJob Operation
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <[email protected]>
  • Loading branch information
vamsimanohar committed Mar 25, 2024
1 parent 85dae6f commit 3ad9827
Show file tree
Hide file tree
Showing 14 changed files with 1,173 additions and 903 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
import static org.opensearch.sql.datasources.utils.XContentParserUtils.DESCRIPTION_FIELD;
import static org.opensearch.sql.datasources.utils.XContentParserUtils.NAME_FIELD;
import static org.opensearch.sql.datasources.utils.XContentParserUtils.STATUS_FIELD;
import static org.opensearch.sql.legacy.TestUtils.createIndexByRestClient;
import static org.opensearch.sql.legacy.TestUtils.getResponseBody;
import static org.opensearch.sql.legacy.TestUtils.isIndexExist;
import static org.opensearch.sql.legacy.TestUtils.loadDataByRestClient;

import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
Expand All @@ -37,11 +40,6 @@

public class DataSourceAPIsIT extends PPLIntegTestCase {

@Override
protected void init() throws Exception {
loadIndex(Index.DATASOURCES);
}

@After
public void cleanUp() throws IOException {
wipeAllClusterSettings();
Expand Down Expand Up @@ -397,6 +395,16 @@ public void patchDataSourceAPITest() {
@SneakyThrows
@Test
public void testOldDataSourceModelLoadingThroughGetDataSourcesAPI() {
Index index = Index.DATASOURCES;
String indexName = index.getName();
String mapping = index.getMapping();
String dataSet = index.getDataSet();
if (isIndexExist(client(), indexName)) {
createIndexByRestClient(client(), indexName, mapping);
}
loadDataByRestClient(client(), indexName, dataSet);
// waiting for loaded indices.
Thread.sleep(1000);
// get datasource to validate the creation.
Request getRequest = getFetchDataSourceRequest(null);
Response getResponse = client().performRequest(getRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,6 @@ public interface EMRServerlessClient {
* @param jobId jobId.
* @return {@link CancelJobRunResult}
*/
CancelJobRunResult cancelJobRun(String applicationId, String jobId);
CancelJobRunResult cancelJobRun(
String applicationId, String jobId, boolean allowExceptionPropagation);
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class EmrServerlessClientImpl implements EMRServerlessClient {

private static final int MAX_JOB_NAME_LENGTH = 255;

private static final String GENERIC_INTERNAL_SERVER_ERROR_MESSAGE = "Internal Server Error.";
public static final String GENERIC_INTERNAL_SERVER_ERROR_MESSAGE = "Internal Server Error.";

public EmrServerlessClientImpl(AWSEMRServerless emrServerless) {
this.emrServerless = emrServerless;
Expand Down Expand Up @@ -98,7 +98,8 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) {
}

@Override
public CancelJobRunResult cancelJobRun(String applicationId, String jobId) {
public CancelJobRunResult cancelJobRun(
String applicationId, String jobId, boolean allowExceptionPropagation) {
CancelJobRunRequest cancelJobRunRequest =
new CancelJobRunRequest().withJobRunId(jobId).withApplicationId(applicationId);
CancelJobRunResult cancelJobRunResult =
Expand All @@ -108,10 +109,14 @@ public CancelJobRunResult cancelJobRun(String applicationId, String jobId) {
try {
return emrServerless.cancelJobRun(cancelJobRunRequest);
} catch (Throwable t) {
logger.error("Error while making cancel job request to emr:", t);
MetricUtils.incrementNumericalMetric(
MetricName.EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT);
throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE);
if (allowExceptionPropagation) {
throw t;
} else {
logger.error("Error while making cancel job request to emr:", t);
MetricUtils.incrementNumericalMetric(
MetricName.EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT);
throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE);
}
}
});
logger.info(String.format("Job : %s cancelled", cancelJobRunResult.getJobRunId()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ protected JSONObject getResponseFromExecutor(AsyncQueryJobMetadata asyncQueryJob
@Override
public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
emrServerlessClient.cancelJobRun(
asyncQueryJobMetadata.getApplicationId(), asyncQueryJobMetadata.getJobId());
asyncQueryJobMetadata.getApplicationId(), asyncQueryJobMetadata.getJobId(), false);
return asyncQueryJobMetadata.getQueryId().getId();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ public void close() {
if (model.isEmpty()) {
throw new IllegalStateException("session does not exist. " + sessionModel.getSessionId());
} else {
serverlessClient.cancelJobRun(sessionModel.getApplicationId(), sessionModel.getJobId());
serverlessClient.cancelJobRun(
sessionModel.getApplicationId(), sessionModel.getJobId(), false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@

package org.opensearch.sql.spark.flint.operation;

import static org.opensearch.sql.spark.client.EmrServerlessClientImpl.GENERIC_INTERNAL_SERVER_ERROR_MESSAGE;
import static org.opensearch.sql.spark.execution.statestore.StateStore.deleteFlintIndexState;
import static org.opensearch.sql.spark.execution.statestore.StateStore.getFlintIndexState;
import static org.opensearch.sql.spark.execution.statestore.StateStore.updateFlintIndexState;

import com.amazonaws.services.emrserverless.model.ValidationException;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -145,11 +147,18 @@ public void cancelStreamingJob(
String jobId = flintIndexStateModel.getJobId();
try {
emrServerlessClient.cancelJobRun(
flintIndexStateModel.getApplicationId(), flintIndexStateModel.getJobId());
} catch (IllegalArgumentException e) {
// handle job does not exist case.
flintIndexStateModel.getApplicationId(), flintIndexStateModel.getJobId(), true);
} catch (ValidationException e) {
// Exception when the job is not in cancellable state and already in terminal state.
if (e.getMessage().contains("Job run is not in a cancellable state")) {
LOG.error(e);
return;
} else {
throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE);
}
} catch (Exception e) {
LOG.error(e);
return;
throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE);
}

// pull job state until timeout or cancelled.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) {
}

@Override
public CancelJobRunResult cancelJobRun(String applicationId, String jobId) {
public CancelJobRunResult cancelJobRun(
String applicationId, String jobId, boolean allowExceptionPropagation) {
cancelJobRunCalled++;
return new CancelJobRunResult().withJobRunId(jobId);
}
Expand Down
Loading

0 comments on commit 3ad9827

Please sign in to comment.