From fbfa1be6aec98ca09336fc15ac7bbf5ce7ec32d7 Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Mon, 23 Oct 2023 15:23:25 -0700 Subject: [PATCH 1/4] Fix bug, using basic instead of basicauth Signed-off-by: Peng Huo --- .../model/SparkSubmitParameters.java | 3 +- .../dispatcher/InteractiveQueryHandler.java | 2 +- .../session/CreateSessionRequest.java | 21 +++- ...AsyncQueryExecutorServiceImplSpecTest.java | 118 +++++++++++++++++- 4 files changed, 139 insertions(+), 5 deletions(-) diff --git a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java index db78abb2a8..9a73b0f364 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java +++ b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java @@ -31,6 +31,7 @@ public class SparkSubmitParameters { public static final String SPACE = " "; public static final String EQUALS = "="; + public static final String FLINT_BASIC_AUTH = "basic"; private final String className; private final Map config; @@ -114,7 +115,7 @@ private void setFlintIndexStoreAuthProperties( Supplier password, Supplier region) { if (AuthenticationType.get(authType).equals(AuthenticationType.BASICAUTH)) { - config.put(FLINT_INDEX_STORE_AUTH_KEY, authType); + config.put(FLINT_INDEX_STORE_AUTH_KEY, FLINT_BASIC_AUTH); config.put(FLINT_INDEX_STORE_AUTH_USERNAME, userName.get()); config.put(FLINT_INDEX_STORE_AUTH_PASSWORD, password.get()); } else if (AuthenticationType.get(authType).equals(AuthenticationType.AWSSIGV4AUTH)) { diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java index 24ea1528c8..52cc2efbe2 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java @@ -39,7 +39,7 @@ protected JSONObject getResponseFromExecutor(AsyncQueryJobMetadata asyncQueryJob Statement statement = getStatementByQueryId(asyncQueryJobMetadata.getSessionId(), queryId); StatementState statementState = statement.getStatementState(); result.put(STATUS_FIELD, statementState.getState()); - result.put(ERROR_FIELD, ""); + result.put(ERROR_FIELD, Optional.of(statement.getStatementModel().getError()).orElse("")); return result; } diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/CreateSessionRequest.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/CreateSessionRequest.java index ca2b2b4867..27ba1fd0c8 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/session/CreateSessionRequest.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/session/CreateSessionRequest.java @@ -21,14 +21,31 @@ public class CreateSessionRequest { private final String datasourceName; public StartJobRequest getStartJobRequest() { - return new StartJobRequest( + return new InteractiveSessionStartJobRequest( "select 1", jobName, applicationId, executionRoleArn, sparkSubmitParametersBuilder.build().toString(), tags, - false, resultIndex); } + + static class InteractiveSessionStartJobRequest extends StartJobRequest{ + public InteractiveSessionStartJobRequest(String query, String jobName, String applicationId, + String executionRoleArn, String sparkSubmitParams, + Map tags, + String resultIndex) { + super(query, jobName, applicationId, executionRoleArn, sparkSubmitParams, tags, + false, resultIndex); + } + + /** + * Interactive query keep running. + */ + @Override + public Long executionTimeout() { + return 0L; + } + } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java index 1ee119df78..f00f1e25e1 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java @@ -17,6 +17,7 @@ import static org.opensearch.sql.spark.execution.statement.StatementModel.STATEMENT_DOC_TYPE; import static org.opensearch.sql.spark.execution.statestore.StateStore.DATASOURCE_TO_REQUEST_INDEX; import static org.opensearch.sql.spark.execution.statestore.StateStore.getStatement; +import static org.opensearch.sql.spark.execution.statestore.StateStore.updateStatementState; import com.amazonaws.services.emrserverless.model.CancelJobRunResult; import com.amazonaws.services.emrserverless.model.GetJobRunResult; @@ -269,8 +270,123 @@ public void reuseSessionWhenCreateAsyncQuery() { assertEquals(second.getQueryId(), secondModel.get().getQueryId()); } + @Test + public void batchQueryHasTimeout() { + LocalEMRSClient emrsClient = new LocalEMRSClient(); + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrsClient); + + enableSession(false); + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null)); + + assertEquals(120L, (long) emrsClient.getJobRequest().executionTimeout()); + } + + @Test + public void interactiveQueryNoTimeout() { + LocalEMRSClient emrsClient = new LocalEMRSClient(); + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrsClient); + + // enable session + enableSession(true); + + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null)); + assertEquals(0L, (long) emrsClient.getJobRequest().executionTimeout()); + } + + @Test + public void datasourceWithBasicAuth() { + dataSourceService.createDataSource( + new DataSourceMetadata( + "mybasicauth", + DataSourceType.S3GLUE, + ImmutableList.of(), + ImmutableMap.of( + "glue.auth.type", + "iam_role", + "glue.auth.role_arn", + "arn:aws:iam::924196221507:role/FlintOpensearchServiceRole", + "glue.indexstore.opensearch.uri", + "http://ec2-18-237-133-156.us-west-2.compute.amazonaws" + ".com:9200", + "glue.indexstore.opensearch.auth", + "basicauth", + "glue.indexstore.opensearch.auth.username", "username", + "glue.indexstore.opensearch.auth.password","admin"), + null)); + LocalEMRSClient emrsClient = new LocalEMRSClient(); + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrsClient); + + // enable session + enableSession(true); + + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest("select 1", "mybasicauth", LangType.SQL, null)); + String params = emrsClient.getJobRequest().getSparkSubmitParams(); + assertTrue( + params.contains( + String.format("--conf spark.datasource.flint.auth=mybasicauth"))); + assertTrue( + params.contains( + String.format("--conf spark.datasource.flint.auth.username=username"))); + assertTrue( + params.contains( + String.format("--conf spark.datasource.flint.auth.password=password"))); + } + + @Test + public void withSessionCreateAsyncQueryFailed() { + LocalEMRSClient emrsClient = new LocalEMRSClient(); + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrsClient); + + // enable session + enableSession(true); + + // 1. create async query. + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest("myselect 1", DATASOURCE, LangType.SQL, null)); + assertNotNull(response.getSessionId()); + Optional statementModel = + getStatement(stateStore, DATASOURCE).apply(response.getQueryId()); + assertTrue(statementModel.isPresent()); + assertEquals(StatementState.WAITING, statementModel.get().getStatementState()); + + // 2. fetch async query result. not result write to SPARK_RESPONSE_BUFFER_INDEX_NAME yet. + // mock failed statement. + StatementModel submitted = statementModel.get(); + StatementModel mocked = StatementModel.builder() + .version("1.0") + .statementState(submitted.getStatementState()) + .statementId(submitted.getStatementId()) + .sessionId(submitted.getSessionId()) + .applicationId(submitted.getApplicationId()) + .jobId(submitted.getJobId()) + .langType(submitted.getLangType()) + .datasourceName(submitted.getDatasourceName()) + .query(submitted.getQuery()) + .queryId(submitted.getQueryId()) + .submitTime(submitted.getSubmitTime()) + .error("mock error") + .seqNo(submitted.getSeqNo()) + .primaryTerm(submitted.getPrimaryTerm()) + .build(); + updateStatementState(stateStore, DATASOURCE).apply(mocked, StatementState.FAILED); + + + AsyncQueryExecutionResponse asyncQueryResults = + asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertEquals(StatementState.FAILED.getState(), asyncQueryResults.getStatus()); + assertEquals("mock error", asyncQueryResults.getError()); + } + private DataSourceServiceImpl createDataSourceService() { - String masterKey = "1234567890"; + String masterKey = "a57d991d9b573f75b9bba1df"; DataSourceMetadataStorage dataSourceMetadataStorage = new OpenSearchDataSourceMetadataStorage( client, clusterService, new EncryptorImpl(masterKey)); From d47c9631fe9e4303daab103d7ddd8895d0648857 Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Mon, 23 Oct 2023 15:46:12 -0700 Subject: [PATCH 2/4] fix codestyle Signed-off-by: Peng Huo --- .../session/CreateSessionRequest.java | 29 +++++++---- ...AsyncQueryExecutorServiceImplSpecTest.java | 50 +++++++++---------- 2 files changed, 43 insertions(+), 36 deletions(-) diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/CreateSessionRequest.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/CreateSessionRequest.java index 27ba1fd0c8..b2201fbd01 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/session/CreateSessionRequest.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/session/CreateSessionRequest.java @@ -31,18 +31,27 @@ public StartJobRequest getStartJobRequest() { resultIndex); } - static class InteractiveSessionStartJobRequest extends StartJobRequest{ - public InteractiveSessionStartJobRequest(String query, String jobName, String applicationId, - String executionRoleArn, String sparkSubmitParams, - Map tags, - String resultIndex) { - super(query, jobName, applicationId, executionRoleArn, sparkSubmitParams, tags, - false, resultIndex); + static class InteractiveSessionStartJobRequest extends StartJobRequest { + public InteractiveSessionStartJobRequest( + String query, + String jobName, + String applicationId, + String executionRoleArn, + String sparkSubmitParams, + Map tags, + String resultIndex) { + super( + query, + jobName, + applicationId, + executionRoleArn, + sparkSubmitParams, + tags, + false, + resultIndex); } - /** - * Interactive query keep running. - */ + /** Interactive query keep running. */ @Override public Long executionTimeout() { return 0L; diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java index f00f1e25e1..d6b1ec631e 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java @@ -314,8 +314,10 @@ public void datasourceWithBasicAuth() { "http://ec2-18-237-133-156.us-west-2.compute.amazonaws" + ".com:9200", "glue.indexstore.opensearch.auth", "basicauth", - "glue.indexstore.opensearch.auth.username", "username", - "glue.indexstore.opensearch.auth.password","admin"), + "glue.indexstore.opensearch.auth.username", + "username", + "glue.indexstore.opensearch.auth.password", + "admin"), null)); LocalEMRSClient emrsClient = new LocalEMRSClient(); AsyncQueryExecutorService asyncQueryExecutorService = @@ -327,15 +329,11 @@ public void datasourceWithBasicAuth() { asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest("select 1", "mybasicauth", LangType.SQL, null)); String params = emrsClient.getJobRequest().getSparkSubmitParams(); + assertTrue(params.contains(String.format("--conf spark.datasource.flint.auth=mybasicauth"))); assertTrue( - params.contains( - String.format("--conf spark.datasource.flint.auth=mybasicauth"))); - assertTrue( - params.contains( - String.format("--conf spark.datasource.flint.auth.username=username"))); + params.contains(String.format("--conf spark.datasource.flint.auth.username=username"))); assertTrue( - params.contains( - String.format("--conf spark.datasource.flint.auth.password=password"))); + params.contains(String.format("--conf spark.datasource.flint.auth.password=password"))); } @Test @@ -360,25 +358,25 @@ public void withSessionCreateAsyncQueryFailed() { // 2. fetch async query result. not result write to SPARK_RESPONSE_BUFFER_INDEX_NAME yet. // mock failed statement. StatementModel submitted = statementModel.get(); - StatementModel mocked = StatementModel.builder() - .version("1.0") - .statementState(submitted.getStatementState()) - .statementId(submitted.getStatementId()) - .sessionId(submitted.getSessionId()) - .applicationId(submitted.getApplicationId()) - .jobId(submitted.getJobId()) - .langType(submitted.getLangType()) - .datasourceName(submitted.getDatasourceName()) - .query(submitted.getQuery()) - .queryId(submitted.getQueryId()) - .submitTime(submitted.getSubmitTime()) - .error("mock error") - .seqNo(submitted.getSeqNo()) - .primaryTerm(submitted.getPrimaryTerm()) - .build(); + StatementModel mocked = + StatementModel.builder() + .version("1.0") + .statementState(submitted.getStatementState()) + .statementId(submitted.getStatementId()) + .sessionId(submitted.getSessionId()) + .applicationId(submitted.getApplicationId()) + .jobId(submitted.getJobId()) + .langType(submitted.getLangType()) + .datasourceName(submitted.getDatasourceName()) + .query(submitted.getQuery()) + .queryId(submitted.getQueryId()) + .submitTime(submitted.getSubmitTime()) + .error("mock error") + .seqNo(submitted.getSeqNo()) + .primaryTerm(submitted.getPrimaryTerm()) + .build(); updateStatementState(stateStore, DATASOURCE).apply(mocked, StatementState.FAILED); - AsyncQueryExecutionResponse asyncQueryResults = asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); assertEquals(StatementState.FAILED.getState(), asyncQueryResults.getStatus()); From c61027bfeadcfe0189ddf81ab04f03a51bbbd8ad Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Mon, 23 Oct 2023 16:25:19 -0700 Subject: [PATCH 3/4] fix IT failure: datasourceWithBasicAuth Signed-off-by: Peng Huo --- ...AsyncQueryExecutorServiceImplSpecTest.java | 33 ++++++++----------- 1 file changed, 14 insertions(+), 19 deletions(-) diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java index d6b1ec631e..19edd53eae 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java @@ -27,7 +27,9 @@ import com.google.common.collect.ImmutableSet; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import lombok.Getter; import org.junit.After; @@ -110,7 +112,7 @@ public void setup() { "glue.auth.role_arn", "arn:aws:iam::924196221507:role/FlintOpensearchServiceRole", "glue.indexstore.opensearch.uri", - "http://ec2-18-237-133-156.us-west-2.compute.amazonaws" + ".com:9200", + "http://localhost:9200", "glue.indexstore.opensearch.auth", "noauth"), null)); @@ -300,25 +302,18 @@ public void interactiveQueryNoTimeout() { @Test public void datasourceWithBasicAuth() { + Map properties = new HashMap<>(); + properties.put("glue.auth.type", "iam_role"); + properties.put( + "glue.auth.role_arn", "arn:aws:iam::924196221507:role/FlintOpensearchServiceRole"); + properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200"); + properties.put("glue.indexstore.opensearch.auth", "basicauth"); + properties.put("glue.indexstore.opensearch.auth.username", "username"); + properties.put("glue.indexstore.opensearch.auth.password", "password"); + dataSourceService.createDataSource( new DataSourceMetadata( - "mybasicauth", - DataSourceType.S3GLUE, - ImmutableList.of(), - ImmutableMap.of( - "glue.auth.type", - "iam_role", - "glue.auth.role_arn", - "arn:aws:iam::924196221507:role/FlintOpensearchServiceRole", - "glue.indexstore.opensearch.uri", - "http://ec2-18-237-133-156.us-west-2.compute.amazonaws" + ".com:9200", - "glue.indexstore.opensearch.auth", - "basicauth", - "glue.indexstore.opensearch.auth.username", - "username", - "glue.indexstore.opensearch.auth.password", - "admin"), - null)); + "mybasicauth", DataSourceType.S3GLUE, ImmutableList.of(), properties, null)); LocalEMRSClient emrsClient = new LocalEMRSClient(); AsyncQueryExecutorService asyncQueryExecutorService = createAsyncQueryExecutorService(emrsClient); @@ -329,7 +324,7 @@ public void datasourceWithBasicAuth() { asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest("select 1", "mybasicauth", LangType.SQL, null)); String params = emrsClient.getJobRequest().getSparkSubmitParams(); - assertTrue(params.contains(String.format("--conf spark.datasource.flint.auth=mybasicauth"))); + assertTrue(params.contains(String.format("--conf spark.datasource.flint.auth=basic"))); assertTrue( params.contains(String.format("--conf spark.datasource.flint.auth.username=username"))); assertTrue( From 7f7390e5e053c2b113a745868dad8eeba9b7cbae Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Mon, 23 Oct 2023 17:20:18 -0700 Subject: [PATCH 4/4] fix UT Signed-off-by: Peng Huo --- .../sql/spark/dispatcher/SparkQueryDispatcherTest.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index 4acccae0e2..04c81f9919 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -98,7 +98,8 @@ public class SparkQueryDispatcherTest { @Mock(answer = RETURNS_DEEP_STUBS) private Session session; - @Mock private Statement statement; + @Mock(answer = RETURNS_DEEP_STUBS) + private Statement statement; private SparkQueryDispatcher sparkQueryDispatcher; @@ -181,7 +182,7 @@ void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() { String query = "select * from my_glue.default.http_logs"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString( - "basicauth", + "basic", new HashMap<>() { { put(FLINT_INDEX_STORE_AUTH_USERNAME, "username"); @@ -723,6 +724,7 @@ void testGetQueryResponse() { void testGetQueryResponseWithSession() { doReturn(Optional.of(session)).when(sessionManager).getSession(new SessionId(MOCK_SESSION_ID)); doReturn(Optional.of(statement)).when(session).get(any()); + when(statement.getStatementModel().getError()).thenReturn("mock error"); doReturn(StatementState.WAITING).when(statement).getStatementState(); doReturn(new JSONObject())