Skip to content

Commit

Permalink
Handle EMR Exceptions in FlintCancelJob Operation
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <[email protected]>
  • Loading branch information
vamsimanohar committed Mar 20, 2024
1 parent 4dc83b7 commit 516f865
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,13 @@ public interface EMRServerlessClient {
* @return {@link CancelJobRunResult}
*/
CancelJobRunResult cancelJobRun(String applicationId, String jobId);

/**
* Cancel emr serverless job run.
*
* @param applicationId applicationId.
* @param jobId jobId.
* @return {@link CancelJobRunResult}
*/
CancelJobRunResult cancelJobRunWithNativeEMRException(String applicationId, String jobId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class EmrServerlessClientImpl implements EMRServerlessClient {

private static final int MAX_JOB_NAME_LENGTH = 255;

private static final String GENERIC_INTERNAL_SERVER_ERROR_MESSAGE = "Internal Server Error.";
public static final String GENERIC_INTERNAL_SERVER_ERROR_MESSAGE = "Internal Server Error.";

public EmrServerlessClientImpl(AWSEMRServerless emrServerless) {
this.emrServerless = emrServerless;
Expand Down Expand Up @@ -117,4 +117,16 @@ public CancelJobRunResult cancelJobRun(String applicationId, String jobId) {
logger.info(String.format("Job : %s cancelled", cancelJobRunResult.getJobRunId()));
return cancelJobRunResult;
}

@Override
public CancelJobRunResult cancelJobRunWithNativeEMRException(String applicationId, String jobId) {
CancelJobRunRequest cancelJobRunRequest =
new CancelJobRunRequest().withJobRunId(jobId).withApplicationId(applicationId);
CancelJobRunResult cancelJobRunResult =
AccessController.doPrivileged(
(PrivilegedAction<CancelJobRunResult>)
() -> emrServerless.cancelJobRun(cancelJobRunRequest));
logger.info(String.format("Job : %s cancelled", cancelJobRunResult.getJobRunId()));
return cancelJobRunResult;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@

package org.opensearch.sql.spark.flint.operation;

import static org.opensearch.sql.spark.client.EmrServerlessClientImpl.GENERIC_INTERNAL_SERVER_ERROR_MESSAGE;
import static org.opensearch.sql.spark.execution.statestore.StateStore.deleteFlintIndexState;
import static org.opensearch.sql.spark.execution.statestore.StateStore.getFlintIndexState;
import static org.opensearch.sql.spark.execution.statestore.StateStore.updateFlintIndexState;

import com.amazonaws.services.emrserverless.model.ResourceNotFoundException;
import com.amazonaws.services.emrserverless.model.ValidationException;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -144,12 +147,19 @@ public void cancelStreamingJob(
String applicationId = flintIndexStateModel.getApplicationId();
String jobId = flintIndexStateModel.getJobId();
try {
emrServerlessClient.cancelJobRun(
emrServerlessClient.cancelJobRunWithNativeEMRException(
flintIndexStateModel.getApplicationId(), flintIndexStateModel.getJobId());
} catch (IllegalArgumentException e) {
// handle job does not exist case.
} catch (ResourceNotFoundException e) {
// JobId Not Found Exception.
LOG.error(e);
return;
} catch (ValidationException e) {
// Exception when the job is not in cancellable state and already in terminal state.
LOG.error(e);
return;
} catch (Exception e) {
LOG.error(e);
throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE);
}

// pull job state until timeout or cancelled.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,12 @@ public CancelJobRunResult cancelJobRun(String applicationId, String jobId) {
return new CancelJobRunResult().withJobRunId(jobId);
}

@Override
public CancelJobRunResult cancelJobRunWithNativeEMRException(
String applicationId, String jobId) {
return null;
}

public void startJobRunCalled(int expectedTimes) {
assertEquals(expectedTimes, startJobRunCalled);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,29 @@ void testCancelJobRunWithValidationException() {
Assertions.assertEquals("Internal Server Error.", runtimeException.getMessage());
}

@Test
void testCancelJobRunWithNativeEMRExceptionWithValidationException() {
doThrow(new ValidationException("Error")).when(emrServerless).cancelJobRun(any());
EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless);
ValidationException validationException =
Assertions.assertThrows(
ValidationException.class,
() ->
emrServerlessClient.cancelJobRunWithNativeEMRException(
EMRS_APPLICATION_ID, EMR_JOB_ID));
Assertions.assertTrue(validationException.getMessage().contains("Error"));
}

@Test
void testCancelJobRunWithNativeEMRException() {
when(emrServerless.cancelJobRun(any()))
.thenReturn(new CancelJobRunResult().withJobRunId(EMR_JOB_ID));
EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless);
CancelJobRunResult cancelJobRunResult =
emrServerlessClient.cancelJobRunWithNativeEMRException(EMRS_APPLICATION_ID, EMR_JOB_ID);
Assertions.assertEquals(EMR_JOB_ID, cancelJobRunResult.getJobRunId());
}

@Test
void testStartJobRunWithLongJobName() {
StartJobRunResult response = new StartJobRunResult();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,12 @@ public CancelJobRunResult cancelJobRun(String applicationId, String jobId) {
return null;
}

@Override
public CancelJobRunResult cancelJobRunWithNativeEMRException(
String applicationId, String jobId) {
return null;
}

public void startJobRunCalled(int expectedTimes) {
assertEquals(expectedTimes, startJobRunCalled);
}
Expand Down

0 comments on commit 516f865

Please sign in to comment.