diff --git a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryJobMetadata.java b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryJobMetadata.java index 1c7fd35c5e..cbb5779699 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryJobMetadata.java +++ b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryJobMetadata.java @@ -7,19 +7,9 @@ package org.opensearch.sql.spark.asyncquery.model; -import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; -import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME; -import static org.opensearch.sql.spark.execution.statement.StatementModel.QUERY_ID; - import com.google.gson.Gson; -import java.io.IOException; -import java.util.Locale; import lombok.Data; import lombok.EqualsAndHashCode; -import lombok.SneakyThrows; -import org.opensearch.core.common.Strings; -import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.core.xcontent.XContentParser; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.sql.spark.dispatcher.model.JobType; import org.opensearch.sql.spark.execution.statestore.StateModel; @@ -134,29 +124,6 @@ public String toString() { return new Gson().toJson(this); } - /** - * Converts JobMetadata to XContentBuilder. - * - * @return XContentBuilder {@link XContentBuilder} - * @throws Exception Exception. - */ - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder - .startObject() - .field(QUERY_ID, queryId.getId()) - .field("type", TYPE_JOBMETA) - .field("jobId", jobId) - .field("applicationId", applicationId) - .field("resultIndex", resultIndex) - .field("sessionId", sessionId) - .field(DATASOURCE_NAME, datasourceName) - .field(JOB_TYPE, jobType.getText().toLowerCase(Locale.ROOT)) - .field(INDEX_NAME, indexName) - .endObject(); - return builder; - } - /** copy builder. 
update seqNo and primaryTerm */ public static AsyncQueryJobMetadata copy( AsyncQueryJobMetadata copy, long seqNo, long primaryTerm) { @@ -173,72 +140,6 @@ public static AsyncQueryJobMetadata copy( primaryTerm); } - /** - * Convert xcontent parser to JobMetadata. - * - * @param parser parser. - * @return JobMetadata {@link AsyncQueryJobMetadata} - * @throws IOException IOException. - */ - @SneakyThrows - public static AsyncQueryJobMetadata fromXContent( - XContentParser parser, long seqNo, long primaryTerm) { - AsyncQueryId queryId = null; - String jobId = null; - String applicationId = null; - String resultIndex = null; - String sessionId = null; - String datasourceName = null; - String jobTypeStr = null; - String indexName = null; - ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); - while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) { - String fieldName = parser.currentName(); - parser.nextToken(); - switch (fieldName) { - case QUERY_ID: - queryId = new AsyncQueryId(parser.textOrNull()); - break; - case "jobId": - jobId = parser.textOrNull(); - break; - case "applicationId": - applicationId = parser.textOrNull(); - break; - case "resultIndex": - resultIndex = parser.textOrNull(); - break; - case "sessionId": - sessionId = parser.textOrNull(); - break; - case DATASOURCE_NAME: - datasourceName = parser.textOrNull(); - case JOB_TYPE: - jobTypeStr = parser.textOrNull(); - case INDEX_NAME: - indexName = parser.textOrNull(); - case "type": - break; - default: - throw new IllegalArgumentException("Unknown field: " + fieldName); - } - } - if (jobId == null || applicationId == null) { - throw new IllegalArgumentException("jobId and applicationId are required fields."); - } - return new AsyncQueryJobMetadata( - queryId, - applicationId, - jobId, - resultIndex, - sessionId, - datasourceName, - Strings.isNullOrEmpty(jobTypeStr) ? 
null : JobType.fromString(jobTypeStr), - indexName, - seqNo, - primaryTerm); - } - @Override public String getId() { return queryId.docId(); diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDMLResult.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDMLResult.java index b01ecf55ba..8fe0eecc9d 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDMLResult.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDMLResult.java @@ -5,13 +5,8 @@ package org.opensearch.sql.spark.dispatcher.model; -import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME; - -import com.google.common.collect.ImmutableList; -import java.io.IOException; import lombok.Data; import lombok.EqualsAndHashCode; -import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.sql.spark.execution.statestore.StateModel; @@ -19,10 +14,10 @@ @Data @EqualsAndHashCode(callSuper = false) public class IndexDMLResult extends StateModel { - private static final String QUERY_ID = "queryId"; - private static final String QUERY_RUNTIME = "queryRunTime"; - private static final String UPDATE_TIME = "updateTime"; - private static final String DOC_ID_PREFIX = "index"; + public static final String QUERY_ID = "queryId"; + public static final String QUERY_RUNTIME = "queryRunTime"; + public static final String UPDATE_TIME = "updateTime"; + public static final String DOC_ID_PREFIX = "index"; private final String queryId; private final String status; @@ -55,20 +50,4 @@ public long getSeqNo() { public long getPrimaryTerm() { return SequenceNumbers.UNASSIGNED_PRIMARY_TERM; } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder - .startObject() - .field(QUERY_ID, queryId) - .field("status", status) - .field("error", error) - .field(DATASOURCE_NAME, datasourceName) - 
.field(QUERY_RUNTIME, queryRunTime) - .field(UPDATE_TIME, updateTime) - .field("result", ImmutableList.of()) - .field("schema", ImmutableList.of()) - .endObject(); - return builder; - } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionModel.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionModel.java index 806cdb083e..db78ddbd9d 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionModel.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionModel.java @@ -8,13 +8,8 @@ import static org.opensearch.sql.spark.execution.session.SessionState.NOT_STARTED; import static org.opensearch.sql.spark.execution.session.SessionType.INTERACTIVE; -import java.io.IOException; import lombok.Builder; import lombok.Data; -import lombok.SneakyThrows; -import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.core.xcontent.XContentParser; -import org.opensearch.core.xcontent.XContentParserUtils; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.sql.spark.execution.statestore.StateModel; @@ -48,24 +43,6 @@ public class SessionModel extends StateModel { private final long seqNo; private final long primaryTerm; - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder - .startObject() - .field(VERSION, version) - .field(TYPE, SESSION_DOC_TYPE) - .field(SESSION_TYPE, sessionType.getSessionType()) - .field(SESSION_ID, sessionId.getSessionId()) - .field(SESSION_STATE, sessionState.getSessionState()) - .field(DATASOURCE_NAME, datasourceName) - .field(APPLICATION_ID, applicationId) - .field(JOB_ID, jobId) - .field(LAST_UPDATE_TIME, lastUpdateTime) - .field(ERROR, error) - .endObject(); - return builder; - } - public static SessionModel of(SessionModel copy, long seqNo, long primaryTerm) { return builder() .version(copy.version) @@ -99,52 +76,6 @@ public static SessionModel 
copyWithState( .build(); } - @SneakyThrows - public static SessionModel fromXContent(XContentParser parser, long seqNo, long primaryTerm) { - SessionModelBuilder builder = new SessionModelBuilder(); - XContentParserUtils.ensureExpectedToken( - XContentParser.Token.START_OBJECT, parser.currentToken(), parser); - while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) { - String fieldName = parser.currentName(); - parser.nextToken(); - switch (fieldName) { - case VERSION: - builder.version(parser.text()); - break; - case SESSION_TYPE: - builder.sessionType(SessionType.fromString(parser.text())); - break; - case SESSION_ID: - builder.sessionId(new SessionId(parser.text())); - break; - case SESSION_STATE: - builder.sessionState(SessionState.fromString(parser.text())); - break; - case DATASOURCE_NAME: - builder.datasourceName(parser.text()); - break; - case ERROR: - builder.error(parser.text()); - break; - case APPLICATION_ID: - builder.applicationId(parser.text()); - break; - case JOB_ID: - builder.jobId(parser.text()); - break; - case LAST_UPDATE_TIME: - builder.lastUpdateTime(parser.longValue()); - break; - case TYPE: - // do nothing. 
- break; - } - } - builder.seqNo(seqNo); - builder.primaryTerm(primaryTerm); - return builder.build(); - } - public static SessionModel initInteractiveSession( String applicationId, String jobId, SessionId sid, String datasourceName) { return builder() diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statement/StatementModel.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statement/StatementModel.java index adc147c905..a0eae1a4b9 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/statement/StatementModel.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statement/StatementModel.java @@ -5,18 +5,10 @@ package org.opensearch.sql.spark.execution.statement; -import static org.opensearch.sql.spark.execution.session.SessionModel.APPLICATION_ID; -import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME; -import static org.opensearch.sql.spark.execution.session.SessionModel.JOB_ID; import static org.opensearch.sql.spark.execution.statement.StatementState.WAITING; -import java.io.IOException; import lombok.Builder; import lombok.Data; -import lombok.SneakyThrows; -import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.core.xcontent.XContentParser; -import org.opensearch.core.xcontent.XContentParserUtils; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.sql.spark.execution.session.SessionId; import org.opensearch.sql.spark.execution.statestore.StateModel; @@ -55,27 +47,6 @@ public class StatementModel extends StateModel { private final long seqNo; private final long primaryTerm; - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder - .startObject() - .field(VERSION, version) - .field(TYPE, STATEMENT_DOC_TYPE) - .field(STATEMENT_STATE, statementState.getState()) - .field(STATEMENT_ID, statementId.getId()) - .field(SESSION_ID, sessionId.getSessionId()) - 
.field(APPLICATION_ID, applicationId) - .field(JOB_ID, jobId) - .field(LANG, langType.getText()) - .field(DATASOURCE_NAME, datasourceName) - .field(QUERY, query) - .field(QUERY_ID, queryId) - .field(SUBMIT_TIME, submitTime) - .field(ERROR, error) - .endObject(); - return builder; - } - public static StatementModel copy(StatementModel copy, long seqNo, long primaryTerm) { return builder() .version("1.0") @@ -115,61 +86,6 @@ public static StatementModel copyWithState( .build(); } - @SneakyThrows - public static StatementModel fromXContent(XContentParser parser, long seqNo, long primaryTerm) { - StatementModel.StatementModelBuilder builder = StatementModel.builder(); - XContentParserUtils.ensureExpectedToken( - XContentParser.Token.START_OBJECT, parser.currentToken(), parser); - while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) { - String fieldName = parser.currentName(); - parser.nextToken(); - switch (fieldName) { - case VERSION: - builder.version(parser.text()); - break; - case TYPE: - // do nothing - break; - case STATEMENT_STATE: - builder.statementState(StatementState.fromString(parser.text())); - break; - case STATEMENT_ID: - builder.statementId(new StatementId(parser.text())); - break; - case SESSION_ID: - builder.sessionId(new SessionId(parser.text())); - break; - case APPLICATION_ID: - builder.applicationId(parser.text()); - break; - case JOB_ID: - builder.jobId(parser.text()); - break; - case LANG: - builder.langType(LangType.fromString(parser.text())); - break; - case DATASOURCE_NAME: - builder.datasourceName(parser.text()); - break; - case QUERY: - builder.query(parser.text()); - break; - case QUERY_ID: - builder.queryId(parser.text()); - break; - case SUBMIT_TIME: - builder.submitTime(parser.longValue()); - break; - case ERROR: - builder.error(parser.text()); - break; - } - } - builder.seqNo(seqNo); - builder.primaryTerm(primaryTerm); - return builder.build(); - } - public static StatementModel submitStatement( SessionId sid, String 
applicationId, diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/CopyBuilder.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/CopyBuilder.java new file mode 100644 index 0000000000..6ae980086d --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/CopyBuilder.java @@ -0,0 +1,5 @@ +package org.opensearch.sql.spark.execution.statestore; + +public interface CopyBuilder { + T of(T copy, long seqNo, long primaryTerm); +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/FromXContent.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/FromXContent.java new file mode 100644 index 0000000000..56bc4b6bbc --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/FromXContent.java @@ -0,0 +1,7 @@ +package org.opensearch.sql.spark.execution.statestore; + +import org.opensearch.core.xcontent.XContentParser; + +public interface FromXContent { + T fromXContent(XContentParser parser, long seqNo, long primaryTerm); +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateCopyBuilder.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateCopyBuilder.java new file mode 100644 index 0000000000..ff90f02115 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateCopyBuilder.java @@ -0,0 +1,5 @@ +package org.opensearch.sql.spark.execution.statestore; + +public interface StateCopyBuilder { + T of(T copy, S state, long seqNo, long primaryTerm); +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateModel.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateModel.java index fe105cc8e4..71ee27bd91 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateModel.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateModel.java @@ -5,10 +5,7 
@@ package org.opensearch.sql.spark.execution.statestore; -import org.opensearch.core.xcontent.ToXContentObject; -import org.opensearch.core.xcontent.XContentParser; - -public abstract class StateModel implements ToXContentObject { +public abstract class StateModel { public static final String VERSION_1_0 = "1.0"; public static final String TYPE = "type"; public static final String STATE = "state"; @@ -19,16 +16,4 @@ public abstract class StateModel implements ToXContentObject { public abstract long getSeqNo(); public abstract long getPrimaryTerm(); - - public interface CopyBuilder { - T of(T copy, long seqNo, long primaryTerm); - } - - public interface StateCopyBuilder { - T of(T copy, S state, long seqNo, long primaryTerm); - } - - public interface FromXContent { - T fromXContent(XContentParser parser, long seqNo, long primaryTerm); - } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java index e50a2837d9..bad44905ae 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java @@ -40,7 +40,6 @@ import org.opensearch.common.action.ActionFuture; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.xcontent.LoggingDeprecationHandler; -import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.ToXContent; @@ -55,6 +54,12 @@ import org.opensearch.sql.spark.execution.session.SessionType; import org.opensearch.sql.spark.execution.statement.StatementModel; import org.opensearch.sql.spark.execution.statement.StatementState; +import org.opensearch.sql.spark.execution.xcontent.AsyncQueryJobMetadataXContentSerializer; +import 
org.opensearch.sql.spark.execution.xcontent.FlintIndexStateModelXContentSerializer; +import org.opensearch.sql.spark.execution.xcontent.IndexDMLResultXContentSerializer; +import org.opensearch.sql.spark.execution.xcontent.SessionModelXContentSerializer; +import org.opensearch.sql.spark.execution.xcontent.StatementModelXContentSerializer; +import org.opensearch.sql.spark.execution.xcontent.XContentSerializer; import org.opensearch.sql.spark.flint.FlintIndexState; import org.opensearch.sql.spark.flint.FlintIndexStateModel; @@ -78,16 +83,16 @@ public class StateStore { private final ClusterService clusterService; @VisibleForTesting - public T create( - T st, StateModel.CopyBuilder builder, String indexName) { + public T create(T st, CopyBuilder builder, String indexName) { try { if (!this.clusterService.state().routingTable().hasIndex(indexName)) { createIndex(indexName); } + XContentSerializer serializer = getXContentSerializer(st); IndexRequest indexRequest = new IndexRequest(indexName) .id(st.getId()) - .source(st.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .source(serializer.toXContent(st, ToXContent.EMPTY_PARAMS)) .setIfSeqNo(st.getSeqNo()) .setIfPrimaryTerm(st.getPrimaryTerm()) .create(true) @@ -114,7 +119,7 @@ public T create( @VisibleForTesting public Optional get( - String sid, StateModel.FromXContent builder, String indexName) { + String sid, FromXContent builder, String indexName) { try { if (!this.clusterService.state().routingTable().hasIndex(indexName)) { createIndex(indexName); @@ -146,16 +151,17 @@ public Optional get( @VisibleForTesting public T updateState( - T st, S state, StateModel.StateCopyBuilder builder, String indexName) { + T st, S state, StateCopyBuilder builder, String indexName) { try { T model = builder.of(st, state, st.getSeqNo(), st.getPrimaryTerm()); + XContentSerializer serializer = getXContentSerializer(st); UpdateRequest updateRequest = new UpdateRequest() .index(indexName) .id(model.getId()) 
.setIfSeqNo(model.getSeqNo()) .setIfPrimaryTerm(model.getPrimaryTerm()) - .doc(model.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .doc(serializer.toXContent(model, ToXContent.EMPTY_PARAMS)) .fetchSource(true) .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); try (ThreadContext.StoredContext ignored = @@ -260,9 +266,10 @@ public static Function createStatement( public static Function> getStatement( StateStore stateStore, String datasourceName) { + StatementModelXContentSerializer serializer = new StatementModelXContentSerializer(); return (docId) -> stateStore.get( - docId, StatementModel::fromXContent, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName)); + docId, serializer::fromXContent, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName)); } public static BiFunction updateStatementState( @@ -284,9 +291,10 @@ public static Function createSession( public static Function> getSession( StateStore stateStore, String datasourceName) { + SessionModelXContentSerializer serializer = new SessionModelXContentSerializer(); return (docId) -> stateStore.get( - docId, SessionModel::fromXContent, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName)); + docId, serializer::fromXContent, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName)); } public static BiFunction updateSessionState( @@ -310,10 +318,12 @@ public static Function createJobMe public static Function> getJobMetaData( StateStore stateStore, String datasourceName) { + AsyncQueryJobMetadataXContentSerializer asyncQueryJobMetadataXContentSerializer = + new AsyncQueryJobMetadataXContentSerializer(); return (docId) -> stateStore.get( docId, - AsyncQueryJobMetadata::fromXContent, + asyncQueryJobMetadataXContentSerializer::fromXContent, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName)); } @@ -396,4 +406,22 @@ public static Supplier activeStatementsCount(StateStore stateStore, String StatementState.RUNNING.getState(), StatementState.WAITING.getState()))); } + + @SuppressWarnings("unchecked") + private 
XContentSerializer getXContentSerializer(T st) { + if (st instanceof StatementModel) { + return (XContentSerializer) new StatementModelXContentSerializer(); + } else if (st instanceof SessionModel) { + return (XContentSerializer) new SessionModelXContentSerializer(); + } else if (st instanceof FlintIndexStateModel) { + return (XContentSerializer) new FlintIndexStateModelXContentSerializer(); + } else if (st instanceof AsyncQueryJobMetadata) { + return (XContentSerializer) new AsyncQueryJobMetadataXContentSerializer(); + } else if (st instanceof IndexDMLResult) { + return (XContentSerializer) new IndexDMLResultXContentSerializer(); + } else { + throw new IllegalArgumentException( + "Unsupported StateModel subclass: " + st.getClass().getSimpleName()); + } + } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/AsyncQueryJobMetadataXContentSerializer.java b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/AsyncQueryJobMetadataXContentSerializer.java new file mode 100644 index 0000000000..4b9790ca96 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/AsyncQueryJobMetadataXContentSerializer.java @@ -0,0 +1,104 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.xcontent; + +import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; +import static org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata.INDEX_NAME; +import static org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata.JOB_TYPE; +import static org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata.TYPE_JOBMETA; +import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME; +import static org.opensearch.sql.spark.execution.statement.StatementModel.QUERY_ID; + +import java.io.IOException; +import java.util.Locale; +import lombok.SneakyThrows; +import 
org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.common.Strings; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId; +import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; +import org.opensearch.sql.spark.dispatcher.model.JobType; + +public class AsyncQueryJobMetadataXContentSerializer + implements XContentSerializer { + @Override + public XContentBuilder toXContent(AsyncQueryJobMetadata jobMetadata, ToXContent.Params params) + throws IOException { + return XContentFactory.jsonBuilder() + .startObject() + .field(QUERY_ID, jobMetadata.getQueryId().getId()) + .field("type", TYPE_JOBMETA) + .field("jobId", jobMetadata.getJobId()) + .field("applicationId", jobMetadata.getApplicationId()) + .field("resultIndex", jobMetadata.getResultIndex()) + .field("sessionId", jobMetadata.getSessionId()) + .field(DATASOURCE_NAME, jobMetadata.getDatasourceName()) + .field(JOB_TYPE, jobMetadata.getJobType().getText().toLowerCase(Locale.ROOT)) + .field(INDEX_NAME, jobMetadata.getIndexName()) + .endObject(); + } + + @Override + @SneakyThrows + public AsyncQueryJobMetadata fromXContent(XContentParser parser, long seqNo, long primaryTerm) { + AsyncQueryId queryId = null; + String jobId = null; + String applicationId = null; + String resultIndex = null; + String sessionId = null; + String datasourceName = null; + String jobTypeStr = null; + String indexName = null; + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) { + String fieldName = parser.currentName(); + parser.nextToken(); + switch (fieldName) { + case QUERY_ID: + queryId = new AsyncQueryId(parser.textOrNull()); + break; + case "jobId": + jobId = parser.textOrNull(); + break; + case "applicationId": + applicationId = 
parser.textOrNull(); + break; + case "resultIndex": + resultIndex = parser.textOrNull(); + break; + case "sessionId": + sessionId = parser.textOrNull(); + break; + case DATASOURCE_NAME: + datasourceName = parser.textOrNull(); break; + case JOB_TYPE: + jobTypeStr = parser.textOrNull(); break; + case INDEX_NAME: + indexName = parser.textOrNull(); break; + case "type": + break; + default: + throw new IllegalArgumentException("Unknown field: " + fieldName); + } + } + if (jobId == null || applicationId == null) { + throw new IllegalArgumentException("jobId and applicationId are required fields."); + } + return new AsyncQueryJobMetadata( + queryId, + applicationId, + jobId, + resultIndex, + sessionId, + datasourceName, + Strings.isNullOrEmpty(jobTypeStr) ? null : JobType.fromString(jobTypeStr), + indexName, + seqNo, + primaryTerm); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/FlintIndexStateModelXContentSerializer.java b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/FlintIndexStateModelXContentSerializer.java new file mode 100644 index 0000000000..d96ee5e312 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/FlintIndexStateModelXContentSerializer.java @@ -0,0 +1,86 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.xcontent; + +import static org.opensearch.sql.spark.execution.session.SessionModel.APPLICATION_ID; +import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME; +import static org.opensearch.sql.spark.execution.session.SessionModel.JOB_ID; +import static org.opensearch.sql.spark.execution.statement.StatementModel.ERROR; +import static org.opensearch.sql.spark.execution.statement.StatementModel.VERSION; +import static org.opensearch.sql.spark.execution.statestore.StateModel.LAST_UPDATE_TIME; +import static org.opensearch.sql.spark.execution.statestore.StateModel.STATE; +import static
org.opensearch.sql.spark.execution.statestore.StateModel.TYPE; +import static org.opensearch.sql.spark.execution.statestore.StateModel.VERSION_1_0; +import static org.opensearch.sql.spark.flint.FlintIndexStateModel.FLINT_INDEX_DOC_TYPE; +import static org.opensearch.sql.spark.flint.FlintIndexStateModel.LATEST_ID; + +import java.io.IOException; +import lombok.SneakyThrows; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.core.xcontent.XContentParserUtils; +import org.opensearch.sql.spark.flint.FlintIndexState; +import org.opensearch.sql.spark.flint.FlintIndexStateModel; + +public class FlintIndexStateModelXContentSerializer + implements XContentSerializer { + @Override + public XContentBuilder toXContent( + FlintIndexStateModel flintIndexStateModel, ToXContent.Params params) throws IOException { + return XContentFactory.jsonBuilder() + .startObject() + .field(VERSION, VERSION_1_0) + .field(TYPE, FLINT_INDEX_DOC_TYPE) + .field(STATE, flintIndexStateModel.getIndexState().getState()) + .field(APPLICATION_ID, flintIndexStateModel.getApplicationId()) + .field(JOB_ID, flintIndexStateModel.getJobId()) + .field(LATEST_ID, flintIndexStateModel.getLatestId()) + .field(DATASOURCE_NAME, flintIndexStateModel.getDatasourceName()) + .field(LAST_UPDATE_TIME, flintIndexStateModel.getLastUpdateTime()) + .field(ERROR, flintIndexStateModel.getError()) + .endObject(); + } + + @Override + @SneakyThrows + public FlintIndexStateModel fromXContent(XContentParser parser, long seqNo, long primaryTerm) { + // Implement the fromXContent logic here + FlintIndexStateModel.FlintIndexStateModelBuilder builder = FlintIndexStateModel.builder(); + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + while 
(!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) { + String fieldName = parser.currentName(); + parser.nextToken(); + switch (fieldName) { + case STATE: + builder.indexState(FlintIndexState.fromString(parser.text())); break; + case APPLICATION_ID: + builder.applicationId(parser.text()); + break; + case JOB_ID: + builder.jobId(parser.text()); + break; + case LATEST_ID: + builder.latestId(parser.text()); + break; + case DATASOURCE_NAME: + builder.datasourceName(parser.text()); + break; + case LAST_UPDATE_TIME: + builder.lastUpdateTime(parser.longValue()); + break; + case ERROR: + builder.error(parser.text()); + break; + } + } + builder.seqNo(seqNo); + builder.primaryTerm(primaryTerm); + return builder.build(); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/IndexDMLResultXContentSerializer.java b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/IndexDMLResultXContentSerializer.java new file mode 100644 index 0000000000..2e23a1fa82 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/IndexDMLResultXContentSerializer.java @@ -0,0 +1,42 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.xcontent; + +import static org.opensearch.sql.spark.dispatcher.model.IndexDMLResult.QUERY_RUNTIME; +import static org.opensearch.sql.spark.dispatcher.model.IndexDMLResult.UPDATE_TIME; +import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME; +import static org.opensearch.sql.spark.execution.statement.StatementModel.QUERY_ID; + +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.sql.spark.dispatcher.model.IndexDMLResult; + +public
class IndexDMLResultXContentSerializer implements XContentSerializer { + @Override + public XContentBuilder toXContent(IndexDMLResult dmlResult, ToXContent.Params params) + throws IOException { + return XContentFactory.jsonBuilder() + .startObject() + .field(QUERY_ID, dmlResult.getQueryId()) + .field("status", dmlResult.getStatus()) + .field("error", dmlResult.getError()) + .field(DATASOURCE_NAME, dmlResult.getDatasourceName()) + .field(QUERY_RUNTIME, dmlResult.getQueryRunTime()) + .field(UPDATE_TIME, dmlResult.getUpdateTime()) + .field("result", ImmutableList.of()) + .field("schema", ImmutableList.of()) + .endObject(); + } + + @Override + public IndexDMLResult fromXContent(XContentParser parser, long seqNo, long primaryTerm) { + throw new UnsupportedOperationException("IndexDMLResult to fromXContent Not supported"); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/SessionModelXContentSerializer.java b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/SessionModelXContentSerializer.java new file mode 100644 index 0000000000..6a07461cb8 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/SessionModelXContentSerializer.java @@ -0,0 +1,98 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.xcontent; + +import static org.opensearch.sql.spark.execution.session.SessionModel.APPLICATION_ID; +import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME; +import static org.opensearch.sql.spark.execution.session.SessionModel.ERROR; +import static org.opensearch.sql.spark.execution.session.SessionModel.JOB_ID; +import static org.opensearch.sql.spark.execution.session.SessionModel.SESSION_DOC_TYPE; +import static org.opensearch.sql.spark.execution.session.SessionModel.SESSION_ID; +import static org.opensearch.sql.spark.execution.session.SessionModel.SESSION_STATE; +import static 
org.opensearch.sql.spark.execution.session.SessionModel.SESSION_TYPE; +import static org.opensearch.sql.spark.execution.session.SessionModel.VERSION; +import static org.opensearch.sql.spark.execution.statestore.StateModel.LAST_UPDATE_TIME; +import static org.opensearch.sql.spark.execution.statestore.StateModel.TYPE; + +import java.io.IOException; +import lombok.SneakyThrows; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.core.xcontent.XContentParserUtils; +import org.opensearch.sql.spark.execution.session.SessionId; +import org.opensearch.sql.spark.execution.session.SessionModel; +import org.opensearch.sql.spark.execution.session.SessionState; +import org.opensearch.sql.spark.execution.session.SessionType; + +public class SessionModelXContentSerializer implements XContentSerializer { + @Override + public XContentBuilder toXContent(SessionModel sessionModel, ToXContent.Params params) + throws IOException { + return XContentFactory.jsonBuilder() + .startObject() + .field(VERSION, sessionModel.getVersion()) + .field(TYPE, SESSION_DOC_TYPE) + .field(SESSION_TYPE, sessionModel.getSessionType().getSessionType()) + .field(SESSION_ID, sessionModel.getSessionId().getSessionId()) + .field(SESSION_STATE, sessionModel.getSessionState().getSessionState()) + .field(DATASOURCE_NAME, sessionModel.getDatasourceName()) + .field(APPLICATION_ID, sessionModel.getApplicationId()) + .field(JOB_ID, sessionModel.getJobId()) + .field(LAST_UPDATE_TIME, sessionModel.getLastUpdateTime()) + .field(ERROR, sessionModel.getError()) + .endObject(); + } + + @Override + @SneakyThrows + public SessionModel fromXContent(XContentParser parser, long seqNo, long primaryTerm) { + // Implement the fromXContent logic here + SessionModel.SessionModelBuilder builder = SessionModel.builder(); + 
XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) { + String fieldName = parser.currentName(); + parser.nextToken(); + switch (fieldName) { + case VERSION: + builder.version(parser.text()); + break; + case SESSION_TYPE: + builder.sessionType(SessionType.fromString(parser.text())); + break; + case SESSION_ID: + builder.sessionId(new SessionId(parser.text())); + break; + case SESSION_STATE: + builder.sessionState(SessionState.fromString(parser.text())); + break; + case DATASOURCE_NAME: + builder.datasourceName(parser.text()); + break; + case ERROR: + builder.error(parser.text()); + break; + case APPLICATION_ID: + builder.applicationId(parser.text()); + break; + case JOB_ID: + builder.jobId(parser.text()); + break; + case LAST_UPDATE_TIME: + builder.lastUpdateTime(parser.longValue()); + break; + case TYPE: + // do nothing. + break; + } + } + builder.seqNo(seqNo); + builder.primaryTerm(primaryTerm); + return builder.build(); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/StatementModelXContentSerializer.java b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/StatementModelXContentSerializer.java new file mode 100644 index 0000000000..ad6214c4e0 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/StatementModelXContentSerializer.java @@ -0,0 +1,115 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.xcontent; + +import static org.opensearch.sql.spark.execution.session.SessionModel.APPLICATION_ID; +import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME; +import static org.opensearch.sql.spark.execution.session.SessionModel.JOB_ID; +import static org.opensearch.sql.spark.execution.statement.StatementModel.ERROR; +import static 
org.opensearch.sql.spark.execution.statement.StatementModel.LANG; +import static org.opensearch.sql.spark.execution.statement.StatementModel.QUERY; +import static org.opensearch.sql.spark.execution.statement.StatementModel.QUERY_ID; +import static org.opensearch.sql.spark.execution.statement.StatementModel.SESSION_ID; +import static org.opensearch.sql.spark.execution.statement.StatementModel.STATEMENT_DOC_TYPE; +import static org.opensearch.sql.spark.execution.statement.StatementModel.STATEMENT_ID; +import static org.opensearch.sql.spark.execution.statement.StatementModel.STATEMENT_STATE; +import static org.opensearch.sql.spark.execution.statement.StatementModel.SUBMIT_TIME; +import static org.opensearch.sql.spark.execution.statement.StatementModel.VERSION; +import static org.opensearch.sql.spark.execution.statestore.StateModel.TYPE; + +import java.io.IOException; +import lombok.SneakyThrows; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.core.xcontent.XContentParserUtils; +import org.opensearch.sql.spark.execution.session.SessionId; +import org.opensearch.sql.spark.execution.statement.StatementId; +import org.opensearch.sql.spark.execution.statement.StatementModel; +import org.opensearch.sql.spark.execution.statement.StatementState; +import org.opensearch.sql.spark.rest.model.LangType; + +public class StatementModelXContentSerializer implements XContentSerializer { + @Override + public XContentBuilder toXContent(StatementModel statementModel, ToXContent.Params params) + throws IOException { + return XContentFactory.jsonBuilder() + .startObject() + .field(VERSION, statementModel.getVersion()) + .field(TYPE, STATEMENT_DOC_TYPE) + .field(STATEMENT_STATE, statementModel.getStatementState().getState()) + .field(STATEMENT_ID, statementModel.getStatementId().getId()) + 
.field(SESSION_ID, statementModel.getSessionId().getSessionId()) + .field(APPLICATION_ID, statementModel.getApplicationId()) + .field(JOB_ID, statementModel.getJobId()) + .field(LANG, statementModel.getLangType().getText()) + .field(DATASOURCE_NAME, statementModel.getDatasourceName()) + .field(QUERY, statementModel.getQuery()) + .field(QUERY_ID, statementModel.getQueryId()) + .field(SUBMIT_TIME, statementModel.getSubmitTime()) + .field(ERROR, statementModel.getError()) + .endObject(); + } + + @Override + @SneakyThrows + public StatementModel fromXContent(XContentParser parser, long seqNo, long primaryTerm) { + StatementModel.StatementModelBuilder builder = StatementModel.builder(); + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) { + String fieldName = parser.currentName(); + parser.nextToken(); + switch (fieldName) { + case VERSION: + builder.version(parser.text()); + break; + case TYPE: + // do nothing + break; + case STATEMENT_STATE: + builder.statementState(StatementState.fromString(parser.text())); + break; + case STATEMENT_ID: + builder.statementId(new StatementId(parser.text())); + break; + case SESSION_ID: + builder.sessionId(new SessionId(parser.text())); + break; + case APPLICATION_ID: + builder.applicationId(parser.text()); + break; + case JOB_ID: + builder.jobId(parser.text()); + break; + case LANG: + builder.langType(LangType.fromString(parser.text())); + break; + case DATASOURCE_NAME: + builder.datasourceName(parser.text()); + break; + case QUERY: + builder.query(parser.text()); + break; + case QUERY_ID: + builder.queryId(parser.text()); + break; + case SUBMIT_TIME: + builder.submitTime(parser.longValue()); + break; + case ERROR: + builder.error(parser.text()); + break; + default: + throw new IllegalArgumentException("Unexpected field: " + fieldName); + } + } + builder.seqNo(seqNo); + builder.primaryTerm(primaryTerm); + 
return builder.build(); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/XContentSerializer.java b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/XContentSerializer.java new file mode 100644 index 0000000000..a963dc4092 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/XContentSerializer.java @@ -0,0 +1,36 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.xcontent; + +import java.io.IOException; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.sql.spark.execution.statestore.StateModel; + +/** Interface for XContentSerializer */ +public interface XContentSerializer { + + /** + * Serializes the given object to an XContentBuilder using the specified parameters. + * + * @param object The object to serialize. + * @param params The parameters to use for serialization. + * @return An XContentBuilder containing the serialized representation of the object. + * @throws IOException If an I/O error occurs during serialization. + */ + XContentBuilder toXContent(T object, ToXContent.Params params) throws IOException; + + /** + * Deserializes an object from an XContentParser. + * + * @param parser The XContentParser to read the object from. + * @param seqNo The sequence number associated with the object. + * @param primaryTerm The primary term associated with the object. + * @return The deserialized object. 
+ */ + T fromXContent(XContentParser parser, long seqNo, long primaryTerm); +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexStateModel.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexStateModel.java index bb73f439a2..3a13805c0a 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexStateModel.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexStateModel.java @@ -9,14 +9,11 @@ import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME; import static org.opensearch.sql.spark.execution.session.SessionModel.JOB_ID; import static org.opensearch.sql.spark.execution.statement.StatementModel.ERROR; -import static org.opensearch.sql.spark.execution.statement.StatementModel.VERSION; -import java.io.IOException; import lombok.Builder; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.SneakyThrows; -import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.core.xcontent.XContentParserUtils; import org.opensearch.sql.spark.execution.statestore.StateModel; @@ -130,21 +127,4 @@ public static FlintIndexStateModel fromXContent( public String getId() { return latestId; } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder - .startObject() - .field(VERSION, VERSION_1_0) - .field(TYPE, FLINT_INDEX_DOC_TYPE) - .field(STATE, indexState.getState()) - .field(APPLICATION_ID, applicationId) - .field(JOB_ID, jobId) - .field(LATEST_ID, latestId) - .field(DATASOURCE_NAME, datasourceName) - .field(LAST_UPDATE_TIME, lastUpdateTime) - .field(ERROR, error) - .endObject(); - return builder; - } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/xcontent/AsyncQueryJobMetadataXContentSerializerTest.java 
b/spark/src/test/java/org/opensearch/sql/spark/execution/xcontent/AsyncQueryJobMetadataXContentSerializerTest.java new file mode 100644 index 0000000000..d393c383c6 --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/execution/xcontent/AsyncQueryJobMetadataXContentSerializerTest.java @@ -0,0 +1,184 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.xcontent; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import org.junit.jupiter.api.Test; +import org.opensearch.common.xcontent.LoggingDeprecationHandler; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId; +import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; +import org.opensearch.sql.spark.dispatcher.model.JobType; + +class AsyncQueryJobMetadataXContentSerializerTest { + + private final AsyncQueryJobMetadataXContentSerializer serializer = + new AsyncQueryJobMetadataXContentSerializer(); + + @Test + void toXContentShouldSerializeAsyncQueryJobMetadata() throws Exception { + AsyncQueryJobMetadata jobMetadata = + new AsyncQueryJobMetadata( + new AsyncQueryId("query1"), + "app1", + "job1", + "result1", + "session1", + "datasource1", + JobType.INTERACTIVE, + "index1", + 1L, + 1L); + + XContentBuilder xContentBuilder = serializer.toXContent(jobMetadata, ToXContent.EMPTY_PARAMS); + String json = xContentBuilder.toString(); + + assertEquals(true, json.contains("\"queryId\":\"query1\"")); + assertEquals(true, json.contains("\"type\":\"jobmeta\"")); + assertEquals(true, 
json.contains("\"jobId\":\"job1\"")); + assertEquals(true, json.contains("\"applicationId\":\"app1\"")); + assertEquals(true, json.contains("\"resultIndex\":\"result1\"")); + assertEquals(true, json.contains("\"sessionId\":\"session1\"")); + assertEquals(true, json.contains("\"dataSourceName\":\"datasource1\"")); + assertEquals(true, json.contains("\"jobType\":\"interactive\"")); + assertEquals(true, json.contains("\"indexName\":\"index1\"")); + } + + @Test + void fromXContentShouldDeserializeAsyncQueryJobMetadata() throws Exception { + String json = + "{\n" + + " \"queryId\": \"query1\",\n" + + " \"type\": \"jobmeta\",\n" + + " \"jobId\": \"job1\",\n" + + " \"applicationId\": \"app1\",\n" + + " \"resultIndex\": \"result1\",\n" + + " \"sessionId\": \"session1\",\n" + + " \"dataSourceName\": \"datasource1\",\n" + + " \"jobType\": \"interactive\",\n" + + " \"indexName\": \"index1\"\n" + + "}"; + XContentParser parser = + XContentType.JSON + .xContent() + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, json); + parser.nextToken(); + + AsyncQueryJobMetadata jobMetadata = serializer.fromXContent(parser, 1L, 1L); + + assertEquals("query1", jobMetadata.getQueryId().getId()); + assertEquals("job1", jobMetadata.getJobId()); + assertEquals("app1", jobMetadata.getApplicationId()); + assertEquals("result1", jobMetadata.getResultIndex()); + assertEquals("session1", jobMetadata.getSessionId()); + assertEquals("datasource1", jobMetadata.getDatasourceName()); + assertEquals(JobType.INTERACTIVE, jobMetadata.getJobType()); + assertEquals("index1", jobMetadata.getIndexName()); + } + + @Test + void fromXContentShouldThrowExceptionWhenMissingRequiredFields() throws Exception { + String json = + "{\n" + + " \"queryId\": \"query1\",\n" + + " \"type\": \"asyncqueryjobmeta\",\n" + + " \"resultIndex\": \"result1\",\n" + + " \"sessionId\": \"session1\",\n" + + " \"dataSourceName\": \"datasource1\",\n" + + " \"jobType\": \"async_query\",\n" + + " \"indexName\": 
\"index1\"\n" + + "}"; + XContentParser parser = + XContentType.JSON + .xContent() + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, json); + parser.nextToken(); + + assertThrows(IllegalArgumentException.class, () -> serializer.fromXContent(parser, 1L, 1L)); + } + + @Test + void fromXContentShouldDeserializeWithMissingApplicationId() throws Exception { + String json = + "{\n" + + " \"queryId\": \"query1\",\n" + + " \"type\": \"jobmeta\",\n" + + " \"jobId\": \"job1\",\n" + + " \"resultIndex\": \"result1\",\n" + + " \"sessionId\": \"session1\",\n" + + " \"dataSourceName\": \"datasource1\",\n" + + " \"jobType\": \"interactive\",\n" + + " \"indexName\": \"index1\"\n" + + "}"; + XContentParser parser = + XContentType.JSON + .xContent() + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, json); + parser.nextToken(); + + assertThrows(IllegalArgumentException.class, () -> serializer.fromXContent(parser, 1L, 1L)); + } + + @Test + void fromXContentShouldThrowExceptionWhenUnknownFields() throws Exception { + String json = + "{\n" + + " \"queryId\": \"query1\",\n" + + " \"type\": \"asyncqueryjobmeta\",\n" + + " \"resultIndex\": \"result1\",\n" + + " \"sessionId\": \"session1\",\n" + + " \"dataSourceName\": \"datasource1\",\n" + + " \"jobType\": \"async_query\",\n" + + " \"indexame\": \"index1\"\n" + + "}"; + XContentParser parser = + XContentType.JSON + .xContent() + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, json); + parser.nextToken(); + + assertThrows(IllegalArgumentException.class, () -> serializer.fromXContent(parser, 1L, 1L)); + } + + @Test + void fromXContentShouldDeserializeAsyncQueryWithJobTypeNUll() throws Exception { + String json = + "{\n" + + " \"queryId\": \"query1\",\n" + + " \"type\": \"jobmeta\",\n" + + " \"jobId\": \"job1\",\n" + + " \"applicationId\": \"app1\",\n" + + " \"resultIndex\": \"result1\",\n" + + " \"sessionId\": \"session1\",\n" + + " 
\"dataSourceName\": \"datasource1\",\n" + + " \"jobType\": \"\",\n" + + " \"indexName\": \"index1\"\n" + + "}"; + XContentParser parser = + XContentType.JSON + .xContent() + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, json); + parser.nextToken(); + + AsyncQueryJobMetadata jobMetadata = serializer.fromXContent(parser, 1L, 1L); + + assertEquals("query1", jobMetadata.getQueryId().getId()); + assertEquals("job1", jobMetadata.getJobId()); + assertEquals("app1", jobMetadata.getApplicationId()); + assertEquals("result1", jobMetadata.getResultIndex()); + assertEquals("session1", jobMetadata.getSessionId()); + assertEquals("datasource1", jobMetadata.getDatasourceName()); + assertNull(jobMetadata.getJobType()); + assertEquals("index1", jobMetadata.getIndexName()); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/xcontent/FlintIndexStateModelXContentSerializerTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/xcontent/FlintIndexStateModelXContentSerializerTest.java new file mode 100644 index 0000000000..974aa09d9a --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/execution/xcontent/FlintIndexStateModelXContentSerializerTest.java @@ -0,0 +1,82 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.xcontent; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.common.xcontent.LoggingDeprecationHandler; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import 
org.opensearch.core.xcontent.XContentParser; +import org.opensearch.sql.spark.flint.FlintIndexState; +import org.opensearch.sql.spark.flint.FlintIndexStateModel; + +@ExtendWith(MockitoExtension.class) +class FlintIndexStateModelXContentSerializerTest { + + private FlintIndexStateModelXContentSerializer serializer = + new FlintIndexStateModelXContentSerializer(); + + @Test + void toXContentShouldSerializeFlintIndexStateModel() throws Exception { + FlintIndexStateModel flintIndexStateModel = + FlintIndexStateModel.builder() + .indexState(FlintIndexState.ACTIVE) + .applicationId("app1") + .jobId("job1") + .latestId("latest1") + .datasourceName("datasource1") + .lastUpdateTime(System.currentTimeMillis()) + .error(null) + .build(); + + XContentBuilder xContentBuilder = + serializer.toXContent(flintIndexStateModel, ToXContent.EMPTY_PARAMS); + String json = xContentBuilder.toString(); + + assertEquals(true, json.contains("\"version\":\"1.0\"")); + assertEquals(true, json.contains("\"type\":\"flintindexstate\"")); + assertEquals(true, json.contains("\"state\":\"active\"")); + assertEquals(true, json.contains("\"applicationId\":\"app1\"")); + assertEquals(true, json.contains("\"jobId\":\"job1\"")); + assertEquals(true, json.contains("\"latestId\":\"latest1\"")); + assertEquals(true, json.contains("\"dataSourceName\":\"datasource1\"")); + } + + @Test + void fromXContentShouldDeserializeFlintIndexStateModel() throws Exception { + // Given + String json = + "{\"version\":\"1.0\",\"type\":\"flintindexstate\",\"state\":\"active\",\"applicationId\":\"app1\",\"jobId\":\"job1\",\"latestId\":\"latest1\",\"dataSourceName\":\"datasource1\",\"lastUpdateTime\":1623456789,\"error\":\"\"}"; + XContentParser parser = + XContentType.JSON + .xContent() + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, json); + parser.nextToken(); + + FlintIndexStateModel flintIndexStateModel = serializer.fromXContent(parser, 1L, 1L); + + assertEquals(FlintIndexState.ACTIVE, 
flintIndexStateModel.getIndexState()); + assertEquals("app1", flintIndexStateModel.getApplicationId()); + assertEquals("job1", flintIndexStateModel.getJobId()); + assertEquals("latest1", flintIndexStateModel.getLatestId()); + assertEquals("datasource1", flintIndexStateModel.getDatasourceName()); + } + + @Test + void fromXContentThrowsExceptionWhenParsingInvalidContent() { + XContentParser parser = mock(XContentParser.class); + + assertThrows(RuntimeException.class, () -> serializer.fromXContent(parser, 0, 0)); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/xcontent/IndexDMLResultXContentSerializerTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/xcontent/IndexDMLResultXContentSerializerTest.java new file mode 100644 index 0000000000..11a22eba57 --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/execution/xcontent/IndexDMLResultXContentSerializerTest.java @@ -0,0 +1,65 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.xcontent; + +import static org.junit.jupiter.api.Assertions.*; + +import java.io.IOException; +import org.junit.jupiter.api.Test; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.sql.spark.dispatcher.model.IndexDMLResult; + +class IndexDMLResultXContentSerializerTest { + + private final IndexDMLResultXContentSerializer serializer = + new IndexDMLResultXContentSerializer(); + + @Test + void toXContentShouldSerializeIndexDMLResult() throws IOException { + IndexDMLResult dmlResult = + new IndexDMLResult("query1", "SUCCESS", null, "datasource1", 1000L, 2000L); + + XContentBuilder xContentBuilder = serializer.toXContent(dmlResult, ToXContent.EMPTY_PARAMS); + String json = xContentBuilder.toString(); + + assertTrue(json.contains("\"queryId\":\"query1\"")); + assertTrue(json.contains("\"status\":\"SUCCESS\"")); + 
assertTrue(json.contains("\"error\":null")); + assertTrue(json.contains("\"dataSourceName\":\"datasource1\"")); + assertTrue(json.contains("\"queryRunTime\":1000")); + assertTrue(json.contains("\"updateTime\":2000")); + assertTrue(json.contains("\"result\":[]")); + assertTrue(json.contains("\"schema\":[]")); + } + + @Test + void toXContentShouldHandleErrorInIndexDMLResult() throws IOException { + // Given + IndexDMLResult dmlResult = + new IndexDMLResult("query1", "FAILURE", "An error occurred", "datasource1", 1000L, 2000L); + + // When + XContentBuilder xContentBuilder = serializer.toXContent(dmlResult, ToXContent.EMPTY_PARAMS); + String json = xContentBuilder.toString(); + + // Then + assertTrue(json.contains("\"queryId\":\"query1\"")); + assertTrue(json.contains("\"status\":\"FAILURE\"")); + assertTrue(json.contains("\"error\":\"An error occurred\"")); + assertTrue(json.contains("\"dataSourceName\":\"datasource1\"")); + assertTrue(json.contains("\"queryRunTime\":1000")); + assertTrue(json.contains("\"updateTime\":2000")); + assertTrue(json.contains("\"result\":[]")); + assertTrue(json.contains("\"schema\":[]")); + } + + @Test + void fromXContentShouldThrowUnsupportedOperationException() { + // When/Then + assertThrows(UnsupportedOperationException.class, () -> serializer.fromXContent(null, 0L, 0L)); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/xcontent/SessionModelXContentSerializerTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/xcontent/SessionModelXContentSerializerTest.java new file mode 100644 index 0000000000..cd560fa4ca --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/execution/xcontent/SessionModelXContentSerializerTest.java @@ -0,0 +1,100 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.xcontent; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static 
org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; + +import org.junit.jupiter.api.Test; +import org.opensearch.common.xcontent.LoggingDeprecationHandler; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.sql.spark.execution.session.SessionId; +import org.opensearch.sql.spark.execution.session.SessionModel; +import org.opensearch.sql.spark.execution.session.SessionState; +import org.opensearch.sql.spark.execution.session.SessionType; + +class SessionModelXContentSerializerTest { + + private final SessionModelXContentSerializer serializer = new SessionModelXContentSerializer(); + + @Test + void toXContentShouldSerializeSessionModel() throws Exception { + // Given + SessionModel sessionModel = + SessionModel.builder() + .version("1.0") + .sessionType(SessionType.INTERACTIVE) + .sessionId(new SessionId("session1")) + .sessionState(SessionState.FAIL) + .datasourceName("datasource1") + .applicationId("app1") + .jobId("job1") + .lastUpdateTime(System.currentTimeMillis()) + .error(null) + .build(); + + // When + XContentBuilder xContentBuilder = serializer.toXContent(sessionModel, ToXContent.EMPTY_PARAMS); + String json = xContentBuilder.toString(); + + // Then + assertEquals(true, json.contains("\"version\":\"1.0\"")); + assertEquals(true, json.contains("\"type\":\"session\"")); + assertEquals(true, json.contains("\"sessionType\":\"interactive\"")); + assertEquals(true, json.contains("\"sessionId\":\"session1\"")); + assertEquals(true, json.contains("\"state\":\"fail\"")); + assertEquals(true, json.contains("\"dataSourceName\":\"datasource1\"")); + assertEquals(true, json.contains("\"applicationId\":\"app1\"")); + assertEquals(true, json.contains("\"jobId\":\"job1\"")); + } + + @Test + void 
fromXContentShouldDeserializeSessionModel() throws Exception { + // Given + String json = + "{\n" + + " \"version\": \"1.0\",\n" + + " \"type\": \"session\",\n" + + " \"sessionType\": \"interactive\",\n" + + " \"sessionId\": \"session1\",\n" + + " \"state\": \"fail\",\n" + + " \"dataSourceName\": \"datasource1\",\n" + + " \"applicationId\": \"app1\",\n" + + " \"jobId\": \"job1\",\n" + + " \"lastUpdateTime\": 1623456789,\n" + + " \"error\": \"\"\n" + + "}"; + XContentParser parser = + XContentType.JSON + .xContent() + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, json); + parser.nextToken(); + + // When + SessionModel sessionModel = serializer.fromXContent(parser, 1L, 1L); + + // Then + assertEquals("1.0", sessionModel.getVersion()); + assertEquals(SessionType.INTERACTIVE, sessionModel.getSessionType()); + assertEquals("session1", sessionModel.getSessionId().getSessionId()); + assertEquals(SessionState.FAIL, sessionModel.getSessionState()); + assertEquals("datasource1", sessionModel.getDatasourceName()); + assertEquals("app1", sessionModel.getApplicationId()); + assertEquals("job1", sessionModel.getJobId()); + } + + @Test + void fromXContentThrowsExceptionWhenParsingInvalidContent() { + XContentParser parser = mock(XContentParser.class); + + assertThrows(RuntimeException.class, () -> serializer.fromXContent(parser, 0, 0)); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/xcontent/StatementModelXContentSerializerTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/xcontent/StatementModelXContentSerializerTest.java new file mode 100644 index 0000000000..de355e1c99 --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/execution/xcontent/StatementModelXContentSerializerTest.java @@ -0,0 +1,128 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.xcontent; + +import static 
org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.common.xcontent.LoggingDeprecationHandler; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.sql.spark.execution.session.SessionId; +import org.opensearch.sql.spark.execution.statement.StatementId; +import org.opensearch.sql.spark.execution.statement.StatementModel; +import org.opensearch.sql.spark.execution.statement.StatementState; +import org.opensearch.sql.spark.rest.model.LangType; + +@ExtendWith(MockitoExtension.class) +class StatementModelXContentSerializerTest { + + private StatementModelXContentSerializer serializer; + + @Test + void toXContentShouldSerializeStatementModel() throws Exception { + + serializer = new StatementModelXContentSerializer(); + // Given + StatementModel statementModel = + StatementModel.builder() + .version("1.0") + .statementState(StatementState.RUNNING) + .statementId(new StatementId("statement1")) + .sessionId(new SessionId("session1")) + .applicationId("app1") + .jobId("job1") + .langType(LangType.SQL) + .datasourceName("datasource1") + .query("SELECT * FROM table") + .queryId("query1") + .submitTime(System.currentTimeMillis()) + .error(null) + .build(); + + // When + XContentBuilder xContentBuilder = + serializer.toXContent(statementModel, ToXContent.EMPTY_PARAMS); + String json = xContentBuilder.toString(); + + assertEquals(true, json.contains("\"version\":\"1.0\"")); + assertEquals(true, json.contains("\"state\":\"running\"")); + assertEquals(true, 
json.contains("\"statementId\":\"statement1\"")); + } + + @Test + void fromXContentShouldDeserializeStatementModel() throws Exception { + StatementModelXContentSerializer serializer = new StatementModelXContentSerializer(); + // Given + String json = + "{\"version\":\"1.0\",\"type\":\"statement\",\"state\":\"running\",\"statementId\":\"statement1\",\"sessionId\":\"session1\",\"applicationId\":\"app1\",\"jobId\":\"job1\",\"lang\":\"SQL\",\"dataSourceName\":\"datasource1\",\"query\":\"SELECT" + + " * FROM table\",\"queryId\":\"query1\",\"submitTime\":1623456789,\"error\":\"\"}"; + XContentParser parser = + XContentType.JSON + .xContent() + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, json); + parser.nextToken(); + + StatementModel statementModel = serializer.fromXContent(parser, 1L, 1L); + + assertEquals("1.0", statementModel.getVersion()); + assertEquals(StatementState.RUNNING, statementModel.getStatementState()); + assertEquals("statement1", statementModel.getStatementId().getId()); + assertEquals("session1", statementModel.getSessionId().getSessionId()); + } + + @Test + void fromXContentShouldDeserializeStatementModelThrowException() throws Exception { + StatementModelXContentSerializer serializer = new StatementModelXContentSerializer(); + // Given + String json = + "{\"version\":\"1.0\",\"type\":\"statement_state\",\"state\":\"running\",\"statementId\":\"statement1\",\"sessionId\":\"session1\",\"applicationId\":\"app1\",\"jobId\":\"job1\",\"lang\":\"SQL\",\"dataSourceName\":\"datasource1\",\"query\":\"SELECT" + + " * FROM table\",\"queryId\":\"query1\",\"submitTime\":1623456789,\"error\":null}"; + XContentParser parser = + XContentType.JSON + .xContent() + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, json); + parser.nextToken(); + + assertThrows(IllegalStateException.class, () -> serializer.fromXContent(parser, 1L, 1L)); + } + + @Test + void fromXContentThrowsExceptionWhenParsingInvalidContent() 
{ + XContentParser parser = mock(XContentParser.class); + + assertThrows(RuntimeException.class, () -> serializer.fromXContent(parser, 0, 0)); + } + + @Test + void fromXContentShouldThrowExceptionForUnexpectedField() throws Exception { + StatementModelXContentSerializer serializer = new StatementModelXContentSerializer(); + + String jsonWithUnexpectedField = + "{\"version\":\"1.0\",\"type\":\"statement\",\"state\":\"running\",\"statementId\":\"statement1\",\"sessionId\":\"session1\",\"applicationId\":\"app1\",\"jobId\":\"job1\",\"lang\":\"SQL\",\"dataSourceName\":\"datasource1\",\"query\":\"SELECT" + + " * FROM" + + " table\",\"queryId\":\"query1\",\"submitTime\":1623456789,\"error\":\"\",\"unexpectedField\":\"someValue\"}"; + XContentParser parser = + XContentType.JSON + .xContent() + .createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + jsonWithUnexpectedField); + parser.nextToken(); + + IllegalArgumentException exception = + assertThrows(IllegalArgumentException.class, () -> serializer.fromXContent(parser, 1L, 1L)); + assertEquals("Unexpected field: unexpectedField", exception.getMessage()); + } +}