diff --git a/docs/user/ppl/admin/connectors/s3glue_connector.rst b/docs/user/ppl/admin/connectors/s3glue_connector.rst index ef27cf572a..e150357679 100644 --- a/docs/user/ppl/admin/connectors/s3glue_connector.rst +++ b/docs/user/ppl/admin/connectors/s3glue_connector.rst @@ -14,10 +14,18 @@ S3Glue Connector Introduction ============ +Properties in DataSource Configuration + +* name: A unique identifier for the data source within a domain. +* connector: Currently supports the following connectors: s3glue, spark, prometheus, and opensearch. +* resultIndex: Stores the results of queries executed on the data source. If unavailable, it defaults to .query_execution_result. + +Glue Connector +======================================================== + s3Glue connector provides a way to query s3 files using glue as metadata store and spark as execution engine. This page covers s3Glue datasource configuration and also how to query and s3Glue datasource. - Required resources for s3 Glue Connector =================================== * S3: This is where the data lies. @@ -27,8 +35,6 @@ Required resources for s3 Glue Connector We currently only support emr-serverless as spark execution engine and Glue as metadata store. we will add more support in future. -Glue Connector Properties in DataSource Configuration -======================================================== Glue Connector Properties. * ``glue.auth.type`` [Required] @@ -59,7 +65,8 @@ Glue datasource configuration:: "glue.indexstore.opensearch.auth" :"basicauth", "glue.indexstore.opensearch.auth.username" :"username" "glue.indexstore.opensearch.auth.password" :"password" - } + }, + "resultIndex": "query_execution_result" }] [{ diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java index f5ef419294..dcce11fd55 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java @@ -63,34 +63,38 @@ public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest) } } - // TODO : Fetch from Result Index and then make call to EMR Serverless. public JSONObject getQueryResponse(AsyncQueryJobMetadata asyncQueryJobMetadata) { - GetJobRunResult getJobRunResult = - emrServerlessClient.getJobRunResult( - asyncQueryJobMetadata.getApplicationId(), asyncQueryJobMetadata.getJobId()); - String jobState = getJobRunResult.getJobRun().getState(); + // either empty json when the result is not available or data with status + // Fetch from Result Index JSONObject result = - (jobState.equals(JobRunState.SUCCESS.toString())) - ? jobExecutionResponseReader.getResultFromOpensearchIndex( - asyncQueryJobMetadata.getJobId(), asyncQueryJobMetadata.getResultIndex()) - : new JSONObject(); + jobExecutionResponseReader.getResultFromOpensearchIndex( + asyncQueryJobMetadata.getJobId(), asyncQueryJobMetadata.getResultIndex()); // if result index document has a status, we are gonna use the status directly; otherwise, we - // will use emr-s job status - // a job is successful does not mean there is no error in execution. For example, even if result - // index mapping - // is incorrect, we still write query result and let the job finish. + // will use emr-s job status. + // That a job is successful does not mean there is no error in execution. For example, even if + // result + // index mapping is incorrect, we still write query result and let the job finish. + // That a job is running does not mean the status is running. For example, index/streaming Query + // is a + // long-running job which runs forever. But we need to return success from the result index + // immediately. if (result.has(DATA_FIELD)) { JSONObject items = result.getJSONObject(DATA_FIELD); - // If items have STATUS_FIELD, use it; otherwise, use jobState - String status = items.optString(STATUS_FIELD, jobState); + // If items have STATUS_FIELD, use it; otherwise, mark failed + String status = items.optString(STATUS_FIELD, JobRunState.FAILED.toString()); result.put(STATUS_FIELD, status); // If items have ERROR_FIELD, use it; otherwise, set empty string String error = items.optString(ERROR_FIELD, ""); result.put(ERROR_FIELD, error); } else { + // make call to EMR Serverless when related result index documents are not available + GetJobRunResult getJobRunResult = + emrServerlessClient.getJobRunResult( + asyncQueryJobMetadata.getApplicationId(), asyncQueryJobMetadata.getJobId()); + String jobState = getJobRunResult.getJobRun().getState(); result.put(STATUS_FIELD, jobState); result.put(ERROR_FIELD, ""); } diff --git a/spark/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReader.java b/spark/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReader.java index 5da0ef44fe..d3cbd68dce 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReader.java +++ b/spark/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReader.java @@ -16,6 +16,7 @@ import org.opensearch.action.search.SearchResponse; import org.opensearch.client.Client; import org.opensearch.common.action.ActionFuture; +import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryBuilders; import org.opensearch.search.SearchHit; @@ -46,8 +47,14 @@ private JSONObject searchInSparkIndex(QueryBuilder query, String resultIndex) { searchSourceBuilder.query(query); searchRequest.source(searchSourceBuilder); ActionFuture searchResponseActionFuture; + JSONObject data = new JSONObject(); try { searchResponseActionFuture = client.search(searchRequest); + } catch (IndexNotFoundException e) { + // if there is no result index (e.g., EMR-S hasn't created the index yet), we return empty + // json + LOG.info(resultIndex + " is not created yet."); + return data; } catch (Exception e) { throw new RuntimeException(e); } @@ -59,7 +66,6 @@ private JSONObject searchInSparkIndex(QueryBuilder query, String resultIndex) { + " index failed with status : " + searchResponse.status()); } else { - JSONObject data = new JSONObject(); for (SearchHit searchHit : searchResponse.getHits().getHits()) { data.put(DATA_FIELD, searchHit.getSourceAsMap()); } diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index c89c122d11..925e6f1a90 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -613,11 +613,14 @@ void testGetQueryResponse() { flintIndexMetadataReader); when(emrServerlessClient.getJobRunResult(EMRS_APPLICATION_ID, EMR_JOB_ID)) .thenReturn(new GetJobRunResult().withJobRun(new JobRun().withState(JobRunState.PENDING))); + + // simulate result index is not created yet + when(jobExecutionResponseReader.getResultFromOpensearchIndex(EMR_JOB_ID, null)) + .thenReturn(new JSONObject()); JSONObject result = sparkQueryDispatcher.getQueryResponse( new AsyncQueryJobMetadata(EMRS_APPLICATION_ID, EMR_JOB_ID, null)); Assertions.assertEquals("PENDING", result.get("status")); - verifyNoInteractions(jobExecutionResponseReader); } @Test @@ -629,8 +632,6 @@ void testGetQueryResponseWithSuccess() { dataSourceUserAuthorizationHelper, jobExecutionResponseReader, flintIndexMetadataReader); - when(emrServerlessClient.getJobRunResult(EMRS_APPLICATION_ID, EMR_JOB_ID)) - .thenReturn(new GetJobRunResult().withJobRun(new JobRun().withState(JobRunState.SUCCESS))); JSONObject queryResult = new JSONObject(); Map resultMap = new HashMap<>(); resultMap.put(STATUS_FIELD, "SUCCESS"); @@ -641,7 +642,6 @@ void testGetQueryResponseWithSuccess() { JSONObject result = sparkQueryDispatcher.getQueryResponse( new AsyncQueryJobMetadata(EMRS_APPLICATION_ID, EMR_JOB_ID, null)); - verify(emrServerlessClient, times(1)).getJobRunResult(EMRS_APPLICATION_ID, EMR_JOB_ID); verify(jobExecutionResponseReader, times(1)).getResultFromOpensearchIndex(EMR_JOB_ID, null); Assertions.assertEquals( new HashSet<>(Arrays.asList(DATA_FIELD, STATUS_FIELD, ERROR_FIELD)), result.keySet()); @@ -655,6 +655,7 @@ void testGetQueryResponseWithSuccess() { // We need similar. Assertions.assertTrue(dataJson.similar(result.get(DATA_FIELD))); Assertions.assertEquals("SUCCESS", result.get(STATUS_FIELD)); + verifyNoInteractions(emrServerlessClient); } @Test diff --git a/spark/src/test/java/org/opensearch/sql/spark/response/AsyncQueryExecutionResponseReaderTest.java b/spark/src/test/java/org/opensearch/sql/spark/response/AsyncQueryExecutionResponseReaderTest.java index 7d7ebd42b3..fefc951dd7 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/response/AsyncQueryExecutionResponseReaderTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/response/AsyncQueryExecutionResponseReaderTest.java @@ -7,6 +7,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.when; import static org.opensearch.sql.spark.constants.TestConstants.EMR_JOB_ID; @@ -24,6 +25,7 @@ import org.opensearch.client.Client; import org.opensearch.common.action.ActionFuture; import org.opensearch.core.rest.RestStatus; +import org.opensearch.index.IndexNotFoundException; import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; @@ -91,4 +93,12 @@ public void testSearchFailure() { RuntimeException.class, () -> jobExecutionResponseReader.getResultFromOpensearchIndex(EMR_JOB_ID, null)); } + + @Test + public void testIndexNotFoundException() { + when(client.search(any())).thenThrow(IndexNotFoundException.class); + JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client); + assertTrue( + jobExecutionResponseReader.getResultFromOpensearchIndex(EMR_JOB_ID, "foo").isEmpty()); + } }