Skip to content

Commit

Permalink
Add Statement (#2294) (#2318)
Browse files Browse the repository at this point in the history
* add InteractiveSession and SessionManager



* add statement



* add statement



* fix format



* address comments



---------


(cherry picked from commit 297e26f)

Signed-off-by: Peng Huo <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 3dd17a3 commit 7aa3cab
Show file tree
Hide file tree
Showing 19 changed files with 1,055 additions and 169 deletions.
5 changes: 3 additions & 2 deletions spark/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ jacocoTestCoverageVerification {
'org.opensearch.sql.spark.dispatcher.model.*',
'org.opensearch.sql.spark.flint.FlintIndexType',
// ignore because XContext IOException
'org.opensearch.sql.spark.execution.statestore.SessionStateStore',
'org.opensearch.sql.spark.execution.session.SessionModel'
'org.opensearch.sql.spark.execution.statestore.StateStore',
'org.opensearch.sql.spark.execution.session.SessionModel',
'org.opensearch.sql.spark.execution.statement.StatementModel'
]
limit {
counter = 'LINE'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
package org.opensearch.sql.spark.execution.session;

import static org.opensearch.sql.spark.execution.session.SessionModel.initInteractiveSession;
import static org.opensearch.sql.spark.execution.session.SessionState.END_STATE;
import static org.opensearch.sql.spark.execution.statement.StatementId.newStatementId;
import static org.opensearch.sql.spark.execution.statestore.StateStore.createSession;
import static org.opensearch.sql.spark.execution.statestore.StateStore.getSession;

import java.util.Optional;
import lombok.Builder;
Expand All @@ -14,7 +18,11 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.execution.statestore.SessionStateStore;
import org.opensearch.sql.spark.execution.statement.QueryRequest;
import org.opensearch.sql.spark.execution.statement.Statement;
import org.opensearch.sql.spark.execution.statement.StatementId;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.rest.model.LangType;

/**
* Interactive session.
Expand All @@ -27,9 +35,8 @@ public class InteractiveSession implements Session {
private static final Logger LOG = LogManager.getLogger();

private final SessionId sessionId;
private final SessionStateStore sessionStateStore;
private final StateStore stateStore;
private final EMRServerlessClient serverlessClient;

private SessionModel sessionModel;

@Override
Expand All @@ -41,21 +48,75 @@ public void open(CreateSessionRequest createSessionRequest) {
sessionModel =
initInteractiveSession(
applicationId, jobID, sessionId, createSessionRequest.getDatasourceName());
sessionStateStore.create(sessionModel);
createSession(stateStore).apply(sessionModel);
} catch (VersionConflictEngineException e) {
String errorMsg = "session already exist. " + sessionId;
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
}

/** todo. StatementSweeper will delete doc. */
@Override
public void close() {
Optional<SessionModel> model = sessionStateStore.get(sessionModel.getSessionId());
Optional<SessionModel> model = getSession(stateStore).apply(sessionModel.getId());
if (model.isEmpty()) {
throw new IllegalStateException("session not exist. " + sessionModel.getSessionId());
throw new IllegalStateException("session does not exist. " + sessionModel.getSessionId());
} else {
serverlessClient.cancelJobRun(sessionModel.getApplicationId(), sessionModel.getJobId());
}
}

/** Submit statement. If submit successfully, Statement in waiting state. */
public StatementId submit(QueryRequest request) {
Optional<SessionModel> model = getSession(stateStore).apply(sessionModel.getId());
if (model.isEmpty()) {
throw new IllegalStateException("session does not exist. " + sessionModel.getSessionId());
} else {
sessionModel = model.get();
if (!END_STATE.contains(sessionModel.getSessionState())) {
StatementId statementId = newStatementId();
Statement st =
Statement.builder()
.sessionId(sessionId)
.applicationId(sessionModel.getApplicationId())
.jobId(sessionModel.getJobId())
.stateStore(stateStore)
.statementId(statementId)
.langType(LangType.SQL)
.query(request.getQuery())
.queryId(statementId.getId())
.build();
st.open();
return statementId;
} else {
String errMsg =
String.format(
"can't submit statement, session should not be in end state, "
+ "current session state is: %s",
sessionModel.getSessionState().getSessionState());
LOG.debug(errMsg);
throw new IllegalStateException(errMsg);
}
}
}

@Override
public Optional<Statement> get(StatementId stID) {
return StateStore.getStatement(stateStore)
.apply(stID.getId())
.map(
model ->
Statement.builder()
.sessionId(sessionId)
.applicationId(model.getApplicationId())
.jobId(model.getJobId())
.statementId(model.getStatementId())
.langType(model.getLangType())
.query(model.getQuery())
.queryId(model.getQueryId())
.stateStore(stateStore)
.statementModel(model)
.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@

package org.opensearch.sql.spark.execution.session;

import java.util.Optional;
import org.opensearch.sql.spark.execution.statement.QueryRequest;
import org.opensearch.sql.spark.execution.statement.Statement;
import org.opensearch.sql.spark.execution.statement.StatementId;

/** Session define the statement execution context. Each session is binding to one Spark Job. */
public interface Session {
/** open session. */
Expand All @@ -13,6 +18,22 @@ public interface Session {
/** close session. */
void close();

/**
* submit {@link QueryRequest}.
*
* @param request {@link QueryRequest}
* @return {@link StatementId}
*/
StatementId submit(QueryRequest request);

/**
* get {@link Statement}.
*
* @param stID {@link StatementId}
* @return {@link Statement}
*/
Optional<Statement> get(StatementId stID);

SessionModel getSessionModel();

SessionId getSessionId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.execution.statestore.SessionStateStore;
import org.opensearch.sql.spark.execution.statestore.StateStore;

/**
* Singleton Class
Expand All @@ -19,27 +19,27 @@
*/
@RequiredArgsConstructor
public class SessionManager {
private final SessionStateStore stateStore;
private final StateStore stateStore;
private final EMRServerlessClient emrServerlessClient;

public Session createSession(CreateSessionRequest request) {
InteractiveSession session =
InteractiveSession.builder()
.sessionId(newSessionId())
.sessionStateStore(stateStore)
.stateStore(stateStore)
.serverlessClient(emrServerlessClient)
.build();
session.open(request);
return session;
}

public Optional<Session> getSession(SessionId sid) {
Optional<SessionModel> model = stateStore.get(sid);
Optional<SessionModel> model = StateStore.getSession(stateStore).apply(sid.getSessionId());
if (model.isPresent()) {
InteractiveSession session =
InteractiveSession.builder()
.sessionId(sid)
.sessionStateStore(stateStore)
.stateStore(stateStore)
.serverlessClient(emrServerlessClient)
.sessionModel(model.get())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@
import lombok.Builder;
import lombok.Data;
import lombok.SneakyThrows;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.sql.spark.execution.statestore.StateModel;

/** Session data in flint.ql.sessions index. */
@Data
@Builder
public class SessionModel implements ToXContentObject {
public class SessionModel extends StateModel {
public static final String VERSION = "version";
public static final String TYPE = "type";
public static final String SESSION_TYPE = "sessionType";
Expand Down Expand Up @@ -73,6 +73,27 @@ public static SessionModel of(SessionModel copy, long seqNo, long primaryTerm) {
.sessionId(new SessionId(copy.sessionId.getSessionId()))
.sessionState(copy.sessionState)
.datasourceName(copy.datasourceName)
.applicationId(copy.getApplicationId())
.jobId(copy.jobId)
.error(UNKNOWN)
.lastUpdateTime(copy.getLastUpdateTime())
.seqNo(seqNo)
.primaryTerm(primaryTerm)
.build();
}

public static SessionModel copyWithState(
SessionModel copy, SessionState state, long seqNo, long primaryTerm) {
return builder()
.version(copy.version)
.sessionType(copy.sessionType)
.sessionId(new SessionId(copy.sessionId.getSessionId()))
.sessionState(state)
.datasourceName(copy.datasourceName)
.applicationId(copy.getApplicationId())
.jobId(copy.jobId)
.error(UNKNOWN)
.lastUpdateTime(copy.getLastUpdateTime())
.seqNo(seqNo)
.primaryTerm(primaryTerm)
.build();
Expand Down Expand Up @@ -140,4 +161,9 @@ public static SessionModel initInteractiveSession(
.primaryTerm(SequenceNumbers.UNASSIGNED_PRIMARY_TERM)
.build();
}

@Override
public String getId() {
return sessionId.getSessionId();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

package org.opensearch.sql.spark.execution.session;

import com.google.common.collect.ImmutableList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.Getter;
Expand All @@ -17,6 +19,8 @@ public enum SessionState {
DEAD("dead"),
FAIL("fail");

public static List<SessionState> END_STATE = ImmutableList.of(DEAD, FAIL);

private final String sessionState;

SessionState(String sessionState) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.execution.statement;

import lombok.Data;
import org.opensearch.sql.spark.rest.model.LangType;

@Data
public class QueryRequest {
private final LangType langType;
private final String query;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.execution.statement;

import static org.opensearch.sql.spark.execution.statement.StatementModel.submitStatement;
import static org.opensearch.sql.spark.execution.statestore.StateStore.createStatement;
import static org.opensearch.sql.spark.execution.statestore.StateStore.getStatement;
import static org.opensearch.sql.spark.execution.statestore.StateStore.updateStatementState;

import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.index.engine.DocumentMissingException;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.sql.spark.execution.session.SessionId;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.rest.model.LangType;

/** Statement represent query to execute in session. One statement map to one session. */
@Getter
@Builder
public class Statement {
private static final Logger LOG = LogManager.getLogger();

private final SessionId sessionId;
private final String applicationId;
private final String jobId;
private final StatementId statementId;
private final LangType langType;
private final String query;
private final String queryId;
private final StateStore stateStore;

@Setter private StatementModel statementModel;

/** Open a statement. */
public void open() {
try {
statementModel =
submitStatement(sessionId, applicationId, jobId, statementId, langType, query, queryId);
statementModel = createStatement(stateStore).apply(statementModel);
} catch (VersionConflictEngineException e) {
String errorMsg = "statement already exist. " + statementId;
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
}

/** Cancel a statement. */
public void cancel() {
if (statementModel.getStatementState().equals(StatementState.RUNNING)) {
String errorMsg =
String.format("can't cancel statement in waiting state. statement: %s.", statementId);
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
try {
this.statementModel =
updateStatementState(stateStore).apply(this.statementModel, StatementState.CANCELLED);
} catch (DocumentMissingException e) {
String errorMsg =
String.format("cancel statement failed. no statement found. statement: %s.", statementId);
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
} catch (VersionConflictEngineException e) {
this.statementModel =
getStatement(stateStore).apply(statementModel.getId()).orElse(this.statementModel);
String errorMsg =
String.format(
"cancel statement failed. current statementState: %s " + "statement: %s.",
this.statementModel.getStatementState(), statementId);
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
}

public StatementState getStatementState() {
return statementModel.getStatementState();
}
}
Loading

0 comments on commit 7aa3cab

Please sign in to comment.