diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java index d06153bf79..3595ccf608 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java @@ -42,7 +42,7 @@ public class BatchQueryHandler extends AsyncQueryHandler { protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) { // either empty json when the result is not available or data with status // Fetch from Result Index - return jobExecutionResponseReader.getResultFromOpensearchIndex( + return jobExecutionResponseReader.getResultWithJobId( asyncQueryJobMetadata.getJobId(), asyncQueryJobMetadata.getResultIndex()); } diff --git a/spark/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReader.java b/spark/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReader.java index e4773310f0..916d9460d1 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReader.java +++ b/spark/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReader.java @@ -5,75 +5,25 @@ package org.opensearch.sql.spark.response; -import static org.opensearch.sql.datasource.model.DataSourceMetadata.DEFAULT_RESULT_INDEX; -import static org.opensearch.sql.spark.data.constants.SparkConstants.DATA_FIELD; -import static org.opensearch.sql.spark.data.constants.SparkConstants.JOB_ID_FIELD; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.json.JSONObject; -import org.opensearch.action.search.SearchRequest; -import org.opensearch.action.search.SearchResponse; -import org.opensearch.client.Client; -import org.opensearch.common.action.ActionFuture; -import org.opensearch.index.IndexNotFoundException; -import org.opensearch.index.query.QueryBuilder; -import org.opensearch.index.query.QueryBuilders; -import org.opensearch.search.SearchHit; -import org.opensearch.search.builder.SearchSourceBuilder; - -public class JobExecutionResponseReader { - private final Client client; - private static final Logger LOG = LogManager.getLogger(); +/** Interface for reading job execution result */ +public interface JobExecutionResponseReader { /** - * JobExecutionResponseReader for spark query. + * Retrieves the result from the OpenSearch index based on the job ID. * - * @param client Opensearch client + * @param jobId The job ID. + * @param resultLocation The location identifier where the result is stored (optional). + * @return A JSONObject containing the result data. */ - public JobExecutionResponseReader(Client client) { - this.client = client; - } - - public JSONObject getResultFromOpensearchIndex(String jobId, String resultIndex) { - return searchInSparkIndex(QueryBuilders.termQuery(JOB_ID_FIELD, jobId), resultIndex); - } - - public JSONObject getResultWithQueryId(String queryId, String resultIndex) { - return searchInSparkIndex(QueryBuilders.termQuery("queryId", queryId), resultIndex); - } + JSONObject getResultWithJobId(String jobId, String resultLocation); - private JSONObject searchInSparkIndex(QueryBuilder query, String resultIndex) { - SearchRequest searchRequest = new SearchRequest(); - String searchResultIndex = resultIndex == null ? DEFAULT_RESULT_INDEX : resultIndex; - searchRequest.indices(searchResultIndex); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(query); - searchRequest.source(searchSourceBuilder); - ActionFuture searchResponseActionFuture; - JSONObject data = new JSONObject(); - try { - searchResponseActionFuture = client.search(searchRequest); - } catch (IndexNotFoundException e) { - // if there is no result index (e.g., EMR-S hasn't created the index yet), we return empty - // json - LOG.info(resultIndex + " is not created yet."); - return data; - } catch (Exception e) { - throw new RuntimeException(e); - } - SearchResponse searchResponse = searchResponseActionFuture.actionGet(); - if (searchResponse.status().getStatus() != 200) { - throw new RuntimeException( - "Fetching result from " - + searchResultIndex - + " index failed with status : " - + searchResponse.status()); - } else { - for (SearchHit searchHit : searchResponse.getHits().getHits()) { - data.put(DATA_FIELD, searchHit.getSourceAsMap()); - } - return data; - } - } + /** + * Retrieves the result from the OpenSearch index based on the query ID. + * + * @param queryId The query ID. + * @param resultLocation The location identifier where the result is stored (optional). + * @return A JSONObject containing the result data. + */ + JSONObject getResultWithQueryId(String queryId, String resultLocation); } diff --git a/spark/src/main/java/org/opensearch/sql/spark/response/OpenSearchJobExecutionResponseReader.java b/spark/src/main/java/org/opensearch/sql/spark/response/OpenSearchJobExecutionResponseReader.java new file mode 100644 index 0000000000..10113ece8d --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/response/OpenSearchJobExecutionResponseReader.java @@ -0,0 +1,77 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.response; + +import static org.opensearch.sql.datasource.model.DataSourceMetadata.DEFAULT_RESULT_INDEX; +import static org.opensearch.sql.spark.data.constants.SparkConstants.DATA_FIELD; +import static org.opensearch.sql.spark.data.constants.SparkConstants.JOB_ID_FIELD; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.json.JSONObject; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.client.Client; +import org.opensearch.common.action.ActionFuture; +import org.opensearch.index.IndexNotFoundException; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.search.SearchHit; +import org.opensearch.search.builder.SearchSourceBuilder; + +/** JobExecutionResponseReader implementation for reading response from OpenSearch index. */ +public class OpenSearchJobExecutionResponseReader implements JobExecutionResponseReader { + private final Client client; + private static final Logger LOG = LogManager.getLogger(); + + public OpenSearchJobExecutionResponseReader(Client client) { + this.client = client; + } + + @Override + public JSONObject getResultWithJobId(String jobId, String resultLocation) { + return searchInSparkIndex(QueryBuilders.termQuery(JOB_ID_FIELD, jobId), resultLocation); + } + + @Override + public JSONObject getResultWithQueryId(String queryId, String resultLocation) { + return searchInSparkIndex(QueryBuilders.termQuery("queryId", queryId), resultLocation); + } + + private JSONObject searchInSparkIndex(QueryBuilder query, String resultIndex) { + SearchRequest searchRequest = new SearchRequest(); + String searchResultIndex = resultIndex == null ? DEFAULT_RESULT_INDEX : resultIndex; + searchRequest.indices(searchResultIndex); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(query); + searchRequest.source(searchSourceBuilder); + ActionFuture searchResponseActionFuture; + JSONObject data = new JSONObject(); + try { + searchResponseActionFuture = client.search(searchRequest); + } catch (IndexNotFoundException e) { + // if there is no result index (e.g., EMR-S hasn't created the index yet), we return empty + // json + LOG.info(resultIndex + " is not created yet."); + return data; + } catch (Exception e) { + throw new RuntimeException(e); + } + SearchResponse searchResponse = searchResponseActionFuture.actionGet(); + if (searchResponse.status().getStatus() != 200) { + throw new RuntimeException( + "Fetching result from " + + searchResultIndex + + " index failed with status : " + + searchResponse.status()); + } else { + for (SearchHit searchHit : searchResponse.getHits().getHits()) { + data.put(DATA_FIELD, searchHit.getSourceAsMap()); + } + return data; + } + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java b/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java index 5007cff64e..615a914fee 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java +++ b/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java @@ -45,6 +45,7 @@ import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory; import org.opensearch.sql.spark.leasemanager.DefaultLeaseManager; import org.opensearch.sql.spark.response.JobExecutionResponseReader; +import org.opensearch.sql.spark.response.OpenSearchJobExecutionResponseReader; @RequiredArgsConstructor public class AsyncExecutorServiceModule extends AbstractModule { @@ -87,7 +88,7 @@ public SparkQueryDispatcher sparkQueryDispatcher( @Provides public QueryHandlerFactory queryhandlerFactory( - JobExecutionResponseReader jobExecutionResponseReader, + JobExecutionResponseReader openSearchJobExecutionResponseReader, FlintIndexMetadataServiceImpl flintIndexMetadataReader, SessionManager sessionManager, DefaultLeaseManager defaultLeaseManager, @@ -95,7 +96,7 @@ public QueryHandlerFactory queryhandlerFactory( FlintIndexOpFactory flintIndexOpFactory, EMRServerlessClientFactory emrServerlessClientFactory) { return new QueryHandlerFactory( - jobExecutionResponseReader, + openSearchJobExecutionResponseReader, flintIndexMetadataReader, sessionManager, defaultLeaseManager, @@ -172,7 +173,7 @@ public FlintIndexMetadataServiceImpl flintIndexMetadataReader(NodeClient client) @Provides public JobExecutionResponseReader jobExecutionResponseReader(NodeClient client) { - return new JobExecutionResponseReader(client); + return new OpenSearchJobExecutionResponseReader(client); } private void registerStateStoreMetrics(StateStore stateStore) { diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java index 85bb92bba2..b9e8909697 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java @@ -79,6 +79,7 @@ import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory; import org.opensearch.sql.spark.leasemanager.DefaultLeaseManager; import org.opensearch.sql.spark.response.JobExecutionResponseReader; +import org.opensearch.sql.spark.response.OpenSearchJobExecutionResponseReader; import org.opensearch.sql.storage.DataSourceFactory; import org.opensearch.test.OpenSearchIntegTestCase; @@ -222,7 +223,7 @@ private DataSourceServiceImpl createDataSourceService() { protected AsyncQueryExecutorService createAsyncQueryExecutorService( EMRServerlessClientFactory emrServerlessClientFactory) { return createAsyncQueryExecutorService( - emrServerlessClientFactory, new JobExecutionResponseReader(client)); + emrServerlessClientFactory, new OpenSearchJobExecutionResponseReader(client)); } /** Pass a custom response reader which can mock interaction between PPL plugin and EMR-S job. */ diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java index f2c3bda026..dee6a9a911 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java @@ -31,6 +31,7 @@ import org.opensearch.sql.spark.execution.statement.StatementState; import org.opensearch.sql.spark.flint.FlintIndexType; import org.opensearch.sql.spark.response.JobExecutionResponseReader; +import org.opensearch.sql.spark.response.OpenSearchJobExecutionResponseReader; import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest; import org.opensearch.sql.spark.rest.model.CreateAsyncQueryResponse; import org.opensearch.sql.spark.rest.model.LangType; @@ -422,9 +423,9 @@ private class AssertionHelper { * current interaction. Intercept both get methods for different query handler which * will only call either of them. */ - new JobExecutionResponseReader(client) { + new JobExecutionResponseReader() { @Override - public JSONObject getResultFromOpensearchIndex(String jobId, String resultIndex) { + public JSONObject getResultWithJobId(String jobId, String resultIndex) { return interaction.interact(new InteractionStep(emrClient, jobId, resultIndex)); } @@ -493,7 +494,7 @@ private InteractionStep(LocalEMRSClient emrClient, String queryId, String result /** Simulate PPL plugin search query_execution_result */ JSONObject pluginSearchQueryResult() { - return new JobExecutionResponseReader(client).getResultWithQueryId(queryId, resultIndex); + return new OpenSearchJobExecutionResponseReader(client).getResultWithQueryId(queryId, resultIndex); } /** Simulate EMR-S bulk writes query_execution_result with refresh = wait_for */ diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index 08aa0e4d0e..b41e400567 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -890,7 +890,7 @@ void testGetQueryResponse() { when(emrServerlessClient.getJobRunResult(EMRS_APPLICATION_ID, EMR_JOB_ID)) .thenReturn(new GetJobRunResult().withJobRun(new JobRun().withState(JobRunState.PENDING))); // simulate result index is not created yet - when(jobExecutionResponseReader.getResultFromOpensearchIndex(EMR_JOB_ID, null)) + when(jobExecutionResponseReader.getResultWithJobId(EMR_JOB_ID, null)) .thenReturn(new JSONObject()); JSONObject result = sparkQueryDispatcher.getQueryResponse(asyncQueryJobMetadata()); @@ -962,12 +962,12 @@ void testGetQueryResponseWithSuccess() { resultMap.put(STATUS_FIELD, "SUCCESS"); resultMap.put(ERROR_FIELD, ""); queryResult.put(DATA_FIELD, resultMap); - when(jobExecutionResponseReader.getResultFromOpensearchIndex(EMR_JOB_ID, null)) + when(jobExecutionResponseReader.getResultWithJobId(EMR_JOB_ID, null)) .thenReturn(queryResult); JSONObject result = sparkQueryDispatcher.getQueryResponse(asyncQueryJobMetadata()); - verify(jobExecutionResponseReader, times(1)).getResultFromOpensearchIndex(EMR_JOB_ID, null); + verify(jobExecutionResponseReader, times(1)).getResultWithJobId(EMR_JOB_ID, null); Assertions.assertEquals( new HashSet<>(Arrays.asList(DATA_FIELD, STATUS_FIELD, ERROR_FIELD)), result.keySet()); JSONObject dataJson = new JSONObject(); diff --git a/spark/src/test/java/org/opensearch/sql/spark/response/AsyncQueryExecutionResponseReaderTest.java b/spark/src/test/java/org/opensearch/sql/spark/response/OpenSearchJobExecutionResponseReaderTest.java similarity index 76% rename from spark/src/test/java/org/opensearch/sql/spark/response/AsyncQueryExecutionResponseReaderTest.java rename to spark/src/test/java/org/opensearch/sql/spark/response/OpenSearchJobExecutionResponseReaderTest.java index bbaf6f0f59..297cab8819 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/response/AsyncQueryExecutionResponseReaderTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/response/OpenSearchJobExecutionResponseReaderTest.java @@ -18,6 +18,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; @@ -30,12 +31,14 @@ import org.opensearch.search.SearchHits; @ExtendWith(MockitoExtension.class) -public class AsyncQueryExecutionResponseReaderTest { +public class OpenSearchJobExecutionResponseReaderTest { @Mock private Client client; @Mock private SearchResponse searchResponse; @Mock private SearchHit searchHit; @Mock private ActionFuture searchResponseActionFuture; + @InjectMocks OpenSearchJobExecutionResponseReader jobExecutionResponseReader; + @Test public void testGetResultFromOpensearchIndex() { when(client.search(any())).thenReturn(searchResponseActionFuture); @@ -46,9 +49,9 @@ public void testGetResultFromOpensearchIndex() { new SearchHits( new SearchHit[] {searchHit}, new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1.0F)); Mockito.when(searchHit.getSourceAsMap()).thenReturn(Map.of("stepId", EMR_JOB_ID)); - JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client); + assertFalse( - jobExecutionResponseReader.getResultFromOpensearchIndex(EMR_JOB_ID, null).isEmpty()); + jobExecutionResponseReader.getResultWithJobId(EMR_JOB_ID, null).isEmpty()); } @Test @@ -61,9 +64,9 @@ public void testGetResultFromCustomIndex() { new SearchHits( new SearchHit[] {searchHit}, new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1.0F)); Mockito.when(searchHit.getSourceAsMap()).thenReturn(Map.of("stepId", EMR_JOB_ID)); - JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client); + assertFalse( - jobExecutionResponseReader.getResultFromOpensearchIndex(EMR_JOB_ID, "foo").isEmpty()); + jobExecutionResponseReader.getResultWithJobId(EMR_JOB_ID, "foo").isEmpty()); } @Test @@ -72,11 +75,11 @@ public void testInvalidSearchResponse() { when(searchResponseActionFuture.actionGet()).thenReturn(searchResponse); when(searchResponse.status()).thenReturn(RestStatus.NO_CONTENT); - JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client); RuntimeException exception = assertThrows( RuntimeException.class, - () -> jobExecutionResponseReader.getResultFromOpensearchIndex(EMR_JOB_ID, null)); + () -> jobExecutionResponseReader.getResultWithJobId(EMR_JOB_ID, null)); + Assertions.assertEquals( "Fetching result from " + DEFAULT_RESULT_INDEX @@ -88,17 +91,17 @@ public void testInvalidSearchResponse() { @Test public void testSearchFailure() { when(client.search(any())).thenThrow(RuntimeException.class); - JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client); + assertThrows( RuntimeException.class, - () -> jobExecutionResponseReader.getResultFromOpensearchIndex(EMR_JOB_ID, null)); + () -> jobExecutionResponseReader.getResultWithJobId(EMR_JOB_ID, null)); } @Test public void testIndexNotFoundException() { when(client.search(any())).thenThrow(IndexNotFoundException.class); - JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client); + assertTrue( - jobExecutionResponseReader.getResultFromOpensearchIndex(EMR_JOB_ID, "foo").isEmpty()); + jobExecutionResponseReader.getResultWithJobId(EMR_JOB_ID, "foo").isEmpty()); } }