diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java b/async-query-core/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java index 7c95f0eda5..37b2619783 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java @@ -16,7 +16,6 @@ import lombok.Getter; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.index.engine.VersionConflictEngineException; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext; import org.opensearch.sql.spark.client.EMRServerlessClient; import org.opensearch.sql.spark.client.StartJobRequest; @@ -52,29 +51,23 @@ public class InteractiveSession implements Session { public void open( CreateSessionRequest createSessionRequest, AsyncQueryRequestContext asyncQueryRequestContext) { - try { - // append session id; - createSessionRequest - .getSparkSubmitParameters() - .acceptModifier( - (parameters) -> { - parameters.sessionExecution(sessionId, createSessionRequest.getDatasourceName()); - }); - createSessionRequest.getTags().put(SESSION_ID_TAG_KEY, sessionId); - StartJobRequest startJobRequest = createSessionRequest.getStartJobRequest(sessionId); - String jobID = serverlessClient.startJobRun(startJobRequest); - String applicationId = startJobRequest.getApplicationId(); - String accountId = createSessionRequest.getAccountId(); + // append session id; + createSessionRequest + .getSparkSubmitParameters() + .acceptModifier( + (parameters) -> { + parameters.sessionExecution(sessionId, createSessionRequest.getDatasourceName()); + }); + createSessionRequest.getTags().put(SESSION_ID_TAG_KEY, sessionId); + StartJobRequest startJobRequest = createSessionRequest.getStartJobRequest(sessionId); + String jobID = serverlessClient.startJobRun(startJobRequest); + String applicationId = startJobRequest.getApplicationId(); + String accountId = createSessionRequest.getAccountId(); - sessionModel = - initInteractiveSession( - accountId, applicationId, jobID, sessionId, createSessionRequest.getDatasourceName()); - sessionStorageService.createSession(sessionModel, asyncQueryRequestContext); - } catch (VersionConflictEngineException e) { - String errorMsg = "session already exist. " + sessionId; - LOG.error(errorMsg); - throw new IllegalStateException(errorMsg); - } + sessionModel = + initInteractiveSession( + accountId, applicationId, jobID, sessionId, createSessionRequest.getDatasourceName()); + sessionStorageService.createSession(sessionModel, asyncQueryRequestContext); } /** todo. StatementSweeper will delete doc. */ diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/execution/statement/Statement.java b/async-query-core/src/main/java/org/opensearch/sql/spark/execution/statement/Statement.java index b5edad0996..3237a5d372 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/execution/statement/Statement.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/execution/statement/Statement.java @@ -12,8 +12,6 @@ import lombok.Setter; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.index.engine.DocumentMissingException; -import org.opensearch.index.engine.VersionConflictEngineException; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext; import org.opensearch.sql.spark.execution.statestore.StatementStorageService; import org.opensearch.sql.spark.rest.model.LangType; @@ -41,25 +39,19 @@ public class Statement { /** Open a statement. */ public void open() { - try { - statementModel = - submitStatement( - sessionId, - accountId, - applicationId, - jobId, - statementId, - langType, - datasourceName, - query, - queryId); - statementModel = - statementStorageService.createStatement(statementModel, asyncQueryRequestContext); - } catch (VersionConflictEngineException e) { - String errorMsg = "statement already exist. " + statementId; - LOG.error(errorMsg); - throw new IllegalStateException(errorMsg); - } + statementModel = + submitStatement( + sessionId, + accountId, + applicationId, + jobId, + statementId, + langType, + datasourceName, + query, + queryId); + statementModel = + statementStorageService.createStatement(statementModel, asyncQueryRequestContext); } /** Cancel a statement. */ @@ -77,26 +69,8 @@ public void cancel() { LOG.error(errorMsg); throw new IllegalStateException(errorMsg); } - try { - this.statementModel = - statementStorageService.updateStatementState(statementModel, StatementState.CANCELLED); - } catch (DocumentMissingException e) { - String errorMsg = - String.format("cancel statement failed. no statement found. statement: %s.", statementId); - LOG.error(errorMsg); - throw new IllegalStateException(errorMsg); - } catch (VersionConflictEngineException e) { - this.statementModel = - statementStorageService - .getStatement(statementModel.getId(), statementModel.getDatasourceName()) - .orElse(this.statementModel); - String errorMsg = - String.format( - "cancel statement failed. current statementState: %s " + "statement: %s.", - this.statementModel.getStatementState(), statementId); - LOG.error(errorMsg); - throw new IllegalStateException(errorMsg); - } + this.statementModel = + statementStorageService.updateStatementState(statementModel, StatementState.CANCELLED); } public StatementState getStatementState() { diff --git a/async-query/src/main/java/org/opensearch/sql/spark/execution/statestore/OpenSearchSessionStorageService.java b/async-query/src/main/java/org/opensearch/sql/spark/execution/statestore/OpenSearchSessionStorageService.java index eefc6a9b14..db5ded46b5 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/execution/statestore/OpenSearchSessionStorageService.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/execution/statestore/OpenSearchSessionStorageService.java @@ -7,6 +7,9 @@ import java.util.Optional; import lombok.RequiredArgsConstructor; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.index.engine.VersionConflictEngineException; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext; import org.opensearch.sql.spark.execution.session.SessionModel; import org.opensearch.sql.spark.execution.session.SessionState; @@ -14,6 +17,7 @@ @RequiredArgsConstructor public class OpenSearchSessionStorageService implements SessionStorageService { + private static final Logger LOG = LogManager.getLogger(); private final StateStore stateStore; private final SessionModelXContentSerializer serializer; @@ -21,11 +25,17 @@ public class OpenSearchSessionStorageService implements SessionStorageService { @Override public SessionModel createSession( SessionModel sessionModel, AsyncQueryRequestContext asyncQueryRequestContext) { - return stateStore.create( - sessionModel.getId(), - sessionModel, - SessionModel::of, - OpenSearchStateStoreUtil.getIndexName(sessionModel.getDatasourceName())); + try { + return stateStore.create( + sessionModel.getId(), + sessionModel, + SessionModel::of, + OpenSearchStateStoreUtil.getIndexName(sessionModel.getDatasourceName())); + } catch (VersionConflictEngineException e) { + String errorMsg = "session already exist. " + sessionModel.getSessionId(); + LOG.error(errorMsg); + throw new IllegalStateException(errorMsg); + } } @Override diff --git a/async-query/src/main/java/org/opensearch/sql/spark/execution/statestore/OpenSearchStatementStorageService.java b/async-query/src/main/java/org/opensearch/sql/spark/execution/statestore/OpenSearchStatementStorageService.java index 5fcccc22a4..67d0609ca5 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/execution/statestore/OpenSearchStatementStorageService.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/execution/statestore/OpenSearchStatementStorageService.java @@ -7,6 +7,10 @@ import java.util.Optional; import lombok.RequiredArgsConstructor; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.index.engine.DocumentMissingException; +import org.opensearch.index.engine.VersionConflictEngineException; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext; import org.opensearch.sql.spark.execution.statement.StatementModel; import org.opensearch.sql.spark.execution.statement.StatementState; @@ -14,6 +18,7 @@ @RequiredArgsConstructor public class OpenSearchStatementStorageService implements StatementStorageService { + private static final Logger LOG = LogManager.getLogger(); private final StateStore stateStore; private final StatementModelXContentSerializer serializer; @@ -21,11 +26,17 @@ public class OpenSearchStatementStorageService implements StatementStorageServic @Override public StatementModel createStatement( StatementModel statementModel, AsyncQueryRequestContext asyncQueryRequestContext) { - return stateStore.create( - statementModel.getId(), - statementModel, - StatementModel::copy, - OpenSearchStateStoreUtil.getIndexName(statementModel.getDatasourceName())); + try { + return stateStore.create( + statementModel.getId(), + statementModel, + StatementModel::copy, + OpenSearchStateStoreUtil.getIndexName(statementModel.getDatasourceName())); + } catch (VersionConflictEngineException e) { + String errorMsg = "statement already exist. " + statementModel.getStatementId(); + LOG.error(errorMsg); + throw new IllegalStateException(errorMsg); + } } @Override @@ -37,10 +48,29 @@ public Optional getStatement(String id, String datasourceName) { @Override public StatementModel updateStatementState( StatementModel oldStatementModel, StatementState statementState) { - return stateStore.updateState( - oldStatementModel, - statementState, - StatementModel::copyWithState, - OpenSearchStateStoreUtil.getIndexName(oldStatementModel.getDatasourceName())); + try { + return stateStore.updateState( + oldStatementModel, + statementState, + StatementModel::copyWithState, + OpenSearchStateStoreUtil.getIndexName(oldStatementModel.getDatasourceName())); + } catch (DocumentMissingException e) { + String errorMsg = + String.format( + "cancel statement failed. no statement found. statement: %s.", + oldStatementModel.getStatementId()); + LOG.error(errorMsg); + throw new IllegalStateException(errorMsg); + } catch (VersionConflictEngineException e) { + StatementModel statementModel = + getStatement(oldStatementModel.getId(), oldStatementModel.getDatasourceName()) + .orElse(oldStatementModel); + String errorMsg = + String.format( + "cancel statement failed. current statementState: %s " + "statement: %s.", + statementModel.getStatementState(), statementModel.getStatementId()); + LOG.error(errorMsg); + throw new IllegalStateException(errorMsg); + } } }