Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Peng Huo <[email protected]>
  • Loading branch information
penghuo committed Oct 6, 2023
1 parent 3111ecd commit f325997
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,9 @@ private DispatchQueryResponse handleDropIndexQuery(
DataSourceMetadata dataSourceMetadata =
this.dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource());
dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata);
FlintIndexMetadata indexMetadata =
flintIndexMetadataReader.getJobIdFromFlintIndexMetadata(indexDetails);
FlintIndexMetadata indexMetadata = flintIndexMetadataReader.getFlintIndexMetadata(indexDetails);
// if index is created without auto refresh. there is no job to cancel.
String status = JobRunState.FAILED.toString();
String errorMsg = "E";
try {
if (indexMetadata.isAutoRefresh()) {
emrServerlessClient.cancelJobRun(
Expand All @@ -223,17 +221,15 @@ private DispatchQueryResponse handleDropIndexQuery(
AcknowledgedResponse response =
client.admin().indices().delete(new DeleteIndexRequest().indices(indexName)).get();
if (!response.isAcknowledged()) {
errorMsg = String.format("failed to delete index %s", indexName);
LOG.error(errorMsg);
LOG.error("failed to delete index");
}
status = JobRunState.SUCCESS.toString();
} catch (InterruptedException | ExecutionException e) {
errorMsg = String.format("failed to delete index %s", indexName);
LOG.error(errorMsg);
LOG.error("failed to delete index");
}
}
return new DispatchQueryResponse(
new DropIndexResult(status, errorMsg).toJobId(), true, dataSourceMetadata.getResultIndex());
new DropIndexResult(status).toJobId(), true, dataSourceMetadata.getResultIndex());
}

