diff --git a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryJobMetadata.java b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryJobMetadata.java index 1c7fd35c5e..bef8218b15 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryJobMetadata.java +++ b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryJobMetadata.java @@ -7,19 +7,9 @@ package org.opensearch.sql.spark.asyncquery.model; -import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; -import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME; -import static org.opensearch.sql.spark.execution.statement.StatementModel.QUERY_ID; - import com.google.gson.Gson; -import java.io.IOException; -import java.util.Locale; import lombok.Data; import lombok.EqualsAndHashCode; -import lombok.SneakyThrows; -import org.opensearch.core.common.Strings; -import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.core.xcontent.XContentParser; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.sql.spark.dispatcher.model.JobType; import org.opensearch.sql.spark.execution.statestore.StateModel; @@ -28,10 +18,6 @@ @Data @EqualsAndHashCode(callSuper = false) public class AsyncQueryJobMetadata extends StateModel { - public static final String TYPE_JOBMETA = "jobmeta"; - public static final String JOB_TYPE = "jobType"; - public static final String INDEX_NAME = "indexName"; - private final AsyncQueryId queryId; private final String applicationId; private final String jobId; @@ -134,29 +120,6 @@ public String toString() { return new Gson().toJson(this); } - /** - * Converts JobMetadata to XContentBuilder. - * - * @return XContentBuilder {@link XContentBuilder} - * @throws Exception Exception. - */ - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder - .startObject() - .field(QUERY_ID, queryId.getId()) - .field("type", TYPE_JOBMETA) - .field("jobId", jobId) - .field("applicationId", applicationId) - .field("resultIndex", resultIndex) - .field("sessionId", sessionId) - .field(DATASOURCE_NAME, datasourceName) - .field(JOB_TYPE, jobType.getText().toLowerCase(Locale.ROOT)) - .field(INDEX_NAME, indexName) - .endObject(); - return builder; - } - /** copy builder. update seqNo and primaryTerm */ public static AsyncQueryJobMetadata copy( AsyncQueryJobMetadata copy, long seqNo, long primaryTerm) { @@ -173,72 +136,6 @@ public static AsyncQueryJobMetadata copy( primaryTerm); } - /** - * Convert xcontent parser to JobMetadata. - * - * @param parser parser. - * @return JobMetadata {@link AsyncQueryJobMetadata} - * @throws IOException IOException. - */ - @SneakyThrows - public static AsyncQueryJobMetadata fromXContent( - XContentParser parser, long seqNo, long primaryTerm) { - AsyncQueryId queryId = null; - String jobId = null; - String applicationId = null; - String resultIndex = null; - String sessionId = null; - String datasourceName = null; - String jobTypeStr = null; - String indexName = null; - ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); - while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) { - String fieldName = parser.currentName(); - parser.nextToken(); - switch (fieldName) { - case QUERY_ID: - queryId = new AsyncQueryId(parser.textOrNull()); - break; - case "jobId": - jobId = parser.textOrNull(); - break; - case "applicationId": - applicationId = parser.textOrNull(); - break; - case "resultIndex": - resultIndex = parser.textOrNull(); - break; - case "sessionId": - sessionId = parser.textOrNull(); - break; - case DATASOURCE_NAME: - datasourceName = parser.textOrNull(); - case JOB_TYPE: - jobTypeStr = parser.textOrNull(); - case INDEX_NAME: - indexName = parser.textOrNull(); - case "type": - break; - default: - throw new IllegalArgumentException("Unknown field: " + fieldName); - } - } - if (jobId == null || applicationId == null) { - throw new IllegalArgumentException("jobId and applicationId are required fields."); - } - return new AsyncQueryJobMetadata( - queryId, - applicationId, - jobId, - resultIndex, - sessionId, - datasourceName, - Strings.isNullOrEmpty(jobTypeStr) ? null : JobType.fromString(jobTypeStr), - indexName, - seqNo, - primaryTerm); - } - @Override public String getId() { return queryId.docId(); diff --git a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java index 314e83a6db..d54b6c29af 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java +++ b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java @@ -13,7 +13,6 @@ import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED; import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_ROLE_ARN; import static org.opensearch.sql.spark.data.constants.SparkConstants.*; -import static org.opensearch.sql.spark.execution.statestore.StateStore.DATASOURCE_TO_REQUEST_INDEX; import java.net.URI; import java.net.URISyntaxException; @@ -27,6 +26,7 @@ import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceType; import org.opensearch.sql.datasources.auth.AuthenticationType; +import org.opensearch.sql.spark.execution.statestore.OpenSearchStateStoreUtil; /** Define Spark Submit Parameters. */ @AllArgsConstructor @@ -181,7 +181,7 @@ public Builder extraParameters(String params) { } public Builder sessionExecution(String sessionId, String datasourceName) { - config.put(FLINT_JOB_REQUEST_INDEX, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName)); + config.put(FLINT_JOB_REQUEST_INDEX, OpenSearchStateStoreUtil.getIndexName(datasourceName)); config.put(FLINT_JOB_SESSION_ID, sessionId); return this; } diff --git a/spark/src/main/java/org/opensearch/sql/spark/cluster/FlintIndexRetention.java b/spark/src/main/java/org/opensearch/sql/spark/cluster/FlintIndexRetention.java index 3ca56ca173..628b578ae9 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/cluster/FlintIndexRetention.java +++ b/spark/src/main/java/org/opensearch/sql/spark/cluster/FlintIndexRetention.java @@ -5,8 +5,8 @@ package org.opensearch.sql.spark.cluster; -import static org.opensearch.sql.spark.execution.session.SessionModel.LAST_UPDATE_TIME; -import static org.opensearch.sql.spark.execution.statement.StatementModel.SUBMIT_TIME; +import static org.opensearch.sql.spark.execution.xcontent.StatementModelXContentSerializer.SUBMIT_TIME; +import static org.opensearch.sql.spark.execution.xcontent.XContentCommonAttributes.LAST_UPDATE_TIME; import java.time.Clock; import java.time.Duration; diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDMLResult.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDMLResult.java index b01ecf55ba..d0b99e883e 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDMLResult.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDMLResult.java @@ -5,13 +5,8 @@ package org.opensearch.sql.spark.dispatcher.model; -import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME; - -import com.google.common.collect.ImmutableList; -import java.io.IOException; import lombok.Data; import lombok.EqualsAndHashCode; -import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.sql.spark.execution.statestore.StateModel; @@ -19,10 +14,7 @@ @Data @EqualsAndHashCode(callSuper = false) public class IndexDMLResult extends StateModel { - private static final String QUERY_ID = "queryId"; - private static final String QUERY_RUNTIME = "queryRunTime"; - private static final String UPDATE_TIME = "updateTime"; - private static final String DOC_ID_PREFIX = "index"; + public static final String DOC_ID_PREFIX = "index"; private final String queryId; private final String status; @@ -55,20 +47,4 @@ public long getSeqNo() { public long getPrimaryTerm() { return SequenceNumbers.UNASSIGNED_PRIMARY_TERM; } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder - .startObject() - .field(QUERY_ID, queryId) - .field("status", status) - .field("error", error) - .field(DATASOURCE_NAME, datasourceName) - .field(QUERY_RUNTIME, queryRunTime) - .field(UPDATE_TIME, updateTime) - .field("result", ImmutableList.of()) - .field("schema", ImmutableList.of()) - .endObject(); - return builder; - } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionModel.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionModel.java index 806cdb083e..09e83ea41c 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionModel.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionModel.java @@ -8,13 +8,8 @@ import static org.opensearch.sql.spark.execution.session.SessionState.NOT_STARTED; import static org.opensearch.sql.spark.execution.session.SessionType.INTERACTIVE; -import java.io.IOException; import lombok.Builder; import lombok.Data; -import lombok.SneakyThrows; -import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.core.xcontent.XContentParser; -import org.opensearch.core.xcontent.XContentParserUtils; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.sql.spark.execution.statestore.StateModel; @@ -22,18 +17,8 @@ @Data @Builder public class SessionModel extends StateModel { - public static final String VERSION = "version"; - public static final String TYPE = "type"; - public static final String SESSION_TYPE = "sessionType"; - public static final String SESSION_ID = "sessionId"; - public static final String SESSION_STATE = "state"; - public static final String DATASOURCE_NAME = "dataSourceName"; - public static final String LAST_UPDATE_TIME = "lastUpdateTime"; - public static final String APPLICATION_ID = "applicationId"; - public static final String JOB_ID = "jobId"; - public static final String ERROR = "error"; + public static final String UNKNOWN = "unknown"; - public static final String SESSION_DOC_TYPE = "session"; private final String version; private final SessionType sessionType; @@ -48,24 +33,6 @@ public class SessionModel extends StateModel { private final long seqNo; private final long primaryTerm; - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder - .startObject() - .field(VERSION, version) - .field(TYPE, SESSION_DOC_TYPE) - .field(SESSION_TYPE, sessionType.getSessionType()) - .field(SESSION_ID, sessionId.getSessionId()) - .field(SESSION_STATE, sessionState.getSessionState()) - .field(DATASOURCE_NAME, datasourceName) - .field(APPLICATION_ID, applicationId) - .field(JOB_ID, jobId) - .field(LAST_UPDATE_TIME, lastUpdateTime) - .field(ERROR, error) - .endObject(); - return builder; - } - public static SessionModel of(SessionModel copy, long seqNo, long primaryTerm) { return builder() .version(copy.version) @@ -99,52 +66,6 @@ public static SessionModel copyWithState( .build(); } - @SneakyThrows - public static SessionModel fromXContent(XContentParser parser, long seqNo, long primaryTerm) { - SessionModelBuilder builder = new SessionModelBuilder(); - XContentParserUtils.ensureExpectedToken( - XContentParser.Token.START_OBJECT, parser.currentToken(), parser); - while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) { - String fieldName = parser.currentName(); - parser.nextToken(); - switch (fieldName) { - case VERSION: - builder.version(parser.text()); - break; - case SESSION_TYPE: - builder.sessionType(SessionType.fromString(parser.text())); - break; - case SESSION_ID: - builder.sessionId(new SessionId(parser.text())); - break; - case SESSION_STATE: - builder.sessionState(SessionState.fromString(parser.text())); - break; - case DATASOURCE_NAME: - builder.datasourceName(parser.text()); - break; - case ERROR: - builder.error(parser.text()); - break; - case APPLICATION_ID: - builder.applicationId(parser.text()); - break; - case JOB_ID: - builder.jobId(parser.text()); - break; - case LAST_UPDATE_TIME: - builder.lastUpdateTime(parser.longValue()); - break; - case TYPE: - // do nothing. - break; - } - } - builder.seqNo(seqNo); - builder.primaryTerm(primaryTerm); - return builder.build(); - } - public static SessionModel initInteractiveSession( String applicationId, String jobId, SessionId sid, String datasourceName) { return builder() diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statement/StatementModel.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statement/StatementModel.java index adc147c905..f58e3a4f1c 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/statement/StatementModel.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statement/StatementModel.java @@ -5,18 +5,10 @@ package org.opensearch.sql.spark.execution.statement; -import static org.opensearch.sql.spark.execution.session.SessionModel.APPLICATION_ID; -import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME; -import static org.opensearch.sql.spark.execution.session.SessionModel.JOB_ID; import static org.opensearch.sql.spark.execution.statement.StatementState.WAITING; -import java.io.IOException; import lombok.Builder; import lombok.Data; -import lombok.SneakyThrows; -import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.core.xcontent.XContentParser; -import org.opensearch.core.xcontent.XContentParserUtils; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.sql.spark.execution.session.SessionId; import org.opensearch.sql.spark.execution.statestore.StateModel; @@ -26,18 +18,7 @@ @Data @Builder public class StatementModel extends StateModel { - public static final String VERSION = "version"; - public static final String TYPE = "type"; - public static final String STATEMENT_STATE = "state"; - public static final String STATEMENT_ID = "statementId"; - public static final String SESSION_ID = "sessionId"; - public static final String LANG = "lang"; - public static final String QUERY = "query"; - public static final String QUERY_ID = "queryId"; - public static final String SUBMIT_TIME = "submitTime"; - public static final String ERROR = "error"; public static final String UNKNOWN = ""; - public static final String STATEMENT_DOC_TYPE = "statement"; private final String version; private final StatementState statementState; @@ -55,27 +36,6 @@ public class StatementModel extends StateModel { private final long seqNo; private final long primaryTerm; - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder - .startObject() - .field(VERSION, version) - .field(TYPE, STATEMENT_DOC_TYPE) - .field(STATEMENT_STATE, statementState.getState()) - .field(STATEMENT_ID, statementId.getId()) - .field(SESSION_ID, sessionId.getSessionId()) - .field(APPLICATION_ID, applicationId) - .field(JOB_ID, jobId) - .field(LANG, langType.getText()) - .field(DATASOURCE_NAME, datasourceName) - .field(QUERY, query) - .field(QUERY_ID, queryId) - .field(SUBMIT_TIME, submitTime) - .field(ERROR, error) - .endObject(); - return builder; - } - public static StatementModel copy(StatementModel copy, long seqNo, long primaryTerm) { return builder() .version("1.0") @@ -115,61 +75,6 @@ public static StatementModel copyWithState( .build(); } - @SneakyThrows - public static StatementModel fromXContent(XContentParser parser, long seqNo, long primaryTerm) { - StatementModel.StatementModelBuilder builder = StatementModel.builder(); - XContentParserUtils.ensureExpectedToken( - XContentParser.Token.START_OBJECT, parser.currentToken(), parser); - while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) { - String fieldName = parser.currentName(); - parser.nextToken(); - switch (fieldName) { - case VERSION: - builder.version(parser.text()); - break; - case TYPE: - // do nothing - break; - case STATEMENT_STATE: - builder.statementState(StatementState.fromString(parser.text())); - break; - case STATEMENT_ID: - builder.statementId(new StatementId(parser.text())); - break; - case SESSION_ID: - builder.sessionId(new SessionId(parser.text())); - break; - case APPLICATION_ID: - builder.applicationId(parser.text()); - break; - case JOB_ID: - builder.jobId(parser.text()); - break; - case LANG: - builder.langType(LangType.fromString(parser.text())); - break; - case DATASOURCE_NAME: - builder.datasourceName(parser.text()); - break; - case QUERY: - builder.query(parser.text()); - break; - case QUERY_ID: - builder.queryId(parser.text()); - break; - case SUBMIT_TIME: - builder.submitTime(parser.longValue()); - break; - case ERROR: - builder.error(parser.text()); - break; - } - } - builder.seqNo(seqNo); - builder.primaryTerm(primaryTerm); - return builder.build(); - } - public static StatementModel submitStatement( SessionId sid, String applicationId, diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/CopyBuilder.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/CopyBuilder.java new file mode 100644 index 0000000000..58dc056348 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/CopyBuilder.java @@ -0,0 +1,10 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.statestore; + +public interface CopyBuilder { + T of(T copy, long seqNo, long primaryTerm); +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/FromXContent.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/FromXContent.java new file mode 100644 index 0000000000..0f691fc9c0 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/FromXContent.java @@ -0,0 +1,12 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.statestore; + +import org.opensearch.core.xcontent.XContentParser; + +public interface FromXContent { + T fromXContent(XContentParser parser, long seqNo, long primaryTerm); +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/OpenSearchSessionStorageService.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/OpenSearchSessionStorageService.java index cfff219eaa..a229d4f6bf 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/OpenSearchSessionStorageService.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/OpenSearchSessionStorageService.java @@ -5,28 +5,28 @@ package org.opensearch.sql.spark.execution.statestore; -import static org.opensearch.sql.spark.execution.statestore.StateStore.DATASOURCE_TO_REQUEST_INDEX; - import java.util.Optional; import lombok.RequiredArgsConstructor; import org.opensearch.sql.spark.execution.session.SessionModel; import org.opensearch.sql.spark.execution.session.SessionState; +import org.opensearch.sql.spark.execution.xcontent.SessionModelXContentSerializer; @RequiredArgsConstructor public class OpenSearchSessionStorageService implements SessionStorageService { private final StateStore stateStore; + private final SessionModelXContentSerializer serializer; @Override public SessionModel createSession(SessionModel sessionModel, String datasourceName) { return stateStore.create( - sessionModel, SessionModel::of, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName)); + sessionModel, SessionModel::of, OpenSearchStateStoreUtil.getIndexName(datasourceName)); } @Override public Optional getSession(String id, String datasourceName) { return stateStore.get( - id, SessionModel::fromXContent, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName)); + id, serializer::fromXContent, OpenSearchStateStoreUtil.getIndexName(datasourceName)); } @Override @@ -36,6 +36,6 @@ public SessionModel updateSessionState( sessionModel, sessionState, SessionModel::copyWithState, - DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName)); + OpenSearchStateStoreUtil.getIndexName(datasourceName)); } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/OpenSearchStatementStorageService.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/OpenSearchStatementStorageService.java index b218490d6a..226fb8d32a 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/OpenSearchStatementStorageService.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/OpenSearchStatementStorageService.java @@ -5,28 +5,30 @@ package org.opensearch.sql.spark.execution.statestore; -import static org.opensearch.sql.spark.execution.statestore.StateStore.DATASOURCE_TO_REQUEST_INDEX; - import java.util.Optional; import lombok.RequiredArgsConstructor; import org.opensearch.sql.spark.execution.statement.StatementModel; import org.opensearch.sql.spark.execution.statement.StatementState; +import org.opensearch.sql.spark.execution.xcontent.StatementModelXContentSerializer; @RequiredArgsConstructor public class OpenSearchStatementStorageService implements StatementStorageService { private final StateStore stateStore; + private final StatementModelXContentSerializer serializer; @Override public StatementModel createStatement(StatementModel statementModel, String datasourceName) { return stateStore.create( - statementModel, StatementModel::copy, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName)); + statementModel, + StatementModel::copy, + OpenSearchStateStoreUtil.getIndexName(datasourceName)); } @Override public Optional getStatement(String id, String datasourceName) { return stateStore.get( - id, StatementModel::fromXContent, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName)); + id, serializer::fromXContent, OpenSearchStateStoreUtil.getIndexName(datasourceName)); } @Override @@ -36,6 +38,6 @@ public StatementModel updateStatementState( oldStatementModel, statementState, StatementModel::copyWithState, - DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName)); + OpenSearchStateStoreUtil.getIndexName(datasourceName)); } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateCopyBuilder.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateCopyBuilder.java new file mode 100644 index 0000000000..7bc14f5a2e --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateCopyBuilder.java @@ -0,0 +1,10 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.statestore; + +public interface StateCopyBuilder { + T of(T copy, S state, long seqNo, long primaryTerm); +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateModel.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateModel.java index fe105cc8e4..cc1b9d56d4 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateModel.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateModel.java @@ -5,30 +5,10 @@ package org.opensearch.sql.spark.execution.statestore; -import org.opensearch.core.xcontent.ToXContentObject; -import org.opensearch.core.xcontent.XContentParser; - -public abstract class StateModel implements ToXContentObject { - public static final String VERSION_1_0 = "1.0"; - public static final String TYPE = "type"; - public static final String STATE = "state"; - public static final String LAST_UPDATE_TIME = "lastUpdateTime"; - +public abstract class StateModel { public abstract String getId(); public abstract long getSeqNo(); public abstract long getPrimaryTerm(); - - public interface CopyBuilder { - T of(T copy, long seqNo, long primaryTerm); - } - - public interface StateCopyBuilder { - T of(T copy, S state, long seqNo, long primaryTerm); - } - - public interface FromXContent { - T fromXContent(XContentParser parser, long seqNo, long primaryTerm); - } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java index 3de83b2f3e..56d2a0f179 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java @@ -5,16 +5,12 @@ package org.opensearch.sql.spark.execution.statestore; -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_REQUEST_BUFFER_INDEX_NAME; -import static org.opensearch.sql.spark.execution.statestore.StateModel.STATE; - import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.Locale; import java.util.Optional; -import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Supplier; import lombok.RequiredArgsConstructor; @@ -40,7 +36,6 @@ import org.opensearch.common.action.ActionFuture; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.xcontent.LoggingDeprecationHandler; -import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.ToXContent; @@ -49,11 +44,19 @@ import org.opensearch.index.query.QueryBuilders; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; +import org.opensearch.sql.spark.dispatcher.model.IndexDMLResult; import org.opensearch.sql.spark.execution.session.SessionModel; import org.opensearch.sql.spark.execution.session.SessionState; import org.opensearch.sql.spark.execution.session.SessionType; import org.opensearch.sql.spark.execution.statement.StatementModel; import org.opensearch.sql.spark.execution.statement.StatementState; +import org.opensearch.sql.spark.execution.xcontent.AsyncQueryJobMetadataXContentSerializer; +import org.opensearch.sql.spark.execution.xcontent.FlintIndexStateModelXContentSerializer; +import org.opensearch.sql.spark.execution.xcontent.IndexDMLResultXContentSerializer; +import org.opensearch.sql.spark.execution.xcontent.SessionModelXContentSerializer; +import org.opensearch.sql.spark.execution.xcontent.StatementModelXContentSerializer; +import org.opensearch.sql.spark.execution.xcontent.XContentCommonAttributes; +import org.opensearch.sql.spark.execution.xcontent.XContentSerializer; import org.opensearch.sql.spark.flint.FlintIndexState; import org.opensearch.sql.spark.flint.FlintIndexStateModel; @@ -65,10 +68,6 @@ public class StateStore { public static String SETTINGS_FILE_NAME = "query_execution_request_settings.yml"; public static String MAPPING_FILE_NAME = "query_execution_request_mapping.yml"; - public static Function DATASOURCE_TO_REQUEST_INDEX = - datasourceName -> - String.format( - "%s_%s", SPARK_REQUEST_BUFFER_INDEX_NAME, datasourceName.toLowerCase(Locale.ROOT)); public static String ALL_DATASOURCE = "*"; private static final Logger LOG = LogManager.getLogger(); @@ -77,16 +76,16 @@ public class StateStore { private final ClusterService clusterService; @VisibleForTesting - public T create( - T st, StateModel.CopyBuilder builder, String indexName) { + public T create(T st, CopyBuilder builder, String indexName) { try { if (!this.clusterService.state().routingTable().hasIndex(indexName)) { createIndex(indexName); } + XContentSerializer serializer = getXContentSerializer(st); IndexRequest indexRequest = new IndexRequest(indexName) .id(st.getId()) - .source(st.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .source(serializer.toXContent(st, ToXContent.EMPTY_PARAMS)) .setIfSeqNo(st.getSeqNo()) .setIfPrimaryTerm(st.getPrimaryTerm()) .create(true) @@ -113,7 +112,7 @@ public T create( @VisibleForTesting public Optional get( - String sid, StateModel.FromXContent builder, String indexName) { + String sid, FromXContent builder, String indexName) { try { if (!this.clusterService.state().routingTable().hasIndex(indexName)) { createIndex(indexName); @@ -145,16 +144,17 @@ public Optional get( @VisibleForTesting public T updateState( - T st, S state, StateModel.StateCopyBuilder builder, String indexName) { + T st, S state, StateCopyBuilder builder, String indexName) { try { T model = builder.of(st, state, st.getSeqNo(), st.getPrimaryTerm()); + XContentSerializer serializer = getXContentSerializer(st); UpdateRequest updateRequest = new UpdateRequest() .index(indexName) .id(model.getId()) .setIfSeqNo(model.getSeqNo()) .setIfPrimaryTerm(model.getPrimaryTerm()) - .doc(model.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .doc(serializer.toXContent(model, ToXContent.EMPTY_PARAMS)) .fetchSource(true) .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); try (ThreadContext.StoredContext ignored = @@ -255,64 +255,83 @@ public static Function createJobMe stateStore.create( jobMetadata, AsyncQueryJobMetadata::copy, - DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName)); + OpenSearchStateStoreUtil.getIndexName(datasourceName)); } public static Function> getJobMetaData( StateStore stateStore, String datasourceName) { + AsyncQueryJobMetadataXContentSerializer asyncQueryJobMetadataXContentSerializer = + new AsyncQueryJobMetadataXContentSerializer(); return (docId) -> stateStore.get( docId, - AsyncQueryJobMetadata::fromXContent, - DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName)); + asyncQueryJobMetadataXContentSerializer::fromXContent, + OpenSearchStateStoreUtil.getIndexName(datasourceName)); } public static Supplier activeSessionsCount(StateStore stateStore, String datasourceName) { return () -> stateStore.count( - DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName), + OpenSearchStateStoreUtil.getIndexName(datasourceName), QueryBuilders.boolQuery() - .must(QueryBuilders.termQuery(SessionModel.TYPE, SessionModel.SESSION_DOC_TYPE)) .must( QueryBuilders.termQuery( - SessionModel.SESSION_TYPE, SessionType.INTERACTIVE.getSessionType())) + XContentCommonAttributes.TYPE, + SessionModelXContentSerializer.SESSION_DOC_TYPE)) .must( QueryBuilders.termQuery( - SessionModel.SESSION_STATE, SessionState.RUNNING.getSessionState()))); - } - - public static BiFunction - updateFlintIndexState(StateStore stateStore, String datasourceName) { - return (old, state) -> - stateStore.updateState( - old, - state, - FlintIndexStateModel::copyWithState, - DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName)); + SessionModelXContentSerializer.SESSION_TYPE, + SessionType.INTERACTIVE.getSessionType())) + .must( + QueryBuilders.termQuery( + XContentCommonAttributes.STATE, SessionState.RUNNING.getSessionState()))); } public static Supplier activeRefreshJobCount(StateStore stateStore, String datasourceName) { return () -> stateStore.count( - DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName), + OpenSearchStateStoreUtil.getIndexName(datasourceName), QueryBuilders.boolQuery() .must( QueryBuilders.termQuery( - SessionModel.TYPE, FlintIndexStateModel.FLINT_INDEX_DOC_TYPE)) - .must(QueryBuilders.termQuery(STATE, FlintIndexState.REFRESHING.getState()))); + XContentCommonAttributes.TYPE, + FlintIndexStateModelXContentSerializer.FLINT_INDEX_DOC_TYPE)) + .must( + QueryBuilders.termQuery( + XContentCommonAttributes.STATE, FlintIndexState.REFRESHING.getState()))); } public static Supplier activeStatementsCount(StateStore stateStore, String datasourceName) { return () -> stateStore.count( - DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName), + OpenSearchStateStoreUtil.getIndexName(datasourceName), QueryBuilders.boolQuery() .must( - QueryBuilders.termQuery(StatementModel.TYPE, StatementModel.STATEMENT_DOC_TYPE)) + QueryBuilders.termQuery( + XContentCommonAttributes.TYPE, + StatementModelXContentSerializer.STATEMENT_DOC_TYPE)) .should( QueryBuilders.termsQuery( - StatementModel.STATEMENT_STATE, + XContentCommonAttributes.STATE, StatementState.RUNNING.getState(), StatementState.WAITING.getState()))); } + + @SuppressWarnings("unchecked") + private XContentSerializer getXContentSerializer(T st) { + if (st instanceof StatementModel) { + return (XContentSerializer) new StatementModelXContentSerializer(); + } else if (st instanceof SessionModel) { + return (XContentSerializer) new SessionModelXContentSerializer(); + } else if (st instanceof FlintIndexStateModel) { + return (XContentSerializer) new FlintIndexStateModelXContentSerializer(); + } else if (st instanceof AsyncQueryJobMetadata) { + return (XContentSerializer) new AsyncQueryJobMetadataXContentSerializer(); + } else if (st instanceof IndexDMLResult) { + return (XContentSerializer) new IndexDMLResultXContentSerializer(); + } else { + throw new IllegalArgumentException( + "Unsupported StateModel subclass: " + st.getClass().getSimpleName()); + } + } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/AsyncQueryJobMetadataXContentSerializer.java b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/AsyncQueryJobMetadataXContentSerializer.java new file mode 100644 index 0000000000..bf61818b9f --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/AsyncQueryJobMetadataXContentSerializer.java @@ -0,0 +1,113 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.xcontent; + +import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; +import static org.opensearch.sql.spark.execution.xcontent.XContentCommonAttributes.APPLICATION_ID; +import static org.opensearch.sql.spark.execution.xcontent.XContentCommonAttributes.DATASOURCE_NAME; +import static org.opensearch.sql.spark.execution.xcontent.XContentCommonAttributes.JOB_ID; +import static org.opensearch.sql.spark.execution.xcontent.XContentCommonAttributes.QUERY_ID; +import static org.opensearch.sql.spark.execution.xcontent.XContentCommonAttributes.TYPE; + +import java.io.IOException; +import java.util.Locale; +import lombok.SneakyThrows; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.common.Strings; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId; +import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; +import org.opensearch.sql.spark.dispatcher.model.JobType; + +public class AsyncQueryJobMetadataXContentSerializer + implements XContentSerializer { + public static final String TYPE_JOBMETA = "jobmeta"; + public static final String JOB_TYPE = "jobType"; + public static final String INDEX_NAME = "indexName"; + public static final String RESULT_INDEX = "resultIndex"; + public static final String SESSION_ID = "sessionId"; + + @Override + public XContentBuilder toXContent(AsyncQueryJobMetadata jobMetadata, ToXContent.Params params) + throws IOException { + return XContentFactory.jsonBuilder() + .startObject() + .field(QUERY_ID, jobMetadata.getQueryId().getId()) + .field(TYPE, TYPE_JOBMETA) + .field(JOB_ID, jobMetadata.getJobId()) + .field(APPLICATION_ID, jobMetadata.getApplicationId()) + .field(RESULT_INDEX, jobMetadata.getResultIndex()) + .field(SESSION_ID, jobMetadata.getSessionId()) + .field(DATASOURCE_NAME, jobMetadata.getDatasourceName()) + .field(JOB_TYPE, jobMetadata.getJobType().getText().toLowerCase(Locale.ROOT)) + .field(INDEX_NAME, jobMetadata.getIndexName()) + .endObject(); + } + + @Override + @SneakyThrows + public AsyncQueryJobMetadata fromXContent(XContentParser parser, long seqNo, long primaryTerm) { + AsyncQueryId queryId = null; + String jobId = null; + String applicationId = null; + String resultIndex = null; + String sessionId = null; + String datasourceName = null; + String jobTypeStr = null; + String indexName = null; + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) { + String fieldName = parser.currentName(); + parser.nextToken(); + switch (fieldName) { + case QUERY_ID: + queryId = new AsyncQueryId(parser.textOrNull()); + break; + case JOB_ID: + jobId = parser.textOrNull(); + break; + case APPLICATION_ID: + applicationId = parser.textOrNull(); + break; + case RESULT_INDEX: + resultIndex = parser.textOrNull(); + break; + case SESSION_ID: + sessionId = parser.textOrNull(); + break; + case DATASOURCE_NAME: + datasourceName = parser.textOrNull(); + break; + case JOB_TYPE: + jobTypeStr = parser.textOrNull(); + break; + case INDEX_NAME: + indexName = parser.textOrNull(); + break; + case TYPE: + break; + default: + throw new IllegalArgumentException("Unknown field: " + fieldName); + } + } + if (jobId == null || applicationId == null) { + throw new IllegalArgumentException("jobId and applicationId are required fields."); + } + return new AsyncQueryJobMetadata( + queryId, + applicationId, + jobId, + resultIndex, + sessionId, + datasourceName, + Strings.isNullOrEmpty(jobTypeStr) ? null : JobType.fromString(jobTypeStr), + indexName, + seqNo, + primaryTerm); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/FlintIndexStateModelXContentSerializer.java b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/FlintIndexStateModelXContentSerializer.java new file mode 100644 index 0000000000..87ddc6f719 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/FlintIndexStateModelXContentSerializer.java @@ -0,0 +1,88 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.xcontent; + +import static org.opensearch.sql.spark.execution.xcontent.XContentCommonAttributes.APPLICATION_ID; +import static org.opensearch.sql.spark.execution.xcontent.XContentCommonAttributes.DATASOURCE_NAME; +import static org.opensearch.sql.spark.execution.xcontent.XContentCommonAttributes.ERROR; +import static org.opensearch.sql.spark.execution.xcontent.XContentCommonAttributes.JOB_ID; +import static org.opensearch.sql.spark.execution.xcontent.XContentCommonAttributes.LAST_UPDATE_TIME; +import static org.opensearch.sql.spark.execution.xcontent.XContentCommonAttributes.STATE; +import static org.opensearch.sql.spark.execution.xcontent.XContentCommonAttributes.TYPE; +import static org.opensearch.sql.spark.execution.xcontent.XContentCommonAttributes.VERSION; +import static org.opensearch.sql.spark.execution.xcontent.XContentCommonAttributes.VERSION_1_0; + +import java.io.IOException; +import lombok.SneakyThrows; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.core.xcontent.XContentParserUtils; +import org.opensearch.sql.spark.flint.FlintIndexState; +import org.opensearch.sql.spark.flint.FlintIndexStateModel; + +public class FlintIndexStateModelXContentSerializer + implements XContentSerializer { + public static final String FLINT_INDEX_DOC_TYPE = "flintindexstate"; + public static final String LATEST_ID = "latestId"; + + @Override + public XContentBuilder toXContent( + FlintIndexStateModel flintIndexStateModel, ToXContent.Params params) throws IOException { + return XContentFactory.jsonBuilder() + .startObject() + .field(VERSION, VERSION_1_0) + .field(TYPE, FLINT_INDEX_DOC_TYPE) + .field(STATE, flintIndexStateModel.getIndexState().getState()) + .field(APPLICATION_ID, flintIndexStateModel.getApplicationId()) + .field(JOB_ID, flintIndexStateModel.getJobId()) + .field(LATEST_ID, flintIndexStateModel.getLatestId()) + .field(DATASOURCE_NAME, flintIndexStateModel.getDatasourceName()) + .field(LAST_UPDATE_TIME, flintIndexStateModel.getLastUpdateTime()) + .field(ERROR, flintIndexStateModel.getError()) + .endObject(); + } + + @Override + @SneakyThrows + public FlintIndexStateModel fromXContent(XContentParser parser, long seqNo, long primaryTerm) { + // Implement the fromXContent logic here + FlintIndexStateModel.FlintIndexStateModelBuilder builder = FlintIndexStateModel.builder(); + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) { + String fieldName = parser.currentName(); + parser.nextToken(); + switch (fieldName) { + case STATE: + builder.indexState(FlintIndexState.fromString(parser.text())); + break; + case APPLICATION_ID: + builder.applicationId(parser.text()); + break; + case JOB_ID: + builder.jobId(parser.text()); + break; + case LATEST_ID: + builder.latestId(parser.text()); + break; + case DATASOURCE_NAME: + builder.datasourceName(parser.text()); + break; + case LAST_UPDATE_TIME: + builder.lastUpdateTime(parser.longValue()); + break; + case ERROR: + builder.error(parser.text()); + break; + } + } + builder.seqNo(seqNo); + builder.primaryTerm(primaryTerm); + return builder.build(); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/IndexDMLResultXContentSerializer.java b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/IndexDMLResultXContentSerializer.java new file mode 100644 index 0000000000..505533157d --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/IndexDMLResultXContentSerializer.java @@ -0,0 +1,44 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.xcontent; + +import static org.opensearch.sql.spark.execution.xcontent.XContentCommonAttributes.DATASOURCE_NAME; +import static org.opensearch.sql.spark.execution.xcontent.XContentCommonAttributes.ERROR; +import static org.opensearch.sql.spark.execution.xcontent.XContentCommonAttributes.QUERY_ID; + +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.sql.spark.dispatcher.model.IndexDMLResult; + +public class IndexDMLResultXContentSerializer implements XContentSerializer { + public static final String QUERY_RUNTIME = "queryRunTime"; + public static final String UPDATE_TIME = "updateTime"; + + @Override + public XContentBuilder toXContent(IndexDMLResult dmlResult, ToXContent.Params params) + throws IOException { + return XContentFactory.jsonBuilder() + .startObject() + .field(QUERY_ID, dmlResult.getQueryId()) + .field("status", dmlResult.getStatus()) + .field(ERROR, dmlResult.getError()) + .field(DATASOURCE_NAME, dmlResult.getDatasourceName()) + .field(QUERY_RUNTIME, dmlResult.getQueryRunTime()) + .field(UPDATE_TIME, dmlResult.getUpdateTime()) + .field("result", ImmutableList.of()) + .field("schema", ImmutableList.of()) + .endObject(); + } + + @Override + public IndexDMLResult fromXContent(XContentParser parser, long seqNo, long primaryTerm) { + throw new UnsupportedOperationException("IndexDMLResult to fromXContent Not supported"); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/SessionModelXContentSerializer.java b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/SessionModelXContentSerializer.java new file mode 100644 index 0000000000..d453b6ffa9 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/SessionModelXContentSerializer.java @@ -0,0 +1,99 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.xcontent; + +import static org.opensearch.sql.spark.execution.xcontent.XContentCommonAttributes.APPLICATION_ID; +import static org.opensearch.sql.spark.execution.xcontent.XContentCommonAttributes.DATASOURCE_NAME; +import static org.opensearch.sql.spark.execution.xcontent.XContentCommonAttributes.ERROR; +import static org.opensearch.sql.spark.execution.xcontent.XContentCommonAttributes.JOB_ID; +import static org.opensearch.sql.spark.execution.xcontent.XContentCommonAttributes.LAST_UPDATE_TIME; +import static org.opensearch.sql.spark.execution.xcontent.XContentCommonAttributes.STATE; +import static org.opensearch.sql.spark.execution.xcontent.XContentCommonAttributes.TYPE; +import static org.opensearch.sql.spark.execution.xcontent.XContentCommonAttributes.VERSION; + +import java.io.IOException; +import lombok.SneakyThrows; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.core.xcontent.XContentParserUtils; +import org.opensearch.sql.spark.execution.session.SessionId; +import org.opensearch.sql.spark.execution.session.SessionModel; +import org.opensearch.sql.spark.execution.session.SessionState; +import org.opensearch.sql.spark.execution.session.SessionType; + +public class SessionModelXContentSerializer implements XContentSerializer { + public static final String SESSION_DOC_TYPE = "session"; + public static final String SESSION_TYPE = "sessionType"; + public static final String SESSION_ID = "sessionId"; + + @Override + public XContentBuilder toXContent(SessionModel sessionModel, ToXContent.Params params) + throws IOException { + return XContentFactory.jsonBuilder() + .startObject() + .field(VERSION, sessionModel.getVersion()) + .field(TYPE, SESSION_DOC_TYPE) + .field(SESSION_TYPE, sessionModel.getSessionType().getSessionType()) + .field(SESSION_ID, sessionModel.getSessionId().getSessionId()) + .field(STATE, sessionModel.getSessionState().getSessionState()) + .field(DATASOURCE_NAME, sessionModel.getDatasourceName()) + .field(APPLICATION_ID, sessionModel.getApplicationId()) + .field(JOB_ID, sessionModel.getJobId()) + .field(LAST_UPDATE_TIME, sessionModel.getLastUpdateTime()) + .field(ERROR, sessionModel.getError()) + .endObject(); + } + + @Override + @SneakyThrows + public SessionModel fromXContent(XContentParser parser, long seqNo, long primaryTerm) { + // Implement the fromXContent logic here + SessionModel.SessionModelBuilder builder = SessionModel.builder(); + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) { + String fieldName = parser.currentName(); + parser.nextToken(); + switch (fieldName) { + case VERSION: + builder.version(parser.text()); + break; + case SESSION_TYPE: + builder.sessionType(SessionType.fromString(parser.text())); + break; + case SESSION_ID: + builder.sessionId(new SessionId(parser.text())); + break; + case STATE: + builder.sessionState(SessionState.fromString(parser.text())); + break; + case DATASOURCE_NAME: + builder.datasourceName(parser.text()); + break; + case ERROR: + builder.error(parser.text()); + break; + case APPLICATION_ID: + builder.applicationId(parser.text()); + break; + case JOB_ID: + builder.jobId(parser.text()); + break; + case LAST_UPDATE_TIME: + builder.lastUpdateTime(parser.longValue()); + break; + case TYPE: + // do nothing. + break; + } + } + builder.seqNo(seqNo); + builder.primaryTerm(primaryTerm); + return builder.build(); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/StatementModelXContentSerializer.java b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/StatementModelXContentSerializer.java new file mode 100644 index 0000000000..2323df998d --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/StatementModelXContentSerializer.java @@ -0,0 +1,117 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.xcontent; + +import static org.opensearch.sql.spark.execution.xcontent.SessionModelXContentSerializer.SESSION_ID; +import static org.opensearch.sql.spark.execution.xcontent.XContentCommonAttributes.APPLICATION_ID; +import static org.opensearch.sql.spark.execution.xcontent.XContentCommonAttributes.DATASOURCE_NAME; +import static org.opensearch.sql.spark.execution.xcontent.XContentCommonAttributes.ERROR; +import static org.opensearch.sql.spark.execution.xcontent.XContentCommonAttributes.JOB_ID; +import static org.opensearch.sql.spark.execution.xcontent.XContentCommonAttributes.STATE; +import static org.opensearch.sql.spark.execution.xcontent.XContentCommonAttributes.TYPE; +import static org.opensearch.sql.spark.execution.xcontent.XContentCommonAttributes.VERSION; + +import java.io.IOException; +import lombok.SneakyThrows; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.core.xcontent.XContentParserUtils; +import org.opensearch.sql.spark.execution.session.SessionId; +import org.opensearch.sql.spark.execution.statement.StatementId; +import org.opensearch.sql.spark.execution.statement.StatementModel; +import org.opensearch.sql.spark.execution.statement.StatementState; +import org.opensearch.sql.spark.rest.model.LangType; + +public class StatementModelXContentSerializer implements XContentSerializer { + public static final String STATEMENT_DOC_TYPE = "statement"; + public static final String STATEMENT_ID = "statementId"; + public static final String LANG = "lang"; + public static final String QUERY = "query"; + public static final String QUERY_ID = "queryId"; + public static final String SUBMIT_TIME = "submitTime"; + public static final String UNKNOWN = ""; + + @Override + public XContentBuilder toXContent(StatementModel statementModel, ToXContent.Params params) + throws IOException { + return XContentFactory.jsonBuilder() + .startObject() + .field(VERSION, statementModel.getVersion()) + .field(TYPE, STATEMENT_DOC_TYPE) + .field(STATE, statementModel.getStatementState().getState()) + .field(STATEMENT_ID, statementModel.getStatementId().getId()) + .field(SESSION_ID, statementModel.getSessionId().getSessionId()) + .field(APPLICATION_ID, statementModel.getApplicationId()) + .field(JOB_ID, statementModel.getJobId()) + .field(LANG, statementModel.getLangType().getText()) + .field(DATASOURCE_NAME, statementModel.getDatasourceName()) + .field(QUERY, statementModel.getQuery()) + .field(QUERY_ID, statementModel.getQueryId()) + .field(SUBMIT_TIME, statementModel.getSubmitTime()) + .field(ERROR, statementModel.getError()) + .endObject(); + } + + @Override + @SneakyThrows + public StatementModel fromXContent(XContentParser parser, long seqNo, long primaryTerm) { + StatementModel.StatementModelBuilder builder = StatementModel.builder(); + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) { + String fieldName = parser.currentName(); + parser.nextToken(); + switch (fieldName) { + case VERSION: + builder.version(parser.text()); + break; + case TYPE: + // do nothing + break; + case STATE: + builder.statementState(StatementState.fromString(parser.text())); + break; + case STATEMENT_ID: + builder.statementId(new StatementId(parser.text())); + break; + case SESSION_ID: + builder.sessionId(new SessionId(parser.text())); + break; + case APPLICATION_ID: + builder.applicationId(parser.text()); + break; + case JOB_ID: + builder.jobId(parser.text()); + break; + case LANG: + builder.langType(LangType.fromString(parser.text())); + break; + case DATASOURCE_NAME: + builder.datasourceName(parser.text()); + break; + case QUERY: + builder.query(parser.text()); + break; + case QUERY_ID: + builder.queryId(parser.text()); + break; + case SUBMIT_TIME: + builder.submitTime(parser.longValue()); + break; + case ERROR: + builder.error(parser.text()); + break; + default: + throw new IllegalArgumentException("Unexpected field: " + fieldName); + } + } + builder.seqNo(seqNo); + builder.primaryTerm(primaryTerm); + return builder.build(); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/XContentCommonAttributes.java b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/XContentCommonAttributes.java new file mode 100644 index 0000000000..0fe928000d --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/XContentCommonAttributes.java @@ -0,0 +1,22 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.xcontent; + +import lombok.experimental.UtilityClass; + +@UtilityClass +public class XContentCommonAttributes { + public static final String VERSION = "version"; + public static final String VERSION_1_0 = "1.0"; + public static final String TYPE = "type"; + public static final String QUERY_ID = "queryId"; + public static final String STATE = "state"; + public static final String LAST_UPDATE_TIME = "lastUpdateTime"; + public static final String APPLICATION_ID = "applicationId"; + public static final String DATASOURCE_NAME = "dataSourceName"; + public static final String JOB_ID = "jobId"; + public static final String ERROR = "error"; +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/XContentSerializer.java b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/XContentSerializer.java new file mode 100644 index 0000000000..a963dc4092 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/XContentSerializer.java @@ -0,0 +1,36 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.xcontent; + +import java.io.IOException; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.sql.spark.execution.statestore.StateModel; + +/** Interface for XContentSerializer */ +public interface XContentSerializer { + + /** + * Serializes the given object to an XContentBuilder using the specified parameters. + * + * @param object The object to serialize. + * @param params The parameters to use for serialization. + * @return An XContentBuilder containing the serialized representation of the object. + * @throws IOException If an I/O error occurs during serialization. + */ + XContentBuilder toXContent(T object, ToXContent.Params params) throws IOException; + + /** + * Deserializes an object from an XContentParser. + * + * @param parser The XContentParser to read the object from. + * @param seqNo The sequence number associated with the object. + * @param primaryTerm The primary term associated with the object. + * @return The deserialized object. + */ + T fromXContent(XContentParser parser, long seqNo, long primaryTerm); +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexStateModel.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexStateModel.java index bb73f439a2..9c03b084db 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexStateModel.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexStateModel.java @@ -5,20 +5,9 @@ package org.opensearch.sql.spark.flint; -import static org.opensearch.sql.spark.execution.session.SessionModel.APPLICATION_ID; -import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME; -import static org.opensearch.sql.spark.execution.session.SessionModel.JOB_ID; -import static org.opensearch.sql.spark.execution.statement.StatementModel.ERROR; -import static org.opensearch.sql.spark.execution.statement.StatementModel.VERSION; - -import java.io.IOException; import lombok.Builder; import lombok.EqualsAndHashCode; import lombok.Getter; -import lombok.SneakyThrows; -import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.core.xcontent.XContentParser; -import org.opensearch.core.xcontent.XContentParserUtils; import org.opensearch.sql.spark.execution.statestore.StateModel; /** Flint Index Model maintain the index state. */ @@ -26,10 +15,6 @@ @Builder @EqualsAndHashCode(callSuper = false) public class FlintIndexStateModel extends StateModel { - public static final String FLINT_INDEX_DOC_TYPE = "flintindexstate"; - public static final String LATEST_ID = "latestId"; - public static final String DOC_ID_PREFIX = "flint"; - private final FlintIndexState indexState; private final String applicationId; private final String jobId; @@ -89,62 +74,8 @@ public static FlintIndexStateModel copyWithState( primaryTerm); } - @SneakyThrows - public static FlintIndexStateModel fromXContent( - XContentParser parser, long seqNo, long primaryTerm) { - FlintIndexStateModelBuilder builder = FlintIndexStateModel.builder(); - XContentParserUtils.ensureExpectedToken( - XContentParser.Token.START_OBJECT, parser.currentToken(), parser); - while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) { - String fieldName = parser.currentName(); - parser.nextToken(); - switch (fieldName) { - case STATE: - builder.indexState(FlintIndexState.fromString(parser.text())); - case APPLICATION_ID: - builder.applicationId(parser.text()); - break; - case JOB_ID: - builder.jobId(parser.text()); - break; - case LATEST_ID: - builder.latestId(parser.text()); - break; - case DATASOURCE_NAME: - builder.datasourceName(parser.text()); - break; - case LAST_UPDATE_TIME: - builder.lastUpdateTime(parser.longValue()); - break; - case ERROR: - builder.error(parser.text()); - break; - } - } - builder.seqNo(seqNo); - builder.primaryTerm(primaryTerm); - return builder.build(); - } - @Override public String getId() { return latestId; } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder - .startObject() - .field(VERSION, VERSION_1_0) - .field(TYPE, FLINT_INDEX_DOC_TYPE) - .field(STATE, indexState.getState()) - .field(APPLICATION_ID, applicationId) - .field(JOB_ID, jobId) - .field(LATEST_ID, latestId) - .field(DATASOURCE_NAME, datasourceName) - .field(LAST_UPDATE_TIME, lastUpdateTime) - .field(ERROR, error) - .endObject(); - return builder; - } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/OpenSearchFlintIndexStateModelService.java b/spark/src/main/java/org/opensearch/sql/spark/flint/OpenSearchFlintIndexStateModelService.java index 2db3930821..58dc5166db 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/OpenSearchFlintIndexStateModelService.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/OpenSearchFlintIndexStateModelService.java @@ -9,10 +9,12 @@ import lombok.RequiredArgsConstructor; import org.opensearch.sql.spark.execution.statestore.OpenSearchStateStoreUtil; import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.execution.xcontent.FlintIndexStateModelXContentSerializer; @RequiredArgsConstructor public class OpenSearchFlintIndexStateModelService implements FlintIndexStateModelService { private final StateStore stateStore; + private final FlintIndexStateModelXContentSerializer serializer; @Override public FlintIndexStateModel updateFlintIndexState( @@ -29,9 +31,7 @@ public FlintIndexStateModel updateFlintIndexState( @Override public Optional getFlintIndexStateModel(String id, String datasourceName) { return stateStore.get( - id, - FlintIndexStateModel::fromXContent, - OpenSearchStateStoreUtil.getIndexName(datasourceName)); + id, serializer::fromXContent, OpenSearchStateStoreUtil.getIndexName(datasourceName)); } @Override diff --git a/spark/src/main/java/org/opensearch/sql/spark/rest/model/CreateAsyncQueryRequest.java b/spark/src/main/java/org/opensearch/sql/spark/rest/model/CreateAsyncQueryRequest.java index 98527b6241..f3a9a198fb 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/rest/model/CreateAsyncQueryRequest.java +++ b/spark/src/main/java/org/opensearch/sql/spark/rest/model/CreateAsyncQueryRequest.java @@ -6,7 +6,6 @@ package org.opensearch.sql.spark.rest.model; import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; -import static org.opensearch.sql.spark.execution.session.SessionModel.SESSION_ID; import java.io.IOException; import lombok.Data; @@ -15,7 +14,6 @@ @Data public class CreateAsyncQueryRequest { - private String query; private String datasource; private LangType lang; @@ -53,7 +51,7 @@ public static CreateAsyncQueryRequest fromXContentParser(XContentParser parser) lang = LangType.fromString(langString); } else if (fieldName.equals("datasource")) { datasource = parser.textOrNull(); - } else if (fieldName.equals(SESSION_ID)) { + } else if (fieldName.equals("sessionId")) { sessionId = parser.textOrNull(); } else { throw new IllegalArgumentException("Unknown field: " + fieldName); diff --git a/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java b/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java index 6a33e6d5b6..25f31dcc69 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java +++ b/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java @@ -33,6 +33,9 @@ import org.opensearch.sql.spark.execution.statestore.SessionStorageService; import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.execution.statestore.StatementStorageService; +import org.opensearch.sql.spark.execution.xcontent.FlintIndexStateModelXContentSerializer; +import org.opensearch.sql.spark.execution.xcontent.SessionModelXContentSerializer; +import org.opensearch.sql.spark.execution.xcontent.StatementModelXContentSerializer; import org.opensearch.sql.spark.flint.FlintIndexMetadataServiceImpl; import org.opensearch.sql.spark.flint.FlintIndexStateModelService; import org.opensearch.sql.spark.flint.IndexDMLResultStorageService; @@ -111,8 +114,9 @@ public FlintIndexOpFactory flintIndexOpFactory( } @Provides - public FlintIndexStateModelService flintIndexStateModelService(StateStore stateStore) { - return new OpenSearchFlintIndexStateModelService(stateStore); + public FlintIndexStateModelService flintIndexStateModelService( + StateStore stateStore, FlintIndexStateModelXContentSerializer serializer) { + return new OpenSearchFlintIndexStateModelService(stateStore, serializer); } @Provides @@ -132,13 +136,15 @@ public SessionManager sessionManager( } @Provides - public SessionStorageService sessionStorageService(StateStore stateStore) { - return new OpenSearchSessionStorageService(stateStore); + public SessionStorageService sessionStorageService( + StateStore stateStore, SessionModelXContentSerializer sessionModelXContentSerializer) { + return new OpenSearchSessionStorageService(stateStore, sessionModelXContentSerializer); } @Provides - public StatementStorageService statementStorageService(StateStore stateStore) { - return new OpenSearchStatementStorageService(stateStore); + public StatementStorageService statementStorageService( + StateStore stateStore, StatementModelXContentSerializer statementModelXContentSerializer) { + return new OpenSearchStatementStorageService(stateStore, statementModelXContentSerializer); } @Provides diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java index 4dce252513..7f9fc5545d 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java @@ -10,9 +10,9 @@ import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_JOB_SESSION_ID; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_SESSION_CLASS_NAME; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_REQUEST_BUFFER_INDEX_NAME; -import static org.opensearch.sql.spark.execution.session.SessionModel.SESSION_DOC_TYPE; -import static org.opensearch.sql.spark.execution.statement.StatementModel.SESSION_ID; -import static org.opensearch.sql.spark.execution.statement.StatementModel.STATEMENT_DOC_TYPE; +import static org.opensearch.sql.spark.execution.xcontent.SessionModelXContentSerializer.SESSION_DOC_TYPE; +import static org.opensearch.sql.spark.execution.xcontent.SessionModelXContentSerializer.SESSION_ID; +import static org.opensearch.sql.spark.execution.xcontent.StatementModelXContentSerializer.STATEMENT_DOC_TYPE; import com.google.common.collect.ImmutableMap; import java.util.HashMap; diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java index a8ae5fcb1a..8ac5b92cd8 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java @@ -66,6 +66,9 @@ import org.opensearch.sql.spark.execution.statestore.SessionStorageService; import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.execution.statestore.StatementStorageService; +import org.opensearch.sql.spark.execution.xcontent.FlintIndexStateModelXContentSerializer; +import org.opensearch.sql.spark.execution.xcontent.SessionModelXContentSerializer; +import org.opensearch.sql.spark.execution.xcontent.StatementModelXContentSerializer; import org.opensearch.sql.spark.flint.FlintIndexMetadataService; import org.opensearch.sql.spark.flint.FlintIndexMetadataServiceImpl; import org.opensearch.sql.spark.flint.FlintIndexStateModelService; @@ -162,9 +165,13 @@ public void setup() { createIndexWithMappings(dm.getResultIndex(), loadResultIndexMappings()); createIndexWithMappings(otherDm.getResultIndex(), loadResultIndexMappings()); flintIndexMetadataService = new FlintIndexMetadataServiceImpl(client); - flintIndexStateModelService = new OpenSearchFlintIndexStateModelService(stateStore); - sessionStorageService = new OpenSearchSessionStorageService(stateStore); - statementStorageService = new OpenSearchStatementStorageService(stateStore); + flintIndexStateModelService = + new OpenSearchFlintIndexStateModelService( + stateStore, new FlintIndexStateModelXContentSerializer()); + sessionStorageService = + new OpenSearchSessionStorageService(stateStore, new SessionModelXContentSerializer()); + statementStorageService = + new OpenSearchStatementStorageService(stateStore, new StatementModelXContentSerializer()); } protected FlintIndexOpFactory getFlintIndexOpFactory( diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecVacuumTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecVacuumTest.java index c9660c8d87..14bb225c96 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecVacuumTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecVacuumTest.java @@ -5,7 +5,6 @@ package org.opensearch.sql.spark.asyncquery; -import static org.opensearch.sql.spark.execution.statestore.StateStore.DATASOURCE_TO_REQUEST_INDEX; import static org.opensearch.sql.spark.flint.FlintIndexState.ACTIVE; import static org.opensearch.sql.spark.flint.FlintIndexState.CREATING; import static org.opensearch.sql.spark.flint.FlintIndexState.DELETED; @@ -31,6 +30,7 @@ import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse; import org.opensearch.sql.spark.asyncquery.model.MockFlintSparkJob; import org.opensearch.sql.spark.client.EMRServerlessClientFactory; +import org.opensearch.sql.spark.execution.statestore.OpenSearchStateStoreUtil; import org.opensearch.sql.spark.flint.FlintIndexState; import org.opensearch.sql.spark.flint.FlintIndexType; import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest; @@ -187,13 +187,15 @@ private boolean flintIndexExists(String flintIndexName) { private boolean indexDocExists(String docId) { return client - .get(new GetRequest(DATASOURCE_TO_REQUEST_INDEX.apply("mys3"), docId)) + .get(new GetRequest(OpenSearchStateStoreUtil.getIndexName("mys3"), docId)) .actionGet() .isExists(); } private void deleteIndexDoc(String docId) { - client.delete(new DeleteRequest(DATASOURCE_TO_REQUEST_INDEX.apply("mys3"), docId)).actionGet(); + client + .delete(new DeleteRequest(OpenSearchStateStoreUtil.getIndexName("mys3"), docId)) + .actionGet(); } private FlintDatasetMock mockDataset(String query, FlintIndexType indexType, String indexName) { diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java index 8aac451f82..a2cf202c1f 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java @@ -28,6 +28,8 @@ import org.opensearch.sql.spark.execution.statestore.SessionStorageService; import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.execution.statestore.StatementStorageService; +import org.opensearch.sql.spark.execution.xcontent.SessionModelXContentSerializer; +import org.opensearch.sql.spark.execution.xcontent.StatementModelXContentSerializer; import org.opensearch.test.OpenSearchIntegTestCase; /** mock-maker-inline does not work with OpenSearchTestCase. */ @@ -47,8 +49,10 @@ public void setup() { emrsClient = new TestEMRServerlessClient(); startJobRequest = new StartJobRequest("", "appId", "", "", new HashMap<>(), false, ""); StateStore stateStore = new StateStore(client(), clusterService()); - sessionStorageService = new OpenSearchSessionStorageService(stateStore); - statementStorageService = new OpenSearchStatementStorageService(stateStore); + sessionStorageService = + new OpenSearchSessionStorageService(stateStore, new SessionModelXContentSerializer()); + statementStorageService = + new OpenSearchStatementStorageService(stateStore, new StatementModelXContentSerializer()); EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; sessionManager = new SessionManager( diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/statement/StatementTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/statement/StatementTest.java index 5f05eed9b9..b18ec05497 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/execution/statement/StatementTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/execution/statement/StatementTest.java @@ -12,7 +12,6 @@ import static org.opensearch.sql.spark.execution.statement.StatementState.RUNNING; import static org.opensearch.sql.spark.execution.statement.StatementState.WAITING; import static org.opensearch.sql.spark.execution.statement.StatementTest.TestStatement.testStatement; -import static org.opensearch.sql.spark.execution.statestore.StateStore.DATASOURCE_TO_REQUEST_INDEX; import java.util.Optional; import lombok.RequiredArgsConstructor; @@ -29,15 +28,19 @@ import org.opensearch.sql.spark.execution.session.SessionState; import org.opensearch.sql.spark.execution.session.TestEMRServerlessClient; import org.opensearch.sql.spark.execution.statestore.OpenSearchSessionStorageService; +import org.opensearch.sql.spark.execution.statestore.OpenSearchStateStoreUtil; import org.opensearch.sql.spark.execution.statestore.OpenSearchStatementStorageService; import org.opensearch.sql.spark.execution.statestore.SessionStorageService; import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.execution.statestore.StatementStorageService; +import org.opensearch.sql.spark.execution.xcontent.SessionModelXContentSerializer; +import org.opensearch.sql.spark.execution.xcontent.StatementModelXContentSerializer; import org.opensearch.sql.spark.rest.model.LangType; import org.opensearch.test.OpenSearchIntegTestCase; public class StatementTest extends OpenSearchIntegTestCase { - private static final String indexName = DATASOURCE_TO_REQUEST_INDEX.apply(TEST_DATASOURCE_NAME); + private static final String indexName = + OpenSearchStateStoreUtil.getIndexName(TEST_DATASOURCE_NAME); private StatementStorageService statementStorageService; private SessionStorageService sessionStorageService; @@ -48,8 +51,10 @@ public class StatementTest extends OpenSearchIntegTestCase { @Before public void setup() { StateStore stateStore = new StateStore(client(), clusterService()); - statementStorageService = new OpenSearchStatementStorageService(stateStore); - sessionStorageService = new OpenSearchSessionStorageService(stateStore); + statementStorageService = + new OpenSearchStatementStorageService(stateStore, new StatementModelXContentSerializer()); + sessionStorageService = + new OpenSearchSessionStorageService(stateStore, new SessionModelXContentSerializer()); EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; sessionManager = diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/xcontent/AsyncQueryJobMetadataXContentSerializerTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/xcontent/AsyncQueryJobMetadataXContentSerializerTest.java new file mode 100644 index 0000000000..d393c383c6 --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/execution/xcontent/AsyncQueryJobMetadataXContentSerializerTest.java @@ -0,0 +1,184 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.xcontent; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import org.junit.jupiter.api.Test; +import org.opensearch.common.xcontent.LoggingDeprecationHandler; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId; +import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; +import org.opensearch.sql.spark.dispatcher.model.JobType; + +class AsyncQueryJobMetadataXContentSerializerTest { + + private final AsyncQueryJobMetadataXContentSerializer serializer = + new AsyncQueryJobMetadataXContentSerializer(); + + @Test + void toXContentShouldSerializeAsyncQueryJobMetadata() throws Exception { + AsyncQueryJobMetadata jobMetadata = + new AsyncQueryJobMetadata( + new AsyncQueryId("query1"), + "app1", + "job1", + "result1", + "session1", + "datasource1", + JobType.INTERACTIVE, + "index1", + 1L, + 1L); + + XContentBuilder xContentBuilder = serializer.toXContent(jobMetadata, ToXContent.EMPTY_PARAMS); + String json = xContentBuilder.toString(); + + assertEquals(true, json.contains("\"queryId\":\"query1\"")); + assertEquals(true, json.contains("\"type\":\"jobmeta\"")); + assertEquals(true, json.contains("\"jobId\":\"job1\"")); + assertEquals(true, json.contains("\"applicationId\":\"app1\"")); + assertEquals(true, json.contains("\"resultIndex\":\"result1\"")); + assertEquals(true, json.contains("\"sessionId\":\"session1\"")); + assertEquals(true, json.contains("\"dataSourceName\":\"datasource1\"")); + assertEquals(true, json.contains("\"jobType\":\"interactive\"")); + assertEquals(true, json.contains("\"indexName\":\"index1\"")); + } + + @Test + void fromXContentShouldDeserializeAsyncQueryJobMetadata() throws Exception { + String json = + "{\n" + + " \"queryId\": \"query1\",\n" + + " \"type\": \"jobmeta\",\n" + + " \"jobId\": \"job1\",\n" + + " \"applicationId\": \"app1\",\n" + + " \"resultIndex\": \"result1\",\n" + + " \"sessionId\": \"session1\",\n" + + " \"dataSourceName\": \"datasource1\",\n" + + " \"jobType\": \"interactive\",\n" + + " \"indexName\": \"index1\"\n" + + "}"; + XContentParser parser = + XContentType.JSON + .xContent() + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, json); + parser.nextToken(); + + AsyncQueryJobMetadata jobMetadata = serializer.fromXContent(parser, 1L, 1L); + + assertEquals("query1", jobMetadata.getQueryId().getId()); + assertEquals("job1", jobMetadata.getJobId()); + assertEquals("app1", jobMetadata.getApplicationId()); + assertEquals("result1", jobMetadata.getResultIndex()); + assertEquals("session1", jobMetadata.getSessionId()); + assertEquals("datasource1", jobMetadata.getDatasourceName()); + assertEquals(JobType.INTERACTIVE, jobMetadata.getJobType()); + assertEquals("index1", jobMetadata.getIndexName()); + } + + @Test + void fromXContentShouldThrowExceptionWhenMissingRequiredFields() throws Exception { + String json = + "{\n" + + " \"queryId\": \"query1\",\n" + + " \"type\": \"asyncqueryjobmeta\",\n" + + " \"resultIndex\": \"result1\",\n" + + " \"sessionId\": \"session1\",\n" + + " \"dataSourceName\": \"datasource1\",\n" + + " \"jobType\": \"async_query\",\n" + + " \"indexName\": \"index1\"\n" + + "}"; + XContentParser parser = + XContentType.JSON + .xContent() + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, json); + parser.nextToken(); + + assertThrows(IllegalArgumentException.class, () -> serializer.fromXContent(parser, 1L, 1L)); + } + + @Test + void fromXContentShouldDeserializeWithMissingApplicationId() throws Exception { + String json = + "{\n" + + " \"queryId\": \"query1\",\n" + + " \"type\": \"jobmeta\",\n" + + " \"jobId\": \"job1\",\n" + + " \"resultIndex\": \"result1\",\n" + + " \"sessionId\": \"session1\",\n" + + " \"dataSourceName\": \"datasource1\",\n" + + " \"jobType\": \"interactive\",\n" + + " \"indexName\": \"index1\"\n" + + "}"; + XContentParser parser = + XContentType.JSON + .xContent() + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, json); + parser.nextToken(); + + assertThrows(IllegalArgumentException.class, () -> serializer.fromXContent(parser, 1L, 1L)); + } + + @Test + void fromXContentShouldThrowExceptionWhenUnknownFields() throws Exception { + String json = + "{\n" + + " \"queryId\": \"query1\",\n" + + " \"type\": \"asyncqueryjobmeta\",\n" + + " \"resultIndex\": \"result1\",\n" + + " \"sessionId\": \"session1\",\n" + + " \"dataSourceName\": \"datasource1\",\n" + + " \"jobType\": \"async_query\",\n" + + " \"indexame\": \"index1\"\n" + + "}"; + XContentParser parser = + XContentType.JSON + .xContent() + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, json); + parser.nextToken(); + + assertThrows(IllegalArgumentException.class, () -> serializer.fromXContent(parser, 1L, 1L)); + } + + @Test + void fromXContentShouldDeserializeAsyncQueryWithJobTypeNUll() throws Exception { + String json = + "{\n" + + " \"queryId\": \"query1\",\n" + + " \"type\": \"jobmeta\",\n" + + " \"jobId\": \"job1\",\n" + + " \"applicationId\": \"app1\",\n" + + " \"resultIndex\": \"result1\",\n" + + " \"sessionId\": \"session1\",\n" + + " \"dataSourceName\": \"datasource1\",\n" + + " \"jobType\": \"\",\n" + + " \"indexName\": \"index1\"\n" + + "}"; + XContentParser parser = + XContentType.JSON + .xContent() + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, json); + parser.nextToken(); + + AsyncQueryJobMetadata jobMetadata = serializer.fromXContent(parser, 1L, 1L); + + assertEquals("query1", jobMetadata.getQueryId().getId()); + assertEquals("job1", jobMetadata.getJobId()); + assertEquals("app1", jobMetadata.getApplicationId()); + assertEquals("result1", jobMetadata.getResultIndex()); + assertEquals("session1", jobMetadata.getSessionId()); + assertEquals("datasource1", jobMetadata.getDatasourceName()); + assertNull(jobMetadata.getJobType()); + assertEquals("index1", jobMetadata.getIndexName()); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/xcontent/FlintIndexStateModelXContentSerializerTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/xcontent/FlintIndexStateModelXContentSerializerTest.java new file mode 100644 index 0000000000..974aa09d9a --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/execution/xcontent/FlintIndexStateModelXContentSerializerTest.java @@ -0,0 +1,82 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.xcontent; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.common.xcontent.LoggingDeprecationHandler; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.sql.spark.flint.FlintIndexState; +import org.opensearch.sql.spark.flint.FlintIndexStateModel; + +@ExtendWith(MockitoExtension.class) +class FlintIndexStateModelXContentSerializerTest { + + private FlintIndexStateModelXContentSerializer serializer = + new FlintIndexStateModelXContentSerializer(); + + @Test + void toXContentShouldSerializeFlintIndexStateModel() throws Exception { + FlintIndexStateModel flintIndexStateModel = + FlintIndexStateModel.builder() + .indexState(FlintIndexState.ACTIVE) + .applicationId("app1") + .jobId("job1") + .latestId("latest1") + .datasourceName("datasource1") + .lastUpdateTime(System.currentTimeMillis()) + .error(null) + .build(); + + XContentBuilder xContentBuilder = + serializer.toXContent(flintIndexStateModel, ToXContent.EMPTY_PARAMS); + String json = xContentBuilder.toString(); + + assertEquals(true, json.contains("\"version\":\"1.0\"")); + assertEquals(true, json.contains("\"type\":\"flintindexstate\"")); + assertEquals(true, json.contains("\"state\":\"active\"")); + assertEquals(true, json.contains("\"applicationId\":\"app1\"")); + assertEquals(true, json.contains("\"jobId\":\"job1\"")); + assertEquals(true, json.contains("\"latestId\":\"latest1\"")); + assertEquals(true, json.contains("\"dataSourceName\":\"datasource1\"")); + } + + @Test + void fromXContentShouldDeserializeFlintIndexStateModel() throws Exception { + // Given + String json = + "{\"version\":\"1.0\",\"type\":\"flintindexstate\",\"state\":\"active\",\"applicationId\":\"app1\",\"jobId\":\"job1\",\"latestId\":\"latest1\",\"dataSourceName\":\"datasource1\",\"lastUpdateTime\":1623456789,\"error\":\"\"}"; + XContentParser parser = + XContentType.JSON + .xContent() + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, json); + parser.nextToken(); + + FlintIndexStateModel flintIndexStateModel = serializer.fromXContent(parser, 1L, 1L); + + assertEquals(FlintIndexState.ACTIVE, flintIndexStateModel.getIndexState()); + assertEquals("app1", flintIndexStateModel.getApplicationId()); + assertEquals("job1", flintIndexStateModel.getJobId()); + assertEquals("latest1", flintIndexStateModel.getLatestId()); + assertEquals("datasource1", flintIndexStateModel.getDatasourceName()); + } + + @Test + void fromXContentThrowsExceptionWhenParsingInvalidContent() { + XContentParser parser = mock(XContentParser.class); + + assertThrows(RuntimeException.class, () -> serializer.fromXContent(parser, 0, 0)); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/xcontent/IndexDMLResultXContentSerializerTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/xcontent/IndexDMLResultXContentSerializerTest.java new file mode 100644 index 0000000000..11a22eba57 --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/execution/xcontent/IndexDMLResultXContentSerializerTest.java @@ -0,0 +1,65 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.xcontent; + +import static org.junit.jupiter.api.Assertions.*; + +import java.io.IOException; +import org.junit.jupiter.api.Test; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.sql.spark.dispatcher.model.IndexDMLResult; + +class IndexDMLResultXContentSerializerTest { + + private final IndexDMLResultXContentSerializer serializer = + new IndexDMLResultXContentSerializer(); + + @Test + void toXContentShouldSerializeIndexDMLResult() throws IOException { + IndexDMLResult dmlResult = + new IndexDMLResult("query1", "SUCCESS", null, "datasource1", 1000L, 2000L); + + XContentBuilder xContentBuilder = serializer.toXContent(dmlResult, ToXContent.EMPTY_PARAMS); + String json = xContentBuilder.toString(); + + assertTrue(json.contains("\"queryId\":\"query1\"")); + assertTrue(json.contains("\"status\":\"SUCCESS\"")); + assertTrue(json.contains("\"error\":null")); + assertTrue(json.contains("\"dataSourceName\":\"datasource1\"")); + assertTrue(json.contains("\"queryRunTime\":1000")); + assertTrue(json.contains("\"updateTime\":2000")); + assertTrue(json.contains("\"result\":[]")); + assertTrue(json.contains("\"schema\":[]")); + } + + @Test + void toXContentShouldHandleErrorInIndexDMLResult() throws IOException { + // Given + IndexDMLResult dmlResult = + new IndexDMLResult("query1", "FAILURE", "An error occurred", "datasource1", 1000L, 2000L); + + // When + XContentBuilder xContentBuilder = serializer.toXContent(dmlResult, ToXContent.EMPTY_PARAMS); + String json = xContentBuilder.toString(); + + // Then + assertTrue(json.contains("\"queryId\":\"query1\"")); + assertTrue(json.contains("\"status\":\"FAILURE\"")); + assertTrue(json.contains("\"error\":\"An error occurred\"")); + assertTrue(json.contains("\"dataSourceName\":\"datasource1\"")); + assertTrue(json.contains("\"queryRunTime\":1000")); + assertTrue(json.contains("\"updateTime\":2000")); + assertTrue(json.contains("\"result\":[]")); + assertTrue(json.contains("\"schema\":[]")); + } + + @Test + void fromXContentShouldThrowUnsupportedOperationException() { + // When/Then + assertThrows(UnsupportedOperationException.class, () -> serializer.fromXContent(null, 0L, 0L)); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/xcontent/SessionModelXContentSerializerTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/xcontent/SessionModelXContentSerializerTest.java new file mode 100644 index 0000000000..cd560fa4ca --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/execution/xcontent/SessionModelXContentSerializerTest.java @@ -0,0 +1,100 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.xcontent; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; + +import org.junit.jupiter.api.Test; +import org.opensearch.common.xcontent.LoggingDeprecationHandler; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.sql.spark.execution.session.SessionId; +import org.opensearch.sql.spark.execution.session.SessionModel; +import org.opensearch.sql.spark.execution.session.SessionState; +import org.opensearch.sql.spark.execution.session.SessionType; + +class SessionModelXContentSerializerTest { + + private final SessionModelXContentSerializer serializer = new SessionModelXContentSerializer(); + + @Test + void toXContentShouldSerializeSessionModel() throws Exception { + // Given + SessionModel sessionModel = + SessionModel.builder() + .version("1.0") + .sessionType(SessionType.INTERACTIVE) + .sessionId(new SessionId("session1")) + .sessionState(SessionState.FAIL) + .datasourceName("datasource1") + .applicationId("app1") + .jobId("job1") + .lastUpdateTime(System.currentTimeMillis()) + .error(null) + .build(); + + // When + XContentBuilder xContentBuilder = serializer.toXContent(sessionModel, ToXContent.EMPTY_PARAMS); + String json = xContentBuilder.toString(); + + // Then + assertEquals(true, json.contains("\"version\":\"1.0\"")); + assertEquals(true, json.contains("\"type\":\"session\"")); + assertEquals(true, json.contains("\"sessionType\":\"interactive\"")); + assertEquals(true, json.contains("\"sessionId\":\"session1\"")); + assertEquals(true, json.contains("\"state\":\"fail\"")); + assertEquals(true, json.contains("\"dataSourceName\":\"datasource1\"")); + assertEquals(true, json.contains("\"applicationId\":\"app1\"")); + assertEquals(true, json.contains("\"jobId\":\"job1\"")); + } + + @Test + void fromXContentShouldDeserializeSessionModel() throws Exception { + // Given + String json = + "{\n" + + " \"version\": \"1.0\",\n" + + " \"type\": \"session\",\n" + + " \"sessionType\": \"interactive\",\n" + + " \"sessionId\": \"session1\",\n" + + " \"state\": \"fail\",\n" + + " \"dataSourceName\": \"datasource1\",\n" + + " \"applicationId\": \"app1\",\n" + + " \"jobId\": \"job1\",\n" + + " \"lastUpdateTime\": 1623456789,\n" + + " \"error\": \"\"\n" + + "}"; + XContentParser parser = + XContentType.JSON + .xContent() + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, json); + parser.nextToken(); + + // When + SessionModel sessionModel = serializer.fromXContent(parser, 1L, 1L); + + // Then + assertEquals("1.0", sessionModel.getVersion()); + assertEquals(SessionType.INTERACTIVE, sessionModel.getSessionType()); + assertEquals("session1", sessionModel.getSessionId().getSessionId()); + assertEquals(SessionState.FAIL, sessionModel.getSessionState()); + assertEquals("datasource1", sessionModel.getDatasourceName()); + assertEquals("app1", sessionModel.getApplicationId()); + assertEquals("job1", sessionModel.getJobId()); + } + + @Test + void fromXContentThrowsExceptionWhenParsingInvalidContent() { + XContentParser parser = mock(XContentParser.class); + + assertThrows(RuntimeException.class, () -> serializer.fromXContent(parser, 0, 0)); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/xcontent/StatementModelXContentSerializerTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/xcontent/StatementModelXContentSerializerTest.java new file mode 100644 index 0000000000..de355e1c99 --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/execution/xcontent/StatementModelXContentSerializerTest.java @@ -0,0 +1,128 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.xcontent; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.common.xcontent.LoggingDeprecationHandler; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.sql.spark.execution.session.SessionId; +import org.opensearch.sql.spark.execution.statement.StatementId; +import org.opensearch.sql.spark.execution.statement.StatementModel; +import org.opensearch.sql.spark.execution.statement.StatementState; +import org.opensearch.sql.spark.rest.model.LangType; + +@ExtendWith(MockitoExtension.class) +class StatementModelXContentSerializerTest { + + private StatementModelXContentSerializer serializer; + + @Test + void toXContentShouldSerializeStatementModel() throws Exception { + + serializer = new StatementModelXContentSerializer(); + // Given + StatementModel statementModel = + StatementModel.builder() + .version("1.0") + .statementState(StatementState.RUNNING) + .statementId(new StatementId("statement1")) + .sessionId(new SessionId("session1")) + .applicationId("app1") + .jobId("job1") + .langType(LangType.SQL) + .datasourceName("datasource1") + .query("SELECT * FROM table") + .queryId("query1") + .submitTime(System.currentTimeMillis()) + .error(null) + .build(); + + // When + XContentBuilder xContentBuilder = + serializer.toXContent(statementModel, ToXContent.EMPTY_PARAMS); + String json = xContentBuilder.toString(); + + assertEquals(true, json.contains("\"version\":\"1.0\"")); + assertEquals(true, json.contains("\"state\":\"running\"")); + assertEquals(true, json.contains("\"statementId\":\"statement1\"")); + } + + @Test + void fromXContentShouldDeserializeStatementModel() throws Exception { + StatementModelXContentSerializer serializer = new StatementModelXContentSerializer(); + // Given + String json = + "{\"version\":\"1.0\",\"type\":\"statement\",\"state\":\"running\",\"statementId\":\"statement1\",\"sessionId\":\"session1\",\"applicationId\":\"app1\",\"jobId\":\"job1\",\"lang\":\"SQL\",\"dataSourceName\":\"datasource1\",\"query\":\"SELECT" + + " * FROM table\",\"queryId\":\"query1\",\"submitTime\":1623456789,\"error\":\"\"}"; + XContentParser parser = + XContentType.JSON + .xContent() + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, json); + parser.nextToken(); + + StatementModel statementModel = serializer.fromXContent(parser, 1L, 1L); + + assertEquals("1.0", statementModel.getVersion()); + assertEquals(StatementState.RUNNING, statementModel.getStatementState()); + assertEquals("statement1", statementModel.getStatementId().getId()); + assertEquals("session1", statementModel.getSessionId().getSessionId()); + } + + @Test + void fromXContentShouldDeserializeStatementModelThrowException() throws Exception { + StatementModelXContentSerializer serializer = new StatementModelXContentSerializer(); + // Given + String json = + "{\"version\":\"1.0\",\"type\":\"statement_state\",\"state\":\"running\",\"statementId\":\"statement1\",\"sessionId\":\"session1\",\"applicationId\":\"app1\",\"jobId\":\"job1\",\"lang\":\"SQL\",\"dataSourceName\":\"datasource1\",\"query\":\"SELECT" + + " * FROM table\",\"queryId\":\"query1\",\"submitTime\":1623456789,\"error\":null}"; + XContentParser parser = + XContentType.JSON + .xContent() + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, json); + parser.nextToken(); + + assertThrows(IllegalStateException.class, () -> serializer.fromXContent(parser, 1L, 1L)); + } + + @Test + void fromXContentThrowsExceptionWhenParsingInvalidContent() { + XContentParser parser = mock(XContentParser.class); + + assertThrows(RuntimeException.class, () -> serializer.fromXContent(parser, 0, 0)); + } + + @Test + void fromXContentShouldThrowExceptionForUnexpectedField() throws Exception { + StatementModelXContentSerializer serializer = new StatementModelXContentSerializer(); + + String jsonWithUnexpectedField = + "{\"version\":\"1.0\",\"type\":\"statement\",\"state\":\"running\",\"statementId\":\"statement1\",\"sessionId\":\"session1\",\"applicationId\":\"app1\",\"jobId\":\"job1\",\"lang\":\"SQL\",\"dataSourceName\":\"datasource1\",\"query\":\"SELECT" + + " * FROM" + + " table\",\"queryId\":\"query1\",\"submitTime\":1623456789,\"error\":\"\",\"unexpectedField\":\"someValue\"}"; + XContentParser parser = + XContentType.JSON + .xContent() + .createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + jsonWithUnexpectedField); + parser.nextToken(); + + IllegalArgumentException exception = + assertThrows(IllegalArgumentException.class, () -> serializer.fromXContent(parser, 1L, 1L)); + assertEquals("Unexpected field: unexpectedField", exception.getMessage()); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/flint/OpenSearchFlintIndexStateModelServiceTest.java b/spark/src/test/java/org/opensearch/sql/spark/flint/OpenSearchFlintIndexStateModelServiceTest.java index aebc136b93..5ec5a96073 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/flint/OpenSearchFlintIndexStateModelServiceTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/flint/OpenSearchFlintIndexStateModelServiceTest.java @@ -17,6 +17,7 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.execution.xcontent.FlintIndexStateModelXContentSerializer; @ExtendWith(MockitoExtension.class) public class OpenSearchFlintIndexStateModelServiceTest { @@ -28,6 +29,7 @@ public class OpenSearchFlintIndexStateModelServiceTest { @Mock FlintIndexStateModel flintIndexStateModel; @Mock FlintIndexState flintIndexState; @Mock FlintIndexStateModel responseFlintIndexStateModel; + @Mock FlintIndexStateModelXContentSerializer flintIndexStateModelXContentSerializer; @InjectMocks OpenSearchFlintIndexStateModelService openSearchFlintIndexStateModelService;