From 8e7fc7ce93fe76e13dd909b15b6c33a2b18cf608 Mon Sep 17 00:00:00 2001 From: Tomoyuki Morita Date: Thu, 9 May 2024 16:41:36 -0700 Subject: [PATCH] Reformat Signed-off-by: Tomoyuki Morita --- .../execution/session/InteractiveSession.java | 3 +- .../execution/session/SessionManager.java | 3 +- .../spark/execution/statement/Statement.java | 7 +- .../config/AsyncExecutorServiceModule.java | 11 ++-- .../AsyncQueryExecutorServiceSpec.java | 15 ++++- .../session/InteractiveSessionTest.java | 20 ++++-- .../execution/session/SessionManagerTest.java | 8 ++- .../execution/statement/StatementTest.java | 66 ++++++++++--------- 8 files changed, 77 insertions(+), 56 deletions(-) diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java index 680a0c8c61..f08ef4f489 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java @@ -124,7 +124,8 @@ public StatementId submit(QueryRequest request) { @Override public Optional get(StatementId stID) { - return statementStorageService.getStatement(stID.getId(), sessionModel.getDatasourceName()) + return statementStorageService + .getStatement(stID.getId(), sessionModel.getDatasourceName()) .map( model -> Statement.builder() diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java index a48ef523cc..f8d429dd38 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java @@ -59,7 +59,8 @@ public Session createSession(CreateSessionRequest request) { * empty Optional if no matching session is found. */ public Optional getSession(SessionId sid, String dataSourceName) { - Optional model = sessionStorageService.getSession(sid.getSessionId(), dataSourceName); + Optional model = + sessionStorageService.getSession(sid.getSessionId(), dataSourceName); if (model.isPresent()) { InteractiveSession session = InteractiveSession.builder() diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statement/Statement.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statement/Statement.java index 3a38606090..cab045726c 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/statement/Statement.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statement/Statement.java @@ -73,8 +73,8 @@ public void cancel() { } try { this.statementModel = - statementStorageService.updateStatementState(statementModel, StatementState.CANCELLED, - statementModel.getDatasourceName()); + statementStorageService.updateStatementState( + statementModel, StatementState.CANCELLED, statementModel.getDatasourceName()); } catch (DocumentMissingException e) { String errorMsg = String.format("cancel statement failed. no statement found. statement: %s.", statementId); @@ -82,7 +82,8 @@ public void cancel() { throw new IllegalStateException(errorMsg); } catch (VersionConflictEngineException e) { this.statementModel = - statementStorageService.getStatement(statementModel.getId(), statementModel.getDatasourceName()) + statementStorageService + .getStatement(statementModel.getId(), statementModel.getDatasourceName()) .orElse(this.statementModel); String errorMsg = String.format( diff --git a/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java b/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java index e8ba3d6180..6a33e6d5b6 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java +++ b/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java @@ -127,20 +127,17 @@ public SessionManager sessionManager( StatementStorageService statementStorageService, EMRServerlessClientFactory emrServerlessClientFactory, Settings settings) { - return new SessionManager(sessionStorageService, statementStorageService, emrServerlessClientFactory, settings); + return new SessionManager( + sessionStorageService, statementStorageService, emrServerlessClientFactory, settings); } @Provides - public SessionStorageService sessionStorageService( - StateStore stateStore - ) { + public SessionStorageService sessionStorageService(StateStore stateStore) { return new OpenSearchSessionStorageService(stateStore); } @Provides - public StatementStorageService statementStorageService( - StateStore stateStore - ) { + public StatementStorageService statementStorageService(StateStore stateStore) { return new OpenSearchStatementStorageService(stateStore); } diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java index 7c418afdbc..a8ae5fcb1a 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java @@ -228,7 +228,11 @@ protected AsyncQueryExecutorService createAsyncQueryExecutorService( new QueryHandlerFactory( jobExecutionResponseReader, new FlintIndexMetadataServiceImpl(client), - new SessionManager(sessionStorageService, statementStorageService, emrServerlessClientFactory, pluginSettings), + new SessionManager( + sessionStorageService, + statementStorageService, + emrServerlessClientFactory, + pluginSettings), new DefaultLeaseManager(pluginSettings, stateStore), new OpenSearchIndexDMLResultStorageService(dataSourceService, stateStore), new FlintIndexOpFactory( @@ -240,7 +244,11 @@ protected AsyncQueryExecutorService createAsyncQueryExecutorService( SparkQueryDispatcher sparkQueryDispatcher = new SparkQueryDispatcher( this.dataSourceService, - new SessionManager(sessionStorageService, statementStorageService, emrServerlessClientFactory, pluginSettings), + new SessionManager( + sessionStorageService, + statementStorageService, + emrServerlessClientFactory, + pluginSettings), queryHandlerFactory); return new AsyncQueryExecutorServiceImpl( asyncQueryJobMetadataStorageService, @@ -358,7 +366,8 @@ int search(QueryBuilder query) { void setSessionState(String sessionId, SessionState sessionState) { Optional model = sessionStorageService.getSession(sessionId, MYS3_DATASOURCE); - SessionModel updated = sessionStorageService.updateSessionState(model.get(), sessionState, MYS3_DATASOURCE); + SessionModel updated = + sessionStorageService.updateSessionState(model.get(), sessionState, MYS3_DATASOURCE); assertEquals(sessionState, updated.getSessionState()); } diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java index ba2cbedc13..8aac451f82 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java @@ -22,8 +22,8 @@ import org.opensearch.sql.spark.client.EMRServerlessClientFactory; import org.opensearch.sql.spark.client.StartJobRequest; import org.opensearch.sql.spark.dispatcher.model.JobType; -import org.opensearch.sql.spark.execution.statestore.OpenSearchStateStoreUtil; import org.opensearch.sql.spark.execution.statestore.OpenSearchSessionStorageService; +import org.opensearch.sql.spark.execution.statestore.OpenSearchStateStoreUtil; import org.opensearch.sql.spark.execution.statestore.OpenSearchStatementStorageService; import org.opensearch.sql.spark.execution.statestore.SessionStorageService; import org.opensearch.sql.spark.execution.statestore.StateStore; @@ -33,7 +33,8 @@ /** mock-maker-inline does not work with OpenSearchTestCase. */ public class InteractiveSessionTest extends OpenSearchIntegTestCase { - private static final String indexName = OpenSearchStateStoreUtil.getIndexName(TEST_DATASOURCE_NAME); + private static final String indexName = + OpenSearchStateStoreUtil.getIndexName(TEST_DATASOURCE_NAME); private TestEMRServerlessClient emrsClient; private StartJobRequest startJobRequest; @@ -49,7 +50,12 @@ public void setup() { sessionStorageService = new OpenSearchSessionStorageService(stateStore); statementStorageService = new OpenSearchStatementStorageService(stateStore); EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; - sessionManager = new SessionManager(sessionStorageService, statementStorageService, emrServerlessClientFactory, sessionSetting()); + sessionManager = + new SessionManager( + sessionStorageService, + statementStorageService, + emrServerlessClientFactory, + sessionSetting()); } @After @@ -71,7 +77,8 @@ public void openCloseSession() { .build(); SessionAssertions assertions = new SessionAssertions(session); - assertions.open(createSessionRequest()) + assertions + .open(createSessionRequest()) .assertSessionState(NOT_STARTED) .assertAppId("appId") .assertJobId("jobId"); @@ -132,7 +139,8 @@ public void closeNotExistSession() { public void sessionManagerCreateSession() { Session session = sessionManager.createSession(createSessionRequest()); - new SessionAssertions(session).assertSessionState(NOT_STARTED) + new SessionAssertions(session) + .assertSessionState(NOT_STARTED) .assertAppId("appId") .assertJobId("jobId"); } @@ -188,6 +196,4 @@ public SessionAssertions close() { return this; } } - - } diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java index 9669c3f468..360018c5b0 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java @@ -16,7 +16,6 @@ import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.spark.client.EMRServerlessClientFactory; import org.opensearch.sql.spark.execution.statestore.SessionStorageService; -import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.execution.statestore.StatementStorageService; @ExtendWith(MockitoExtension.class) @@ -27,7 +26,12 @@ public class SessionManagerTest { @Test public void sessionEnable() { - SessionManager sessionManager = new SessionManager(sessionStorageService, statementStorageService, emrServerlessClientFactory, sessionSetting()); + SessionManager sessionManager = + new SessionManager( + sessionStorageService, + statementStorageService, + emrServerlessClientFactory, + sessionSetting()); Assertions.assertTrue(sessionManager.isEnabled()); } diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/statement/StatementTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/statement/StatementTest.java index 27cfd56ac5..5f05eed9b9 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/execution/statement/StatementTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/execution/statement/StatementTest.java @@ -52,7 +52,12 @@ public void setup() { sessionStorageService = new OpenSearchSessionStorageService(stateStore); EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; - sessionManager = new SessionManager(sessionStorageService, statementStorageService, emrServerlessClientFactory, sessionSetting()); + sessionManager = + new SessionManager( + sessionStorageService, + statementStorageService, + emrServerlessClientFactory, + sessionSetting()); } @After @@ -123,12 +128,12 @@ public void cancelNotExistStatement() { @Test public void cancelFailedBecauseOfConflict() { StatementId stId = new StatementId("statementId"); - Statement st = - buildStatement(stId); + Statement st = buildStatement(stId); st.open(); - StatementModel running = statementStorageService.updateStatementState(st.getStatementModel(), CANCELLED, - TEST_DATASOURCE_NAME); + StatementModel running = + statementStorageService.updateStatementState( + st.getStatementModel(), CANCELLED, TEST_DATASOURCE_NAME); assertEquals(StatementState.CANCELLED, running.getStatementState()); @@ -202,8 +207,7 @@ public void cancelCancelledStatementFailed() { @Test public void cancelRunningStatementSuccess() { - Statement st = - buildStatement(); + Statement st = buildStatement(); // submit statement TestStatement testStatement = testStatement(st, statementStorageService); @@ -223,8 +227,8 @@ public void submitStatementInRunningSession() { Session session = sessionManager.createSession(createSessionRequest()); // App change state to running - sessionStorageService.updateSessionState(session.getSessionModel(), SessionState.RUNNING, - TEST_DATASOURCE_NAME); + sessionStorageService.updateSessionState( + session.getSessionModel(), SessionState.RUNNING, TEST_DATASOURCE_NAME); StatementId statementId = session.submit(queryRequest()); assertFalse(statementId.getId().isEmpty()); @@ -242,8 +246,8 @@ public void submitStatementInNotStartedState() { public void failToSubmitStatementInDeadState() { Session session = sessionManager.createSession(createSessionRequest()); - sessionStorageService.updateSessionState(session.getSessionModel(), SessionState.DEAD, - TEST_DATASOURCE_NAME); + sessionStorageService.updateSessionState( + session.getSessionModel(), SessionState.DEAD, TEST_DATASOURCE_NAME); IllegalStateException exception = assertThrows(IllegalStateException.class, () -> session.submit(queryRequest())); @@ -257,8 +261,8 @@ public void failToSubmitStatementInDeadState() { public void failToSubmitStatementInFailState() { Session session = sessionManager.createSession(createSessionRequest()); - sessionStorageService.updateSessionState(session.getSessionModel(), SessionState.FAIL, - TEST_DATASOURCE_NAME); + sessionStorageService.updateSessionState( + session.getSessionModel(), SessionState.FAIL, TEST_DATASOURCE_NAME); IllegalStateException exception = assertThrows(IllegalStateException.class, () -> session.submit(queryRequest())); @@ -270,9 +274,7 @@ public void failToSubmitStatementInFailState() { @Test public void newStatementFieldAssert() { - Session session = - sessionManager - .createSession(createSessionRequest()); + Session session = sessionManager.createSession(createSessionRequest()); StatementId statementId = session.submit(queryRequest()); Optional statement = session.get(statementId); @@ -305,8 +307,8 @@ public void failToSubmitStatementInDeletedSession() { public void getStatementSuccess() { Session session = sessionManager.createSession(createSessionRequest()); // App change state to running - sessionStorageService.updateSessionState(session.getSessionModel(), SessionState.RUNNING, - TEST_DATASOURCE_NAME); + sessionStorageService.updateSessionState( + session.getSessionModel(), SessionState.RUNNING, TEST_DATASOURCE_NAME); StatementId statementId = session.submit(queryRequest()); Optional statement = session.get(statementId); @@ -317,11 +319,10 @@ public void getStatementSuccess() { @Test public void getStatementNotExist() { - Session session = sessionManager - .createSession(createSessionRequest()); + Session session = sessionManager.createSession(createSessionRequest()); // App change state to running - sessionStorageService.updateSessionState(session.getSessionModel(), SessionState.RUNNING, - TEST_DATASOURCE_NAME); + sessionStorageService.updateSessionState( + session.getSessionModel(), SessionState.RUNNING, TEST_DATASOURCE_NAME); Optional statement = session.get(StatementId.newStatementId("not-exist-id")); assertFalse(statement.isPresent()); @@ -332,16 +333,16 @@ static class TestStatement { private final Statement st; private final StatementStorageService statementStorageService; - public static TestStatement testStatement(Statement st, StatementStorageService statementStorageService) { + public static TestStatement testStatement( + Statement st, StatementStorageService statementStorageService) { return new TestStatement(st, statementStorageService); } public TestStatement assertSessionState(StatementState expected) { assertEquals(expected, st.getStatementModel().getStatementState()); - Optional model = statementStorageService.getStatement( - st.getStatementId().getId(), TEST_DATASOURCE_NAME - ); + Optional model = + statementStorageService.getStatement(st.getStatementId().getId(), TEST_DATASOURCE_NAME); assertTrue(model.isPresent()); assertEquals(expected, model.get().getStatementState()); @@ -351,9 +352,8 @@ public TestStatement assertSessionState(StatementState expected) { public TestStatement assertStatementId(StatementId expected) { assertEquals(expected, st.getStatementModel().getStatementId()); - Optional model = statementStorageService.getStatement( - st.getStatementId().getId(), TEST_DATASOURCE_NAME - ); + Optional model = + statementStorageService.getStatement(st.getStatementId().getId(), TEST_DATASOURCE_NAME); assertTrue(model.isPresent()); assertEquals(expected, model.get().getStatementId()); return this; @@ -370,15 +370,17 @@ public TestStatement cancel() { } public TestStatement run() { - StatementModel model = statementStorageService.updateStatementState(st.getStatementModel(), RUNNING, - TEST_DATASOURCE_NAME); + StatementModel model = + statementStorageService.updateStatementState( + st.getStatementModel(), RUNNING, TEST_DATASOURCE_NAME); st.setStatementModel(model); return this; } } private QueryRequest queryRequest() { - return new QueryRequest(AsyncQueryId.newAsyncQueryId(TEST_DATASOURCE_NAME), LangType.SQL, "select 1"); + return new QueryRequest( + AsyncQueryId.newAsyncQueryId(TEST_DATASOURCE_NAME), LangType.SQL, "select 1"); } private Statement createStatement(StatementId stId) {