Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Remove unneeded datasourceName parameters #2686

Merged
merged 1 commit into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private AsyncQueryId storeIndexDMLResult(
dispatchQueryRequest.getDatasource(),
queryRunTime,
System.currentTimeMillis());
indexDMLResultStorageService.createIndexDMLResult(indexDMLResult, dataSourceMetadata.getName());
indexDMLResultStorageService.createIndexDMLResult(indexDMLResult);
return asyncQueryId;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void open(CreateSessionRequest createSessionRequest) {
sessionModel =
initInteractiveSession(
applicationId, jobID, sessionId, createSessionRequest.getDatasourceName());
sessionStorageService.createSession(sessionModel, sessionModel.getDatasourceName());
sessionStorageService.createSession(sessionModel);
} catch (VersionConflictEngineException e) {
String errorMsg = "session already exist. " + sessionId;
LOG.error(errorMsg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void open() {
datasourceName,
query,
queryId);
statementModel = statementStorageService.createStatement(statementModel, datasourceName);
statementModel = statementStorageService.createStatement(statementModel);
} catch (VersionConflictEngineException e) {
String errorMsg = "statement already exist. " + statementId;
LOG.error(errorMsg);
Expand All @@ -73,8 +73,7 @@ public void cancel() {
}
try {
this.statementModel =
statementStorageService.updateStatementState(
statementModel, StatementState.CANCELLED, statementModel.getDatasourceName());
statementStorageService.updateStatementState(statementModel, StatementState.CANCELLED);
} catch (DocumentMissingException e) {
String errorMsg =
String.format("cancel statement failed. no statement found. statement: %s.", statementId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ public class OpenSearchSessionStorageService implements SessionStorageService {
private final SessionModelXContentSerializer serializer;

@Override
public SessionModel createSession(SessionModel sessionModel, String datasourceName) {
public SessionModel createSession(SessionModel sessionModel) {
return stateStore.create(
sessionModel, SessionModel::of, OpenSearchStateStoreUtil.getIndexName(datasourceName));
sessionModel,
SessionModel::of,
OpenSearchStateStoreUtil.getIndexName(sessionModel.getDatasourceName()));
}

@Override
Expand All @@ -30,12 +32,11 @@ public Optional<SessionModel> getSession(String id, String datasourceName) {
}

@Override
public SessionModel updateSessionState(
SessionModel sessionModel, SessionState sessionState, String datasourceName) {
public SessionModel updateSessionState(SessionModel sessionModel, SessionState sessionState) {
return stateStore.updateState(
sessionModel,
sessionState,
SessionModel::copyWithState,
OpenSearchStateStoreUtil.getIndexName(datasourceName));
OpenSearchStateStoreUtil.getIndexName(sessionModel.getDatasourceName()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ public class OpenSearchStatementStorageService implements StatementStorageServic
private final StatementModelXContentSerializer serializer;

@Override
public StatementModel createStatement(StatementModel statementModel, String datasourceName) {
public StatementModel createStatement(StatementModel statementModel) {
return stateStore.create(
statementModel,
StatementModel::copy,
OpenSearchStateStoreUtil.getIndexName(datasourceName));
OpenSearchStateStoreUtil.getIndexName(statementModel.getDatasourceName()));
}

@Override
Expand All @@ -33,11 +33,11 @@ public Optional<StatementModel> getStatement(String id, String datasourceName) {

@Override
public StatementModel updateStatementState(
StatementModel oldStatementModel, StatementState statementState, String datasourceName) {
StatementModel oldStatementModel, StatementState statementState) {
return stateStore.updateState(
oldStatementModel,
statementState,
StatementModel::copyWithState,
OpenSearchStateStoreUtil.getIndexName(datasourceName));
OpenSearchStateStoreUtil.getIndexName(oldStatementModel.getDatasourceName()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@
/** Interface for accessing {@link SessionModel} data storage. */
public interface SessionStorageService {

SessionModel createSession(SessionModel sessionModel, String datasourceName);
SessionModel createSession(SessionModel sessionModel);

Optional<SessionModel> getSession(String id, String datasourceName);

SessionModel updateSessionState(
SessionModel sessionModel, SessionState sessionState, String datasourceName);
SessionModel updateSessionState(SessionModel sessionModel, SessionState sessionState);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
*/
public interface StatementStorageService {

StatementModel createStatement(StatementModel statementModel, String datasourceName);
StatementModel createStatement(StatementModel statementModel);

StatementModel updateStatementState(
StatementModel oldStatementModel, StatementState statementState, String datasourceName);
StatementModel oldStatementModel, StatementState statementState);

Optional<StatementModel> getStatement(String id, String datasourceName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
* flint index.
*/
public interface FlintIndexStateModelService {
FlintIndexStateModel createFlintIndexStateModel(
FlintIndexStateModel flintIndexStateModel, String datasourceName);
FlintIndexStateModel createFlintIndexStateModel(FlintIndexStateModel flintIndexStateModel);

Optional<FlintIndexStateModel> getFlintIndexStateModel(String id, String datasourceName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@
* Abstraction over the IndexDMLResult storage. It stores the result of IndexDML query execution.
*/
public interface IndexDMLResultStorageService {
IndexDMLResult createIndexDMLResult(IndexDMLResult result, String datasourceName);
IndexDMLResult createIndexDMLResult(IndexDMLResult result);
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ public Optional<FlintIndexStateModel> getFlintIndexStateModel(String id, String

@Override
public FlintIndexStateModel createFlintIndexStateModel(
FlintIndexStateModel flintIndexStateModel, String datasourceName) {
FlintIndexStateModel flintIndexStateModel) {
return stateStore.create(
flintIndexStateModel,
FlintIndexStateModel::copy,
OpenSearchStateStoreUtil.getIndexName(datasourceName));
OpenSearchStateStoreUtil.getIndexName(flintIndexStateModel.getDatasourceName()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ public class OpenSearchIndexDMLResultStorageService implements IndexDMLResultSto
private final StateStore stateStore;

@Override
public IndexDMLResult createIndexDMLResult(IndexDMLResult result, String datasourceName) {
DataSourceMetadata dataSourceMetadata = dataSourceService.getDataSourceMetadata(datasourceName);
public IndexDMLResult createIndexDMLResult(IndexDMLResult result) {
DataSourceMetadata dataSourceMetadata =
dataSourceService.getDataSourceMetadata(result.getDatasourceName());
return stateStore.create(result, IndexDMLResult::copy, dataSourceMetadata.getResultIndex());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ public void withSessionCreateAsyncQueryFailed() {
.seqNo(submitted.getSeqNo())
.primaryTerm(submitted.getPrimaryTerm())
.build();
statementStorageService.updateStatementState(mocked, StatementState.FAILED, MYS3_DATASOURCE);
statementStorageService.updateStatementState(mocked, StatementState.FAILED);

AsyncQueryExecutionResponse asyncQueryResults =
asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,7 @@ int search(QueryBuilder query) {

void setSessionState(String sessionId, SessionState sessionState) {
Optional<SessionModel> model = sessionStorageService.getSession(sessionId, MYS3_DATASOURCE);
SessionModel updated =
sessionStorageService.updateSessionState(model.get(), sessionState, MYS3_DATASOURCE);
SessionModel updated = sessionStorageService.updateSessionState(model.get(), sessionState);
assertEquals(sessionState, updated.getSessionState());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ void emrJobWriteResultDoc(Map<String, Object> resultDoc) {
/** Simulate EMR-S updates query_execution_request with state */
void emrJobUpdateStatementState(StatementState newState) {
StatementModel stmt = statementStorageService.getStatement(queryId, MYS3_DATASOURCE).get();
statementStorageService.updateStatementState(stmt, newState, MYS3_DATASOURCE);
statementStorageService.updateStatementState(stmt, newState);
}

void emrJobUpdateJobState(JobRunState jobState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public MockFlintSparkJob(
"",
SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
stateModel = flintIndexStateModelService.createFlintIndexStateModel(stateModel, datasource);
stateModel = flintIndexStateModelService.createFlintIndexStateModel(stateModel);
}

public void transition(FlintIndexState newState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,7 @@ public void cancelFailedBecauseOfConflict() {
st.open();

StatementModel running =
statementStorageService.updateStatementState(
st.getStatementModel(), CANCELLED, TEST_DATASOURCE_NAME);
statementStorageService.updateStatementState(st.getStatementModel(), CANCELLED);

assertEquals(StatementState.CANCELLED, running.getStatementState());

Expand Down Expand Up @@ -232,8 +231,7 @@ public void submitStatementInRunningSession() {
Session session = sessionManager.createSession(createSessionRequest());

// App change state to running
sessionStorageService.updateSessionState(
session.getSessionModel(), SessionState.RUNNING, TEST_DATASOURCE_NAME);
sessionStorageService.updateSessionState(session.getSessionModel(), SessionState.RUNNING);

StatementId statementId = session.submit(queryRequest());
assertFalse(statementId.getId().isEmpty());
Expand All @@ -251,8 +249,7 @@ public void submitStatementInNotStartedState() {
public void failToSubmitStatementInDeadState() {
Session session = sessionManager.createSession(createSessionRequest());

sessionStorageService.updateSessionState(
session.getSessionModel(), SessionState.DEAD, TEST_DATASOURCE_NAME);
sessionStorageService.updateSessionState(session.getSessionModel(), SessionState.DEAD);

IllegalStateException exception =
assertThrows(IllegalStateException.class, () -> session.submit(queryRequest()));
Expand All @@ -266,8 +263,7 @@ public void failToSubmitStatementInDeadState() {
public void failToSubmitStatementInFailState() {
Session session = sessionManager.createSession(createSessionRequest());

sessionStorageService.updateSessionState(
session.getSessionModel(), SessionState.FAIL, TEST_DATASOURCE_NAME);
sessionStorageService.updateSessionState(session.getSessionModel(), SessionState.FAIL);

IllegalStateException exception =
assertThrows(IllegalStateException.class, () -> session.submit(queryRequest()));
Expand Down Expand Up @@ -312,8 +308,7 @@ public void failToSubmitStatementInDeletedSession() {
public void getStatementSuccess() {
Session session = sessionManager.createSession(createSessionRequest());
// App change state to running
sessionStorageService.updateSessionState(
session.getSessionModel(), SessionState.RUNNING, TEST_DATASOURCE_NAME);
sessionStorageService.updateSessionState(session.getSessionModel(), SessionState.RUNNING);
StatementId statementId = session.submit(queryRequest());

Optional<Statement> statement = session.get(statementId);
Expand All @@ -326,8 +321,7 @@ public void getStatementSuccess() {
public void getStatementNotExist() {
Session session = sessionManager.createSession(createSessionRequest());
// App change state to running
sessionStorageService.updateSessionState(
session.getSessionModel(), SessionState.RUNNING, TEST_DATASOURCE_NAME);
sessionStorageService.updateSessionState(session.getSessionModel(), SessionState.RUNNING);

Optional<Statement> statement = session.get(StatementId.newStatementId("not-exist-id"));
assertFalse(statement.isPresent());
Expand Down Expand Up @@ -376,8 +370,7 @@ public TestStatement cancel() {

public TestStatement run() {
StatementModel model =
statementStorageService.updateStatementState(
st.getStatementModel(), RUNNING, TEST_DATASOURCE_NAME);
statementStorageService.updateStatementState(st.getStatementModel(), RUNNING);
st.setStatementModel(model);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ void getFlintIndexStateModel() {
@Test
void createFlintIndexStateModel() {
when(mockStateStore.create(any(), any(), any())).thenReturn(responseFlintIndexStateModel);
when(flintIndexStateModel.getDatasourceName()).thenReturn(DATASOURCE);

FlintIndexStateModel result =
openSearchFlintIndexStateModelService.createFlintIndexStateModel(
flintIndexStateModel, DATASOURCE);
openSearchFlintIndexStateModelService.createFlintIndexStateModel(flintIndexStateModel);

assertEquals(responseFlintIndexStateModel, result);
}
Expand Down
Loading