diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java index b2bb590c1e..9bfead67b6 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java @@ -103,7 +103,7 @@ private AsyncQueryId storeIndexDMLResult( dispatchQueryRequest.getDatasource(), queryRunTime, System.currentTimeMillis()); - indexDMLResultStorageService.createIndexDMLResult(indexDMLResult, dataSourceMetadata.getName()); + indexDMLResultStorageService.createIndexDMLResult(indexDMLResult); return asyncQueryId; } diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java index f08ef4f489..760c898825 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java @@ -64,7 +64,7 @@ public void open(CreateSessionRequest createSessionRequest) { sessionModel = initInteractiveSession( applicationId, jobID, sessionId, createSessionRequest.getDatasourceName()); - sessionStorageService.createSession(sessionModel, sessionModel.getDatasourceName()); + sessionStorageService.createSession(sessionModel); } catch (VersionConflictEngineException e) { String errorMsg = "session already exist. " + sessionId; LOG.error(errorMsg); diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statement/Statement.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statement/Statement.java index cab045726c..b0205aec64 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/statement/Statement.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statement/Statement.java @@ -49,7 +49,7 @@ public void open() { datasourceName, query, queryId); - statementModel = statementStorageService.createStatement(statementModel, datasourceName); + statementModel = statementStorageService.createStatement(statementModel); } catch (VersionConflictEngineException e) { String errorMsg = "statement already exist. " + statementId; LOG.error(errorMsg); @@ -73,8 +73,7 @@ public void cancel() { } try { this.statementModel = - statementStorageService.updateStatementState( - statementModel, StatementState.CANCELLED, statementModel.getDatasourceName()); + statementStorageService.updateStatementState(statementModel, StatementState.CANCELLED); } catch (DocumentMissingException e) { String errorMsg = String.format("cancel statement failed. no statement found. statement: %s.", statementId); diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/OpenSearchSessionStorageService.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/OpenSearchSessionStorageService.java index a229d4f6bf..a43a878713 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/OpenSearchSessionStorageService.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/OpenSearchSessionStorageService.java @@ -18,9 +18,11 @@ public class OpenSearchSessionStorageService implements SessionStorageService { private final SessionModelXContentSerializer serializer; @Override - public SessionModel createSession(SessionModel sessionModel, String datasourceName) { + public SessionModel createSession(SessionModel sessionModel) { return stateStore.create( - sessionModel, SessionModel::of, OpenSearchStateStoreUtil.getIndexName(datasourceName)); + sessionModel, + SessionModel::of, + OpenSearchStateStoreUtil.getIndexName(sessionModel.getDatasourceName())); } @Override @@ -30,12 +32,11 @@ public Optional getSession(String id, String datasourceName) { } @Override - public SessionModel updateSessionState( - SessionModel sessionModel, SessionState sessionState, String datasourceName) { + public SessionModel updateSessionState(SessionModel sessionModel, SessionState sessionState) { return stateStore.updateState( sessionModel, sessionState, SessionModel::copyWithState, - OpenSearchStateStoreUtil.getIndexName(datasourceName)); + OpenSearchStateStoreUtil.getIndexName(sessionModel.getDatasourceName())); } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/OpenSearchStatementStorageService.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/OpenSearchStatementStorageService.java index 226fb8d32a..5d3d2dc4d0 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/OpenSearchStatementStorageService.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/OpenSearchStatementStorageService.java @@ -18,11 +18,11 @@ public class OpenSearchStatementStorageService implements StatementStorageServic private final StatementModelXContentSerializer serializer; @Override - public StatementModel createStatement(StatementModel statementModel, String datasourceName) { + public StatementModel createStatement(StatementModel statementModel) { return stateStore.create( statementModel, StatementModel::copy, - OpenSearchStateStoreUtil.getIndexName(datasourceName)); + OpenSearchStateStoreUtil.getIndexName(statementModel.getDatasourceName())); } @Override @@ -33,11 +33,11 @@ public Optional getStatement(String id, String datasourceName) { @Override public StatementModel updateStatementState( - StatementModel oldStatementModel, StatementState statementState, String datasourceName) { + StatementModel oldStatementModel, StatementState statementState) { return stateStore.updateState( oldStatementModel, statementState, StatementModel::copyWithState, - OpenSearchStateStoreUtil.getIndexName(datasourceName)); + OpenSearchStateStoreUtil.getIndexName(oldStatementModel.getDatasourceName())); } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/SessionStorageService.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/SessionStorageService.java index 43472b567c..f67612b115 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/SessionStorageService.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/SessionStorageService.java @@ -12,10 +12,9 @@ /** Interface for accessing {@link SessionModel} data storage. */ public interface SessionStorageService { - SessionModel createSession(SessionModel sessionModel, String datasourceName); + SessionModel createSession(SessionModel sessionModel); Optional getSession(String id, String datasourceName); - SessionModel updateSessionState( - SessionModel sessionModel, SessionState sessionState, String datasourceName); + SessionModel updateSessionState(SessionModel sessionModel, SessionState sessionState); } diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StatementStorageService.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StatementStorageService.java index 0f550eba7c..9253a4850d 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StatementStorageService.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StatementStorageService.java @@ -15,10 +15,10 @@ */ public interface StatementStorageService { - StatementModel createStatement(StatementModel statementModel, String datasourceName); + StatementModel createStatement(StatementModel statementModel); StatementModel updateStatementState( - StatementModel oldStatementModel, StatementState statementState, String datasourceName); + StatementModel oldStatementModel, StatementState statementState); Optional getStatement(String id, String datasourceName); } diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexStateModelService.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexStateModelService.java index a00056fd53..94647f4e07 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexStateModelService.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexStateModelService.java @@ -12,8 +12,7 @@ * flint index. */ public interface FlintIndexStateModelService { - FlintIndexStateModel createFlintIndexStateModel( - FlintIndexStateModel flintIndexStateModel, String datasourceName); + FlintIndexStateModel createFlintIndexStateModel(FlintIndexStateModel flintIndexStateModel); Optional getFlintIndexStateModel(String id, String datasourceName); diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/IndexDMLResultStorageService.java b/spark/src/main/java/org/opensearch/sql/spark/flint/IndexDMLResultStorageService.java index 31d4be511e..c816572d02 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/IndexDMLResultStorageService.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/IndexDMLResultStorageService.java @@ -11,5 +11,5 @@ * Abstraction over the IndexDMLResult storage. It stores the result of IndexDML query execution. */ public interface IndexDMLResultStorageService { - IndexDMLResult createIndexDMLResult(IndexDMLResult result, String datasourceName); + IndexDMLResult createIndexDMLResult(IndexDMLResult result); } diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/OpenSearchFlintIndexStateModelService.java b/spark/src/main/java/org/opensearch/sql/spark/flint/OpenSearchFlintIndexStateModelService.java index 58dc5166db..2650ff3cb3 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/OpenSearchFlintIndexStateModelService.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/OpenSearchFlintIndexStateModelService.java @@ -36,11 +36,11 @@ public Optional getFlintIndexStateModel(String id, String @Override public FlintIndexStateModel createFlintIndexStateModel( - FlintIndexStateModel flintIndexStateModel, String datasourceName) { + FlintIndexStateModel flintIndexStateModel) { return stateStore.create( flintIndexStateModel, FlintIndexStateModel::copy, - OpenSearchStateStoreUtil.getIndexName(datasourceName)); + OpenSearchStateStoreUtil.getIndexName(flintIndexStateModel.getDatasourceName())); } @Override diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/OpenSearchIndexDMLResultStorageService.java b/spark/src/main/java/org/opensearch/sql/spark/flint/OpenSearchIndexDMLResultStorageService.java index eeb2921449..314368771f 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/OpenSearchIndexDMLResultStorageService.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/OpenSearchIndexDMLResultStorageService.java @@ -18,8 +18,9 @@ public class OpenSearchIndexDMLResultStorageService implements IndexDMLResultSto private final StateStore stateStore; @Override - public IndexDMLResult createIndexDMLResult(IndexDMLResult result, String datasourceName) { - DataSourceMetadata dataSourceMetadata = dataSourceService.getDataSourceMetadata(datasourceName); + public IndexDMLResult createIndexDMLResult(IndexDMLResult result) { + DataSourceMetadata dataSourceMetadata = + dataSourceService.getDataSourceMetadata(result.getDatasourceName()); return stateStore.create(result, IndexDMLResult::copy, dataSourceMetadata.getResultIndex()); } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java index 7f9fc5545d..f3c17914d2 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java @@ -317,7 +317,7 @@ public void withSessionCreateAsyncQueryFailed() { .seqNo(submitted.getSeqNo()) .primaryTerm(submitted.getPrimaryTerm()) .build(); - statementStorageService.updateStatementState(mocked, StatementState.FAILED, MYS3_DATASOURCE); + statementStorageService.updateStatementState(mocked, StatementState.FAILED); AsyncQueryExecutionResponse asyncQueryResults = asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java index 8ac5b92cd8..ba75da5dda 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java @@ -373,8 +373,7 @@ int search(QueryBuilder query) { void setSessionState(String sessionId, SessionState sessionState) { Optional model = sessionStorageService.getSession(sessionId, MYS3_DATASOURCE); - SessionModel updated = - sessionStorageService.updateSessionState(model.get(), sessionState, MYS3_DATASOURCE); + SessionModel updated = sessionStorageService.updateSessionState(model.get(), sessionState); assertEquals(sessionState, updated.getSessionState()); } diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java index bcce6e27c2..f2c3bda026 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java @@ -510,7 +510,7 @@ void emrJobWriteResultDoc(Map resultDoc) { /** Simulate EMR-S updates query_execution_request with state */ void emrJobUpdateStatementState(StatementState newState) { StatementModel stmt = statementStorageService.getStatement(queryId, MYS3_DATASOURCE).get(); - statementStorageService.updateStatementState(stmt, newState, MYS3_DATASOURCE); + statementStorageService.updateStatementState(stmt, newState); } void emrJobUpdateJobState(JobRunState jobState) { diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/model/MockFlintSparkJob.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/model/MockFlintSparkJob.java index 4c58ea472f..87cc765071 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/model/MockFlintSparkJob.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/model/MockFlintSparkJob.java @@ -36,7 +36,7 @@ public MockFlintSparkJob( "", SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM); - stateModel = flintIndexStateModelService.createFlintIndexStateModel(stateModel, datasource); + stateModel = flintIndexStateModelService.createFlintIndexStateModel(stateModel); } public void transition(FlintIndexState newState) { diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/statement/StatementTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/statement/StatementTest.java index b18ec05497..010c8b7c6a 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/execution/statement/StatementTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/execution/statement/StatementTest.java @@ -137,8 +137,7 @@ public void cancelFailedBecauseOfConflict() { st.open(); StatementModel running = - statementStorageService.updateStatementState( - st.getStatementModel(), CANCELLED, TEST_DATASOURCE_NAME); + statementStorageService.updateStatementState(st.getStatementModel(), CANCELLED); assertEquals(StatementState.CANCELLED, running.getStatementState()); @@ -232,8 +231,7 @@ public void submitStatementInRunningSession() { Session session = sessionManager.createSession(createSessionRequest()); // App change state to running - sessionStorageService.updateSessionState( - session.getSessionModel(), SessionState.RUNNING, TEST_DATASOURCE_NAME); + sessionStorageService.updateSessionState(session.getSessionModel(), SessionState.RUNNING); StatementId statementId = session.submit(queryRequest()); assertFalse(statementId.getId().isEmpty()); @@ -251,8 +249,7 @@ public void submitStatementInNotStartedState() { public void failToSubmitStatementInDeadState() { Session session = sessionManager.createSession(createSessionRequest()); - sessionStorageService.updateSessionState( - session.getSessionModel(), SessionState.DEAD, TEST_DATASOURCE_NAME); + sessionStorageService.updateSessionState(session.getSessionModel(), SessionState.DEAD); IllegalStateException exception = assertThrows(IllegalStateException.class, () -> session.submit(queryRequest())); @@ -266,8 +263,7 @@ public void failToSubmitStatementInDeadState() { public void failToSubmitStatementInFailState() { Session session = sessionManager.createSession(createSessionRequest()); - sessionStorageService.updateSessionState( - session.getSessionModel(), SessionState.FAIL, TEST_DATASOURCE_NAME); + sessionStorageService.updateSessionState(session.getSessionModel(), SessionState.FAIL); IllegalStateException exception = assertThrows(IllegalStateException.class, () -> session.submit(queryRequest())); @@ -312,8 +308,7 @@ public void failToSubmitStatementInDeletedSession() { public void getStatementSuccess() { Session session = sessionManager.createSession(createSessionRequest()); // App change state to running - sessionStorageService.updateSessionState( - session.getSessionModel(), SessionState.RUNNING, TEST_DATASOURCE_NAME); + sessionStorageService.updateSessionState(session.getSessionModel(), SessionState.RUNNING); StatementId statementId = session.submit(queryRequest()); Optional statement = session.get(statementId); @@ -326,8 +321,7 @@ public void getStatementSuccess() { public void getStatementNotExist() { Session session = sessionManager.createSession(createSessionRequest()); // App change state to running - sessionStorageService.updateSessionState( - session.getSessionModel(), SessionState.RUNNING, TEST_DATASOURCE_NAME); + sessionStorageService.updateSessionState(session.getSessionModel(), SessionState.RUNNING); Optional statement = session.get(StatementId.newStatementId("not-exist-id")); assertFalse(statement.isPresent()); @@ -376,8 +370,7 @@ public TestStatement cancel() { public TestStatement run() { StatementModel model = - statementStorageService.updateStatementState( - st.getStatementModel(), RUNNING, TEST_DATASOURCE_NAME); + statementStorageService.updateStatementState(st.getStatementModel(), RUNNING); st.setStatementModel(model); return this; } diff --git a/spark/src/test/java/org/opensearch/sql/spark/flint/OpenSearchFlintIndexStateModelServiceTest.java b/spark/src/test/java/org/opensearch/sql/spark/flint/OpenSearchFlintIndexStateModelServiceTest.java index 5ec5a96073..c9ee5e5ce8 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/flint/OpenSearchFlintIndexStateModelServiceTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/flint/OpenSearchFlintIndexStateModelServiceTest.java @@ -59,10 +59,10 @@ void getFlintIndexStateModel() { @Test void createFlintIndexStateModel() { when(mockStateStore.create(any(), any(), any())).thenReturn(responseFlintIndexStateModel); + when(flintIndexStateModel.getDatasourceName()).thenReturn(DATASOURCE); FlintIndexStateModel result = - openSearchFlintIndexStateModelService.createFlintIndexStateModel( - flintIndexStateModel, DATASOURCE); + openSearchFlintIndexStateModelService.createFlintIndexStateModel(flintIndexStateModel); assertEquals(responseFlintIndexStateModel, result); }