
[Backport 2.x] Correctly Set query status #2231

Merged: 1 commit, Oct 5, 2023
15 changes: 11 additions & 4 deletions docs/user/ppl/admin/connectors/s3glue_connector.rst
@@ -14,10 +14,18 @@ S3Glue Connector
Introduction
============

Properties in DataSource Configuration

* name: A unique identifier for the data source within a domain.
* connector: Currently supports the following connectors: s3glue, spark, prometheus, and opensearch.
* resultIndex: Stores the results of queries executed on the data source. If unavailable, it defaults to .query_execution_result.
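The resultIndex fallback described above can be sketched as follows. This is a hypothetical illustration, not plugin code; the `DataSourceConfig` class and its plain-`Map` configuration are stand-ins for the real configuration model.

```java
// Illustrative sketch of the documented resultIndex default; the
// DataSourceConfig class here is hypothetical, not part of the plugin.
import java.util.HashMap;
import java.util.Map;

public class DataSourceConfig {
    static final String DEFAULT_RESULT_INDEX = ".query_execution_result";

    // Returns the configured result index, or the documented default when unset.
    static String resolveResultIndex(Map<String, String> config) {
        String idx = config.get("resultIndex");
        return (idx == null || idx.isEmpty()) ? DEFAULT_RESULT_INDEX : idx;
    }

    public static void main(String[] args) {
        Map<String, String> cfg = new HashMap<>();
        cfg.put("name", "my_glue");
        cfg.put("connector", "s3glue");
        // No resultIndex configured, so the default applies.
        System.out.println(resolveResultIndex(cfg)); // .query_execution_result
    }
}
```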

Glue Connector
========================================================

The s3Glue connector provides a way to query S3 files using Glue as the metadata store and Spark as the execution engine.
This page covers s3Glue datasource configuration and how to query an s3Glue datasource.


Required resources for s3 Glue Connector
========================================
* S3: This is where the data lies.
@@ -27,8 +35,6 @@ Required resources for s3 Glue Connector

We currently support only emr-serverless as the Spark execution engine and Glue as the metadata store. We will add more support in the future.

Glue Connector Properties.

* ``glue.auth.type`` [Required]
@@ -59,7 +65,8 @@ Glue datasource configuration::
"glue.indexstore.opensearch.auth" :"basicauth",
"glue.indexstore.opensearch.auth.username" :"username"
"glue.indexstore.opensearch.auth.password" :"password"
}
},
"resultIndex": "query_execution_result"
}]

[{
@@ -63,34 +63,38 @@ public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest)
}
}

  public JSONObject getQueryResponse(AsyncQueryJobMetadata asyncQueryJobMetadata) {
    // Fetch from Result Index
    JSONObject result =
        jobExecutionResponseReader.getResultFromOpensearchIndex(
            asyncQueryJobMetadata.getJobId(), asyncQueryJobMetadata.getResultIndex());

    // If the result index document has a status, use it directly; otherwise, fall back to the
    // emr-s job status.
    // That a job is successful does not mean there is no error in execution. For example, even if
    // the result index mapping is incorrect, we still write the query result and let the job
    // finish.
    // That a job is running does not mean the status is running. For example, an index/streaming
    // query is a long-running job which runs forever, but we need to return success from the
    // result index immediately.
if (result.has(DATA_FIELD)) {
JSONObject items = result.getJSONObject(DATA_FIELD);

      // If items have STATUS_FIELD, use it; otherwise, mark failed
      String status = items.optString(STATUS_FIELD, JobRunState.FAILED.toString());
result.put(STATUS_FIELD, status);

// If items have ERROR_FIELD, use it; otherwise, set empty string
String error = items.optString(ERROR_FIELD, "");
result.put(ERROR_FIELD, error);
} else {
// make call to EMR Serverless when related result index documents are not available
GetJobRunResult getJobRunResult =
emrServerlessClient.getJobRunResult(
asyncQueryJobMetadata.getApplicationId(), asyncQueryJobMetadata.getJobId());
String jobState = getJobRunResult.getJobRun().getState();
result.put(STATUS_FIELD, jobState);
result.put(ERROR_FIELD, "");
}
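The decision rule implemented above can be shown in isolation. This is a minimal, standalone sketch under stated assumptions: the `StatusResolver` class and its plain-`Optional`/`Map` inputs are hypothetical stand-ins for the real `JSONObject` and EMR Serverless types.

```java
// Hypothetical sketch of the status-resolution rule: prefer the result-index
// document's status; a document without a status means FAILED; with no
// document at all, fall back to the EMR Serverless job state.
import java.util.Map;
import java.util.Optional;

public class StatusResolver {
    static String resolve(Optional<Map<String, String>> resultDoc, String emrJobState) {
        return resultDoc
            // Document present: trust its status field, defaulting to FAILED.
            .map(doc -> doc.getOrDefault("status", "FAILED"))
            // No document yet: report the EMR-S job state instead.
            .orElse(emrJobState);
    }

    public static void main(String[] args) {
        System.out.println(resolve(Optional.empty(), "PENDING")); // PENDING
    }
}
```

Note the middle case: a result document that exists but lacks a status is treated as a failure, which is exactly the behavior this PR changes (previously the EMR-S job state was used as the default).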
@@ -16,6 +16,7 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.SearchHit;
@@ -46,8 +47,14 @@ private JSONObject searchInSparkIndex(QueryBuilder query, String resultIndex) {
searchSourceBuilder.query(query);
searchRequest.source(searchSourceBuilder);
ActionFuture<SearchResponse> searchResponseActionFuture;
JSONObject data = new JSONObject();
try {
searchResponseActionFuture = client.search(searchRequest);
} catch (IndexNotFoundException e) {
      // if there is no result index (e.g., EMR-S hasn't created the index yet), return empty JSON
LOG.info(resultIndex + " is not created yet.");
return data;
} catch (Exception e) {
throw new RuntimeException(e);
}
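The fallback above can be sketched independently of the OpenSearch client. This is an illustrative, hypothetical snippet: `SafeSearch` is not part of the plugin, and `IllegalStateException` stands in for `IndexNotFoundException` so the sketch stays self-contained.

```java
// Hypothetical sketch: a search against a result index that does not exist
// yet yields an empty result instead of failing the whole request, letting
// the caller fall back to the EMR-S job state.
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;

public class SafeSearch {
    static Map<String, Object> searchOrEmpty(Supplier<Map<String, Object>> search) {
        try {
            return search.get();
        } catch (IllegalStateException e) { // stand-in for IndexNotFoundException
            return Collections.emptyMap();
        }
    }
}
```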
@@ -59,7 +66,6 @@ private JSONObject searchInSparkIndex(QueryBuilder query, String resultIndex) {
+ " index failed with status : "
+ searchResponse.status());
} else {
for (SearchHit searchHit : searchResponse.getHits().getHits()) {
data.put(DATA_FIELD, searchHit.getSourceAsMap());
}
@@ -613,11 +613,14 @@ void testGetQueryResponse() {
flintIndexMetadataReader);
when(emrServerlessClient.getJobRunResult(EMRS_APPLICATION_ID, EMR_JOB_ID))
.thenReturn(new GetJobRunResult().withJobRun(new JobRun().withState(JobRunState.PENDING)));

// simulate result index is not created yet
when(jobExecutionResponseReader.getResultFromOpensearchIndex(EMR_JOB_ID, null))
.thenReturn(new JSONObject());
JSONObject result =
sparkQueryDispatcher.getQueryResponse(
new AsyncQueryJobMetadata(EMRS_APPLICATION_ID, EMR_JOB_ID, null));
Assertions.assertEquals("PENDING", result.get("status"));
}

@Test
@@ -629,8 +632,6 @@ void testGetQueryResponseWithSuccess() {
dataSourceUserAuthorizationHelper,
jobExecutionResponseReader,
flintIndexMetadataReader);
JSONObject queryResult = new JSONObject();
Map<String, Object> resultMap = new HashMap<>();
resultMap.put(STATUS_FIELD, "SUCCESS");
Expand All @@ -641,7 +642,6 @@ void testGetQueryResponseWithSuccess() {
JSONObject result =
sparkQueryDispatcher.getQueryResponse(
new AsyncQueryJobMetadata(EMRS_APPLICATION_ID, EMR_JOB_ID, null));
verify(jobExecutionResponseReader, times(1)).getResultFromOpensearchIndex(EMR_JOB_ID, null);
Assertions.assertEquals(
new HashSet<>(Arrays.asList(DATA_FIELD, STATUS_FIELD, ERROR_FIELD)), result.keySet());
@@ -655,6 +655,7 @@ void testGetQueryResponseWithSuccess() {
// We need similar.
Assertions.assertTrue(dataJson.similar(result.get(DATA_FIELD)));
Assertions.assertEquals("SUCCESS", result.get(STATUS_FIELD));
verifyNoInteractions(emrServerlessClient);
}

@Test
@@ -7,6 +7,7 @@

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
import static org.opensearch.sql.spark.constants.TestConstants.EMR_JOB_ID;
@@ -24,6 +25,7 @@
import org.opensearch.client.Client;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;

@@ -91,4 +93,12 @@ public void testSearchFailure() {
RuntimeException.class,
() -> jobExecutionResponseReader.getResultFromOpensearchIndex(EMR_JOB_ID, null));
}

@Test
public void testIndexNotFoundException() {
when(client.search(any())).thenThrow(IndexNotFoundException.class);
JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client);
assertTrue(
jobExecutionResponseReader.getResultFromOpensearchIndex(EMR_JOB_ID, "foo").isEmpty());
}
}