Skip to content

Commit

Permalink
Extract SessionStorageService and StatementStorageService (#2665)
Browse files Browse the repository at this point in the history
* Extract SessionStorageService and StatementStorageService

Signed-off-by: Tomoyuki Morita <[email protected]>

* Reformat

Signed-off-by: Tomoyuki Morita <[email protected]>

* Add copyright comment

Signed-off-by: Tomoyuki Morita <[email protected]>

* Add comments and remove unused methods

Signed-off-by: Tomoyuki Morita <[email protected]>

* Remove unneeded imports

Signed-off-by: Tomoyuki Morita <[email protected]>

* Fix code format issue

Signed-off-by: Tomoyuki Morita <[email protected]>

---------

Signed-off-by: Tomoyuki Morita <[email protected]>
(cherry picked from commit 1985459)
  • Loading branch information
ykmr1224 committed May 15, 2024
1 parent ddcf96f commit b48d36f
Show file tree
Hide file tree
Showing 17 changed files with 423 additions and 383 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
import static org.opensearch.sql.spark.execution.session.SessionState.END_STATE;
import static org.opensearch.sql.spark.execution.session.SessionState.FAIL;
import static org.opensearch.sql.spark.execution.statement.StatementId.newStatementId;
import static org.opensearch.sql.spark.execution.statestore.StateStore.createSession;
import static org.opensearch.sql.spark.execution.statestore.StateStore.getSession;

import java.util.Optional;
import lombok.Builder;
Expand All @@ -24,7 +22,8 @@
import org.opensearch.sql.spark.execution.statement.QueryRequest;
import org.opensearch.sql.spark.execution.statement.Statement;
import org.opensearch.sql.spark.execution.statement.StatementId;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.execution.statestore.SessionStorageService;
import org.opensearch.sql.spark.execution.statestore.StatementStorageService;
import org.opensearch.sql.spark.rest.model.LangType;
import org.opensearch.sql.spark.utils.TimeProvider;

Expand All @@ -41,7 +40,8 @@ public class InteractiveSession implements Session {
public static final String SESSION_ID_TAG_KEY = "sid";

private final SessionId sessionId;
private final StateStore stateStore;
private final SessionStorageService sessionStorageService;
private final StatementStorageService statementStorageService;
private final EMRServerlessClient serverlessClient;
private SessionModel sessionModel;
// the threshold of elapsed time in milliseconds before we say a session is stale
Expand All @@ -64,7 +64,7 @@ public void open(CreateSessionRequest createSessionRequest) {
sessionModel =
initInteractiveSession(
applicationId, jobID, sessionId, createSessionRequest.getDatasourceName());
createSession(stateStore, sessionModel.getDatasourceName()).apply(sessionModel);
sessionStorageService.createSession(sessionModel, sessionModel.getDatasourceName());
} catch (VersionConflictEngineException e) {
String errorMsg = "session already exist. " + sessionId;
LOG.error(errorMsg);
Expand All @@ -76,7 +76,7 @@ public void open(CreateSessionRequest createSessionRequest) {
@Override
public void close() {
Optional<SessionModel> model =
getSession(stateStore, sessionModel.getDatasourceName()).apply(sessionModel.getId());
sessionStorageService.getSession(sessionModel.getId(), sessionModel.getDatasourceName());
if (model.isEmpty()) {
throw new IllegalStateException("session does not exist. " + sessionModel.getSessionId());
} else {
Expand All @@ -88,7 +88,7 @@ public void close() {
/** Submit statement. If submit successfully, Statement in waiting state. */
public StatementId submit(QueryRequest request) {
Optional<SessionModel> model =
getSession(stateStore, sessionModel.getDatasourceName()).apply(sessionModel.getId());
sessionStorageService.getSession(sessionModel.getId(), sessionModel.getDatasourceName());
if (model.isEmpty()) {
throw new IllegalStateException("session does not exist. " + sessionModel.getSessionId());
} else {
Expand All @@ -101,7 +101,7 @@ public StatementId submit(QueryRequest request) {
.sessionId(sessionId)
.applicationId(sessionModel.getApplicationId())
.jobId(sessionModel.getJobId())
.stateStore(stateStore)
.statementStorageService(statementStorageService)
.statementId(statementId)
.langType(LangType.SQL)
.datasourceName(sessionModel.getDatasourceName())
Expand All @@ -124,8 +124,8 @@ public StatementId submit(QueryRequest request) {

@Override
public Optional<Statement> get(StatementId stID) {
return StateStore.getStatement(stateStore, sessionModel.getDatasourceName())
.apply(stID.getId())
return statementStorageService
.getStatement(stID.getId(), sessionModel.getDatasourceName())
.map(
model ->
Statement.builder()
Expand All @@ -136,7 +136,7 @@ public Optional<Statement> get(StatementId stID) {
.langType(model.getLangType())
.query(model.getQuery())
.queryId(model.getQueryId())
.stateStore(stateStore)
.statementStorageService(statementStorageService)
.statementModel(model)
.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,35 +9,31 @@
import static org.opensearch.sql.spark.execution.session.SessionId.newSessionId;

import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.execution.statestore.SessionStorageService;
import org.opensearch.sql.spark.execution.statestore.StatementStorageService;
import org.opensearch.sql.spark.utils.RealTimeProvider;

/**
* Singleton Class
*
* <p>todo. add Session cache and Session sweeper.
*/
@RequiredArgsConstructor
public class SessionManager {
private final StateStore stateStore;
private final SessionStorageService sessionStorageService;
private final StatementStorageService statementStorageService;
private final EMRServerlessClientFactory emrServerlessClientFactory;
private Settings settings;

public SessionManager(
StateStore stateStore,
EMRServerlessClientFactory emrServerlessClientFactory,
Settings settings) {
this.stateStore = stateStore;
this.emrServerlessClientFactory = emrServerlessClientFactory;
this.settings = settings;
}
private final Settings settings;

public Session createSession(CreateSessionRequest request) {
InteractiveSession session =
InteractiveSession.builder()
.sessionId(newSessionId(request.getDatasourceName()))
.stateStore(stateStore)
.sessionStorageService(sessionStorageService)
.statementStorageService(statementStorageService)
.serverlessClient(emrServerlessClientFactory.getClient())
.build();
session.open(request);
Expand All @@ -64,12 +60,13 @@ public Session createSession(CreateSessionRequest request) {
*/
public Optional<Session> getSession(SessionId sid, String dataSourceName) {
Optional<SessionModel> model =
StateStore.getSession(stateStore, dataSourceName).apply(sid.getSessionId());
sessionStorageService.getSession(sid.getSessionId(), dataSourceName);
if (model.isPresent()) {
InteractiveSession session =
InteractiveSession.builder()
.sessionId(sid)
.stateStore(stateStore)
.sessionStorageService(sessionStorageService)
.statementStorageService(statementStorageService)
.serverlessClient(emrServerlessClientFactory.getClient())
.sessionModel(model.get())
.sessionInactivityTimeoutMilli(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@
package org.opensearch.sql.spark.execution.statement;

import static org.opensearch.sql.spark.execution.statement.StatementModel.submitStatement;
import static org.opensearch.sql.spark.execution.statestore.StateStore.createStatement;
import static org.opensearch.sql.spark.execution.statestore.StateStore.getStatement;
import static org.opensearch.sql.spark.execution.statestore.StateStore.updateStatementState;

import lombok.Builder;
import lombok.Getter;
Expand All @@ -18,7 +15,7 @@
import org.opensearch.index.engine.DocumentMissingException;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.sql.spark.execution.session.SessionId;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.execution.statestore.StatementStorageService;
import org.opensearch.sql.spark.rest.model.LangType;

/** Statement represent query to execute in session. One statement map to one session. */
Expand All @@ -35,7 +32,7 @@ public class Statement {
private final String datasourceName;
private final String query;
private final String queryId;
private final StateStore stateStore;
private final StatementStorageService statementStorageService;

@Setter private StatementModel statementModel;

Expand All @@ -52,7 +49,7 @@ public void open() {
datasourceName,
query,
queryId);
statementModel = createStatement(stateStore, datasourceName).apply(statementModel);
statementModel = statementStorageService.createStatement(statementModel, datasourceName);
} catch (VersionConflictEngineException e) {
String errorMsg = "statement already exist. " + statementId;
LOG.error(errorMsg);
Expand All @@ -76,17 +73,17 @@ public void cancel() {
}
try {
this.statementModel =
updateStatementState(stateStore, statementModel.getDatasourceName())
.apply(this.statementModel, StatementState.CANCELLED);
statementStorageService.updateStatementState(
statementModel, StatementState.CANCELLED, statementModel.getDatasourceName());
} catch (DocumentMissingException e) {
String errorMsg =
String.format("cancel statement failed. no statement found. statement: %s.", statementId);
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
} catch (VersionConflictEngineException e) {
this.statementModel =
getStatement(stateStore, statementModel.getDatasourceName())
.apply(statementModel.getId())
statementStorageService
.getStatement(statementModel.getId(), statementModel.getDatasourceName())
.orElse(this.statementModel);
String errorMsg =
String.format(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.execution.statestore;

import static org.opensearch.sql.spark.execution.statestore.StateStore.DATASOURCE_TO_REQUEST_INDEX;

import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.spark.execution.session.SessionModel;
import org.opensearch.sql.spark.execution.session.SessionState;

@RequiredArgsConstructor
public class OpenSearchSessionStorageService implements SessionStorageService {

private final StateStore stateStore;

@Override
public SessionModel createSession(SessionModel sessionModel, String datasourceName) {
return stateStore.create(
sessionModel, SessionModel::of, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName));
}

@Override
public Optional<SessionModel> getSession(String id, String datasourceName) {
return stateStore.get(
id, SessionModel::fromXContent, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName));
}

@Override
public SessionModel updateSessionState(
SessionModel sessionModel, SessionState sessionState, String datasourceName) {
return stateStore.updateState(
sessionModel,
sessionState,
SessionModel::copyWithState,
DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.execution.statestore;

import static org.opensearch.sql.spark.execution.statestore.StateStore.DATASOURCE_TO_REQUEST_INDEX;

import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.spark.execution.statement.StatementModel;
import org.opensearch.sql.spark.execution.statement.StatementState;

@RequiredArgsConstructor
public class OpenSearchStatementStorageService implements StatementStorageService {

private final StateStore stateStore;

@Override
public StatementModel createStatement(StatementModel statementModel, String datasourceName) {
return stateStore.create(
statementModel, StatementModel::copy, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName));
}

@Override
public Optional<StatementModel> getStatement(String id, String datasourceName) {
return stateStore.get(
id, StatementModel::fromXContent, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName));
}

@Override
public StatementModel updateStatementState(
StatementModel oldStatementModel, StatementState statementState, String datasourceName) {
return stateStore.updateState(
oldStatementModel,
statementState,
StatementModel::copyWithState,
DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.execution.statestore;

import java.util.Optional;
import org.opensearch.sql.spark.execution.session.SessionModel;
import org.opensearch.sql.spark.execution.session.SessionState;

/** Interface for accessing {@link SessionModel} data storage. */
public interface SessionStorageService {

SessionModel createSession(SessionModel sessionModel, String datasourceName);

Optional<SessionModel> getSession(String id, String datasourceName);

SessionModel updateSessionState(
SessionModel sessionModel, SessionState sessionState, String datasourceName);
}
Loading

0 comments on commit b48d36f

Please sign in to comment.