private static Map<String, String> getDefaultTagsForJobSubmission(
Expand All @@ -247,22 +243,17 @@ private static Map<String, String> getDefaultTagsForJobSubmission(
@Getter
@RequiredArgsConstructor
public static class DropIndexResult {
private static final int PREFIX_LEN = 4;
private static final String DELIMITER = ";";
private static final int PREFIX_LEN = 10;

private final String status;
private final String errorMsg;

public static DropIndexResult fromJobId(String jobId) {
String[] results =
new String(Base64.getDecoder().decode(jobId)).substring(PREFIX_LEN).split(DELIMITER);
return new DropIndexResult(results[0], results[1]);
String status = new String(Base64.getDecoder().decode(jobId)).substring(PREFIX_LEN);
return new DropIndexResult(status);
}

public String toJobId() {
String queryId =
RandomStringUtils.randomAlphanumeric(PREFIX_LEN)
+ String.join(DELIMITER, status, errorMsg);
String queryId = RandomStringUtils.randomAlphanumeric(PREFIX_LEN) + status;
return Base64.getEncoder().encodeToString(queryId.getBytes(StandardCharsets.UTF_8));
}

Expand All @@ -274,11 +265,11 @@ public JSONObject result() {
JSONObject dummyData = new JSONObject();
dummyData.put("result", new JSONArray());
dummyData.put("schema", new JSONArray());
dummyData.put("applicationId", "fakeDropId");
dummyData.put("applicationId", "fakeDropIndexApplicationId");
result.put(DATA_FIELD, dummyData);
} else {
result.put(STATUS_FIELD, status);
result.put(ERROR_FIELD, errorMsg);
result.put(ERROR_FIELD, "failed to drop index");
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public interface FlintIndexMetadataReader {
* Given Index details, get the streaming job Id.
*
* @param indexDetails indexDetails.
* @return jobId.
* @return FlintIndexMetadata.
*/
FlintIndexMetadata getJobIdFromFlintIndexMetadata(IndexDetails indexDetails);
FlintIndexMetadata getFlintIndexMetadata(IndexDetails indexDetails);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class FlintIndexMetadataReaderImpl implements FlintIndexMetadataReader {
private final Client client;

@Override
public FlintIndexMetadata getJobIdFromFlintIndexMetadata(IndexDetails indexDetails) {
public FlintIndexMetadata getFlintIndexMetadata(IndexDetails indexDetails) {
String indexName = indexDetails.openSearchIndexName();
GetMappingsResponse mappingsResponse =
client.admin().indices().prepareGetMappings(indexName).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class DropIndexResultTest {
public void successRespEncodeDecode() {
// encode jobId
String jobId =
new SparkQueryDispatcher.DropIndexResult(JobRunState.SUCCESS.toString(), "E").toJobId();
new SparkQueryDispatcher.DropIndexResult(JobRunState.SUCCESS.toString()).toJobId();

// decode jobId
SparkQueryDispatcher.DropIndexResult dropIndexResult =
Expand All @@ -29,7 +29,7 @@ public void successRespEncodeDecode() {
JSONObject result = dropIndexResult.result();
assertEquals(JobRunState.SUCCESS.toString(), result.get(STATUS_FIELD));
assertEquals(
"{\"result\":[],\"schema\":[],\"applicationId\":\"fakeDropId\"}",
"{\"result\":[],\"schema\":[],\"applicationId\":\"fakeDropIndexApplicationId\"}",
result.get(DATA_FIELD).toString());
}

Expand All @@ -38,14 +38,14 @@ public void successRespEncodeDecode() {
public void failedRespEncodeDecode() {
// encode jobId
String jobId =
new SparkQueryDispatcher.DropIndexResult(JobRunState.FAILED.toString(), "error").toJobId();
new SparkQueryDispatcher.DropIndexResult(JobRunState.FAILED.toString()).toJobId();

// decode jobId
SparkQueryDispatcher.DropIndexResult dropIndexResult =
SparkQueryDispatcher.DropIndexResult.fromJobId(jobId);

JSONObject result = dropIndexResult.result();
assertEquals(JobRunState.FAILED.toString(), result.get(STATUS_FIELD));
assertEquals("error", result.get(ERROR_FIELD));
assertEquals("failed to drop index", result.get(ERROR_FIELD));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ void testGetQueryResponseOfDropIndex() {
openSearchClient);

String jobId =
new SparkQueryDispatcher.DropIndexResult(JobRunState.SUCCESS.toString(), "E").toJobId();
new SparkQueryDispatcher.DropIndexResult(JobRunState.SUCCESS.toString()).toJobId();

JSONObject result =
sparkQueryDispatcher.getQueryResponse(
Expand All @@ -639,7 +639,7 @@ void testGetQueryResponseOfDropIndex() {
@Test
void testDropIndexQuery() throws ExecutionException, InterruptedException {
String query = "DROP INDEX size_year ON my_glue.default.http_logs";
when(flintIndexMetadataReader.getJobIdFromFlintIndexMetadata(
when(flintIndexMetadataReader.getFlintIndexMetadata(
new IndexDetails(
"size_year",
new FullyQualifiedTableName("my_glue.default.http_logs"),
Expand Down Expand Up @@ -675,7 +675,7 @@ void testDropIndexQuery() throws ExecutionException, InterruptedException {
verify(emrServerlessClient, times(1)).cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID);
verify(dataSourceUserAuthorizationHelper, times(1)).authorizeDataSource(dataSourceMetadata);
verify(flintIndexMetadataReader, times(1))
.getJobIdFromFlintIndexMetadata(
.getFlintIndexMetadata(
new IndexDetails(
"size_year",
new FullyQualifiedTableName("my_glue.default.http_logs"),
Expand All @@ -692,7 +692,7 @@ void testDropIndexQuery() throws ExecutionException, InterruptedException {
@Test
void testDropSkippingIndexQuery() throws ExecutionException, InterruptedException {
String query = "DROP SKIPPING INDEX ON my_glue.default.http_logs";
when(flintIndexMetadataReader.getJobIdFromFlintIndexMetadata(
when(flintIndexMetadataReader.getFlintIndexMetadata(
new IndexDetails(
null,
new FullyQualifiedTableName("my_glue.default.http_logs"),
Expand Down Expand Up @@ -726,7 +726,7 @@ void testDropSkippingIndexQuery() throws ExecutionException, InterruptedExceptio
verify(emrServerlessClient, times(1)).cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID);
verify(dataSourceUserAuthorizationHelper, times(1)).authorizeDataSource(dataSourceMetadata);
verify(flintIndexMetadataReader, times(1))
.getJobIdFromFlintIndexMetadata(
.getFlintIndexMetadata(
new IndexDetails(
null,
new FullyQualifiedTableName("my_glue.default.http_logs"),
Expand All @@ -743,7 +743,7 @@ void testDropSkippingIndexQuery() throws ExecutionException, InterruptedExceptio
void testDropSkippingIndexQueryAutoRefreshFalse()
throws ExecutionException, InterruptedException {
String query = "DROP SKIPPING INDEX ON my_glue.default.http_logs";
when(flintIndexMetadataReader.getJobIdFromFlintIndexMetadata(
when(flintIndexMetadataReader.getFlintIndexMetadata(
new IndexDetails(
null,
new FullyQualifiedTableName("my_glue.default.http_logs"),
Expand Down Expand Up @@ -771,7 +771,7 @@ void testDropSkippingIndexQueryAutoRefreshFalse()
verify(emrServerlessClient, times(0)).cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID);
verify(dataSourceUserAuthorizationHelper, times(1)).authorizeDataSource(dataSourceMetadata);
verify(flintIndexMetadataReader, times(1))
.getJobIdFromFlintIndexMetadata(
.getFlintIndexMetadata(
new IndexDetails(
null,
new FullyQualifiedTableName("my_glue.default.http_logs"),
Expand All @@ -788,7 +788,7 @@ void testDropSkippingIndexQueryAutoRefreshFalse()
void testDropSkippingIndexQueryDeleteIndexException()
throws ExecutionException, InterruptedException {
String query = "DROP SKIPPING INDEX ON my_glue.default.http_logs";
when(flintIndexMetadataReader.getJobIdFromFlintIndexMetadata(
when(flintIndexMetadataReader.getFlintIndexMetadata(
new IndexDetails(
null,
new FullyQualifiedTableName("my_glue.default.http_logs"),
Expand Down Expand Up @@ -817,7 +817,7 @@ void testDropSkippingIndexQueryDeleteIndexException()
verify(emrServerlessClient, times(0)).cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID);
verify(dataSourceUserAuthorizationHelper, times(1)).authorizeDataSource(dataSourceMetadata);
verify(flintIndexMetadataReader, times(1))
.getJobIdFromFlintIndexMetadata(
.getFlintIndexMetadata(
new IndexDetails(
null,
new FullyQualifiedTableName("my_glue.default.http_logs"),
Expand All @@ -828,8 +828,8 @@ void testDropSkippingIndexQueryDeleteIndexException()
SparkQueryDispatcher.DropIndexResult.fromJobId(dispatchQueryResponse.getJobId());
Assertions.assertEquals(JobRunState.FAILED.toString(), dropIndexResult.getStatus());
Assertions.assertEquals(
"failed to delete index flint_my_glue_default_http_logs_skipping_index",
dropIndexResult.getErrorMsg());
"{\"error\":\"failed to drop index\",\"status\":\"FAILED\"}",
dropIndexResult.result().toString());
Assertions.assertTrue(dispatchQueryResponse.isDropIndexQuery());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ void testGetJobIdFromFlintSkippingIndexMetadata() {
String indexName = "flint_mys3_default_http_logs_skipping_index";
mockNodeClientIndicesMappings(indexName, mappings);
FlintIndexMetadataReader flintIndexMetadataReader = new FlintIndexMetadataReaderImpl(client);
FlintIndexMetadata jobId =
flintIndexMetadataReader.getJobIdFromFlintIndexMetadata(
FlintIndexMetadata indexMetadata =
flintIndexMetadataReader.getFlintIndexMetadata(
new IndexDetails(
null,
new FullyQualifiedTableName("mys3.default.http_logs"),
false,
true,
FlintIndexType.SKIPPING));
Assertions.assertEquals("00fdmvv9hp8u0o0q", jobId);
Assertions.assertEquals("00fdmvv9hp8u0o0q", indexMetadata.getJobId());
}

@SneakyThrows
Expand All @@ -62,15 +62,15 @@ void testGetJobIdFromFlintCoveringIndexMetadata() {
String indexName = "flint_mys3_default_http_logs_cv1_index";
mockNodeClientIndicesMappings(indexName, mappings);
FlintIndexMetadataReader flintIndexMetadataReader = new FlintIndexMetadataReaderImpl(client);
String jobId =
flintIndexMetadataReader.getJobIdFromFlintIndexMetadata(
FlintIndexMetadata indexMetadata =
flintIndexMetadataReader.getFlintIndexMetadata(
new IndexDetails(
"cv1",
new FullyQualifiedTableName("mys3.default.http_logs"),
false,
true,
FlintIndexType.COVERING));
Assertions.assertEquals("00fdmvv9hp8u0o0q", jobId);
Assertions.assertEquals("00fdmvv9hp8u0o0q", indexMetadata.getJobId());
}

@SneakyThrows
Expand All @@ -85,7 +85,7 @@ void testGetJobIDWithNPEException() {
Assertions.assertThrows(
IllegalArgumentException.class,
() ->
flintIndexMetadataReader.getJobIdFromFlintIndexMetadata(
flintIndexMetadataReader.getFlintIndexMetadata(
new IndexDetails(
"cv1",
new FullyQualifiedTableName("mys3.default.http_logs"),
Expand All @@ -103,7 +103,7 @@ void testGetJobIdFromUnsupportedIndex() {
Assertions.assertThrows(
UnsupportedOperationException.class,
() ->
flintIndexMetadataReader.getJobIdFromFlintIndexMetadata(
flintIndexMetadataReader.getFlintIndexMetadata(
new IndexDetails(
"cv1",
new FullyQualifiedTableName("mys3.default.http_logs"),
Expand Down

0 comments on commit f325997

Please sign in to comment.