Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Statemodels free of XContent #2619

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,9 @@

package org.opensearch.sql.spark.asyncquery.model;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME;
import static org.opensearch.sql.spark.execution.statement.StatementModel.QUERY_ID;

import com.google.gson.Gson;
import java.io.IOException;
import java.util.Locale;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.SneakyThrows;
import org.opensearch.core.common.Strings;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.execution.statestore.StateModel;
Expand Down Expand Up @@ -134,29 +124,6 @@ public String toString() {
return new Gson().toJson(this);
}

/**
* Converts JobMetadata to XContentBuilder.
*
* @return XContentBuilder {@link XContentBuilder}
* @throws Exception Exception.
*/
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder
.startObject()
.field(QUERY_ID, queryId.getId())
.field("type", TYPE_JOBMETA)
.field("jobId", jobId)
.field("applicationId", applicationId)
.field("resultIndex", resultIndex)
.field("sessionId", sessionId)
.field(DATASOURCE_NAME, datasourceName)
.field(JOB_TYPE, jobType.getText().toLowerCase(Locale.ROOT))
.field(INDEX_NAME, indexName)
.endObject();
return builder;
}

/** copy builder. update seqNo and primaryTerm */
public static AsyncQueryJobMetadata copy(
AsyncQueryJobMetadata copy, long seqNo, long primaryTerm) {
Expand All @@ -173,72 +140,6 @@ public static AsyncQueryJobMetadata copy(
primaryTerm);
}

/**
* Convert xcontent parser to JobMetadata.
*
* @param parser parser.
* @return JobMetadata {@link AsyncQueryJobMetadata}
* @throws IOException IOException.
*/
@SneakyThrows
public static AsyncQueryJobMetadata fromXContent(
XContentParser parser, long seqNo, long primaryTerm) {
AsyncQueryId queryId = null;
String jobId = null;
String applicationId = null;
String resultIndex = null;
String sessionId = null;
String datasourceName = null;
String jobTypeStr = null;
String indexName = null;
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) {
String fieldName = parser.currentName();
parser.nextToken();
switch (fieldName) {
case QUERY_ID:
queryId = new AsyncQueryId(parser.textOrNull());
break;
case "jobId":
jobId = parser.textOrNull();
break;
case "applicationId":
applicationId = parser.textOrNull();
break;
case "resultIndex":
resultIndex = parser.textOrNull();
break;
case "sessionId":
sessionId = parser.textOrNull();
break;
case DATASOURCE_NAME:
datasourceName = parser.textOrNull();
case JOB_TYPE:
jobTypeStr = parser.textOrNull();
case INDEX_NAME:
indexName = parser.textOrNull();
case "type":
break;
default:
throw new IllegalArgumentException("Unknown field: " + fieldName);
}
}
if (jobId == null || applicationId == null) {
throw new IllegalArgumentException("jobId and applicationId are required fields.");
}
return new AsyncQueryJobMetadata(
queryId,
applicationId,
jobId,
resultIndex,
sessionId,
datasourceName,
Strings.isNullOrEmpty(jobTypeStr) ? null : JobType.fromString(jobTypeStr),
indexName,
seqNo,
primaryTerm);
}

@Override
public String getId() {
return queryId.docId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,19 @@

package org.opensearch.sql.spark.dispatcher.model;

import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME;

import com.google.common.collect.ImmutableList;
import java.io.IOException;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.sql.spark.execution.statestore.StateModel;

/** Plugin create Index DML result. */
@Data
@EqualsAndHashCode(callSuper = false)
public class IndexDMLResult extends StateModel {
private static final String QUERY_ID = "queryId";
private static final String QUERY_RUNTIME = "queryRunTime";
private static final String UPDATE_TIME = "updateTime";
private static final String DOC_ID_PREFIX = "index";
public static final String QUERY_ID = "queryId";
public static final String QUERY_RUNTIME = "queryRunTime";
public static final String UPDATE_TIME = "updateTime";
public static final String DOC_ID_PREFIX = "index";

private final String queryId;
private final String status;
Expand Down Expand Up @@ -55,20 +50,4 @@ public long getSeqNo() {
public long getPrimaryTerm() {
return SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder
.startObject()
.field(QUERY_ID, queryId)
.field("status", status)
.field("error", error)
.field(DATASOURCE_NAME, datasourceName)
.field(QUERY_RUNTIME, queryRunTime)
.field(UPDATE_TIME, updateTime)
.field("result", ImmutableList.of())
.field("schema", ImmutableList.of())
.endObject();
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,8 @@
import static org.opensearch.sql.spark.execution.session.SessionState.NOT_STARTED;
import static org.opensearch.sql.spark.execution.session.SessionType.INTERACTIVE;

import java.io.IOException;
import lombok.Builder;
import lombok.Data;
import lombok.SneakyThrows;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.sql.spark.execution.statestore.StateModel;

Expand Down Expand Up @@ -48,24 +43,6 @@ public class SessionModel extends StateModel {
private final long seqNo;
private final long primaryTerm;

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder
.startObject()
.field(VERSION, version)
.field(TYPE, SESSION_DOC_TYPE)
.field(SESSION_TYPE, sessionType.getSessionType())
.field(SESSION_ID, sessionId.getSessionId())
.field(SESSION_STATE, sessionState.getSessionState())
.field(DATASOURCE_NAME, datasourceName)
.field(APPLICATION_ID, applicationId)
.field(JOB_ID, jobId)
.field(LAST_UPDATE_TIME, lastUpdateTime)
.field(ERROR, error)
.endObject();
return builder;
}

public static SessionModel of(SessionModel copy, long seqNo, long primaryTerm) {
return builder()
.version(copy.version)
Expand Down Expand Up @@ -99,52 +76,6 @@ public static SessionModel copyWithState(
.build();
}

@SneakyThrows
public static SessionModel fromXContent(XContentParser parser, long seqNo, long primaryTerm) {
SessionModelBuilder builder = new SessionModelBuilder();
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) {
String fieldName = parser.currentName();
parser.nextToken();
switch (fieldName) {
case VERSION:
builder.version(parser.text());
break;
case SESSION_TYPE:
builder.sessionType(SessionType.fromString(parser.text()));
break;
case SESSION_ID:
builder.sessionId(new SessionId(parser.text()));
break;
case SESSION_STATE:
builder.sessionState(SessionState.fromString(parser.text()));
break;
case DATASOURCE_NAME:
builder.datasourceName(parser.text());
break;
case ERROR:
builder.error(parser.text());
break;
case APPLICATION_ID:
builder.applicationId(parser.text());
break;
case JOB_ID:
builder.jobId(parser.text());
break;
case LAST_UPDATE_TIME:
builder.lastUpdateTime(parser.longValue());
break;
case TYPE:
// do nothing.
break;
}
}
builder.seqNo(seqNo);
builder.primaryTerm(primaryTerm);
return builder.build();
}

public static SessionModel initInteractiveSession(
String applicationId, String jobId, SessionId sid, String datasourceName) {
return builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,10 @@

package org.opensearch.sql.spark.execution.statement;

import static org.opensearch.sql.spark.execution.session.SessionModel.APPLICATION_ID;
import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME;
import static org.opensearch.sql.spark.execution.session.SessionModel.JOB_ID;
import static org.opensearch.sql.spark.execution.statement.StatementState.WAITING;

import java.io.IOException;
import lombok.Builder;
import lombok.Data;
import lombok.SneakyThrows;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.sql.spark.execution.session.SessionId;
import org.opensearch.sql.spark.execution.statestore.StateModel;
Expand Down Expand Up @@ -55,27 +47,6 @@ public class StatementModel extends StateModel {
private final long seqNo;
private final long primaryTerm;

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder
.startObject()
.field(VERSION, version)
.field(TYPE, STATEMENT_DOC_TYPE)
.field(STATEMENT_STATE, statementState.getState())
.field(STATEMENT_ID, statementId.getId())
.field(SESSION_ID, sessionId.getSessionId())
.field(APPLICATION_ID, applicationId)
.field(JOB_ID, jobId)
.field(LANG, langType.getText())
.field(DATASOURCE_NAME, datasourceName)
.field(QUERY, query)
.field(QUERY_ID, queryId)
.field(SUBMIT_TIME, submitTime)
.field(ERROR, error)
.endObject();
return builder;
}

public static StatementModel copy(StatementModel copy, long seqNo, long primaryTerm) {
return builder()
.version("1.0")
Expand Down Expand Up @@ -115,61 +86,6 @@ public static StatementModel copyWithState(
.build();
}

@SneakyThrows
public static StatementModel fromXContent(XContentParser parser, long seqNo, long primaryTerm) {
StatementModel.StatementModelBuilder builder = StatementModel.builder();
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) {
String fieldName = parser.currentName();
parser.nextToken();
switch (fieldName) {
case VERSION:
builder.version(parser.text());
break;
case TYPE:
// do nothing
break;
case STATEMENT_STATE:
builder.statementState(StatementState.fromString(parser.text()));
break;
case STATEMENT_ID:
builder.statementId(new StatementId(parser.text()));
break;
case SESSION_ID:
builder.sessionId(new SessionId(parser.text()));
break;
case APPLICATION_ID:
builder.applicationId(parser.text());
break;
case JOB_ID:
builder.jobId(parser.text());
break;
case LANG:
builder.langType(LangType.fromString(parser.text()));
break;
case DATASOURCE_NAME:
builder.datasourceName(parser.text());
break;
case QUERY:
builder.query(parser.text());
break;
case QUERY_ID:
builder.queryId(parser.text());
break;
case SUBMIT_TIME:
builder.submitTime(parser.longValue());
break;
case ERROR:
builder.error(parser.text());
break;
}
}
builder.seqNo(seqNo);
builder.primaryTerm(primaryTerm);
return builder.build();
}

public static StatementModel submitStatement(
SessionId sid,
String applicationId,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.opensearch.sql.spark.execution.statestore;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing copyright. also in interface FromXContent and StateCopyBuilder.


public interface CopyBuilder<T> {
T of(T copy, long seqNo, long primaryTerm);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.opensearch.sql.spark.execution.statestore;

import org.opensearch.core.xcontent.XContentParser;

public interface FromXContent<T extends StateModel> {
T fromXContent(XContentParser parser, long seqNo, long primaryTerm);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.opensearch.sql.spark.execution.statestore;

public interface StateCopyBuilder<T extends StateModel, S> {
T of(T copy, S state, long seqNo, long primaryTerm);
}
Loading
Loading