Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Make models free of XContent (#2677) #2684

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,9 @@

package org.opensearch.sql.spark.asyncquery.model;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME;
import static org.opensearch.sql.spark.execution.statement.StatementModel.QUERY_ID;

import com.google.gson.Gson;
import java.io.IOException;
import java.util.Locale;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.SneakyThrows;
import org.opensearch.core.common.Strings;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.execution.statestore.StateModel;
Expand All @@ -28,10 +18,6 @@
@Data
@EqualsAndHashCode(callSuper = false)
public class AsyncQueryJobMetadata extends StateModel {
public static final String TYPE_JOBMETA = "jobmeta";
public static final String JOB_TYPE = "jobType";
public static final String INDEX_NAME = "indexName";

private final AsyncQueryId queryId;
private final String applicationId;
private final String jobId;
Expand Down Expand Up @@ -134,29 +120,6 @@ public String toString() {
return new Gson().toJson(this);
}

/**
* Converts JobMetadata to XContentBuilder.
*
* @return XContentBuilder {@link XContentBuilder}
* @throws Exception Exception.
*/
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder
.startObject()
.field(QUERY_ID, queryId.getId())
.field("type", TYPE_JOBMETA)
.field("jobId", jobId)
.field("applicationId", applicationId)
.field("resultIndex", resultIndex)
.field("sessionId", sessionId)
.field(DATASOURCE_NAME, datasourceName)
.field(JOB_TYPE, jobType.getText().toLowerCase(Locale.ROOT))
.field(INDEX_NAME, indexName)
.endObject();
return builder;
}

/** copy builder. update seqNo and primaryTerm */
public static AsyncQueryJobMetadata copy(
AsyncQueryJobMetadata copy, long seqNo, long primaryTerm) {
Expand All @@ -173,72 +136,6 @@ public static AsyncQueryJobMetadata copy(
primaryTerm);
}

/**
* Convert xcontent parser to JobMetadata.
*
* @param parser parser.
* @return JobMetadata {@link AsyncQueryJobMetadata}
* @throws IOException IOException.
*/
@SneakyThrows
public static AsyncQueryJobMetadata fromXContent(
XContentParser parser, long seqNo, long primaryTerm) {
AsyncQueryId queryId = null;
String jobId = null;
String applicationId = null;
String resultIndex = null;
String sessionId = null;
String datasourceName = null;
String jobTypeStr = null;
String indexName = null;
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) {
String fieldName = parser.currentName();
parser.nextToken();
switch (fieldName) {
case QUERY_ID:
queryId = new AsyncQueryId(parser.textOrNull());
break;
case "jobId":
jobId = parser.textOrNull();
break;
case "applicationId":
applicationId = parser.textOrNull();
break;
case "resultIndex":
resultIndex = parser.textOrNull();
break;
case "sessionId":
sessionId = parser.textOrNull();
break;
case DATASOURCE_NAME:
datasourceName = parser.textOrNull();
case JOB_TYPE:
jobTypeStr = parser.textOrNull();
case INDEX_NAME:
indexName = parser.textOrNull();
case "type":
break;
default:
throw new IllegalArgumentException("Unknown field: " + fieldName);
}
}
if (jobId == null || applicationId == null) {
throw new IllegalArgumentException("jobId and applicationId are required fields.");
}
return new AsyncQueryJobMetadata(
queryId,
applicationId,
jobId,
resultIndex,
sessionId,
datasourceName,
Strings.isNullOrEmpty(jobTypeStr) ? null : JobType.fromString(jobTypeStr),
indexName,
seqNo,
primaryTerm);
}

@Override
public String getId() {
return queryId.docId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED;
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_ROLE_ARN;
import static org.opensearch.sql.spark.data.constants.SparkConstants.*;
import static org.opensearch.sql.spark.execution.statestore.StateStore.DATASOURCE_TO_REQUEST_INDEX;

import java.net.URI;
import java.net.URISyntaxException;
Expand All @@ -27,6 +26,7 @@
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.datasources.auth.AuthenticationType;
import org.opensearch.sql.spark.execution.statestore.OpenSearchStateStoreUtil;

/** Define Spark Submit Parameters. */
@AllArgsConstructor
Expand Down Expand Up @@ -181,7 +181,7 @@ public Builder extraParameters(String params) {
}

public Builder sessionExecution(String sessionId, String datasourceName) {
config.put(FLINT_JOB_REQUEST_INDEX, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName));
config.put(FLINT_JOB_REQUEST_INDEX, OpenSearchStateStoreUtil.getIndexName(datasourceName));
config.put(FLINT_JOB_SESSION_ID, sessionId);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

package org.opensearch.sql.spark.cluster;

import static org.opensearch.sql.spark.execution.session.SessionModel.LAST_UPDATE_TIME;
import static org.opensearch.sql.spark.execution.statement.StatementModel.SUBMIT_TIME;
import static org.opensearch.sql.spark.execution.xcontent.StatementModelXContentSerializer.SUBMIT_TIME;
import static org.opensearch.sql.spark.execution.xcontent.XContentCommonAttributes.LAST_UPDATE_TIME;

import java.time.Clock;
import java.time.Duration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,16 @@

package org.opensearch.sql.spark.dispatcher.model;

import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME;

import com.google.common.collect.ImmutableList;
import java.io.IOException;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.sql.spark.execution.statestore.StateModel;

/** Plugin create Index DML result. */
@Data
@EqualsAndHashCode(callSuper = false)
public class IndexDMLResult extends StateModel {
private static final String QUERY_ID = "queryId";
private static final String QUERY_RUNTIME = "queryRunTime";
private static final String UPDATE_TIME = "updateTime";
private static final String DOC_ID_PREFIX = "index";
public static final String DOC_ID_PREFIX = "index";

private final String queryId;
private final String status;
Expand Down Expand Up @@ -55,20 +47,4 @@ public long getSeqNo() {
public long getPrimaryTerm() {
return SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder
.startObject()
.field(QUERY_ID, queryId)
.field("status", status)
.field("error", error)
.field(DATASOURCE_NAME, datasourceName)
.field(QUERY_RUNTIME, queryRunTime)
.field(UPDATE_TIME, updateTime)
.field("result", ImmutableList.of())
.field("schema", ImmutableList.of())
.endObject();
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,17 @@
import static org.opensearch.sql.spark.execution.session.SessionState.NOT_STARTED;
import static org.opensearch.sql.spark.execution.session.SessionType.INTERACTIVE;

import java.io.IOException;
import lombok.Builder;
import lombok.Data;
import lombok.SneakyThrows;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.sql.spark.execution.statestore.StateModel;

/** Session data in flint.ql.sessions index. */
@Data
@Builder
public class SessionModel extends StateModel {
public static final String VERSION = "version";
public static final String TYPE = "type";
public static final String SESSION_TYPE = "sessionType";
public static final String SESSION_ID = "sessionId";
public static final String SESSION_STATE = "state";
public static final String DATASOURCE_NAME = "dataSourceName";
public static final String LAST_UPDATE_TIME = "lastUpdateTime";
public static final String APPLICATION_ID = "applicationId";
public static final String JOB_ID = "jobId";
public static final String ERROR = "error";

public static final String UNKNOWN = "unknown";
public static final String SESSION_DOC_TYPE = "session";

private final String version;
private final SessionType sessionType;
Expand All @@ -48,24 +33,6 @@ public class SessionModel extends StateModel {
private final long seqNo;
private final long primaryTerm;

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder
.startObject()
.field(VERSION, version)
.field(TYPE, SESSION_DOC_TYPE)
.field(SESSION_TYPE, sessionType.getSessionType())
.field(SESSION_ID, sessionId.getSessionId())
.field(SESSION_STATE, sessionState.getSessionState())
.field(DATASOURCE_NAME, datasourceName)
.field(APPLICATION_ID, applicationId)
.field(JOB_ID, jobId)
.field(LAST_UPDATE_TIME, lastUpdateTime)
.field(ERROR, error)
.endObject();
return builder;
}

public static SessionModel of(SessionModel copy, long seqNo, long primaryTerm) {
return builder()
.version(copy.version)
Expand Down Expand Up @@ -99,52 +66,6 @@ public static SessionModel copyWithState(
.build();
}

@SneakyThrows
public static SessionModel fromXContent(XContentParser parser, long seqNo, long primaryTerm) {
SessionModelBuilder builder = new SessionModelBuilder();
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) {
String fieldName = parser.currentName();
parser.nextToken();
switch (fieldName) {
case VERSION:
builder.version(parser.text());
break;
case SESSION_TYPE:
builder.sessionType(SessionType.fromString(parser.text()));
break;
case SESSION_ID:
builder.sessionId(new SessionId(parser.text()));
break;
case SESSION_STATE:
builder.sessionState(SessionState.fromString(parser.text()));
break;
case DATASOURCE_NAME:
builder.datasourceName(parser.text());
break;
case ERROR:
builder.error(parser.text());
break;
case APPLICATION_ID:
builder.applicationId(parser.text());
break;
case JOB_ID:
builder.jobId(parser.text());
break;
case LAST_UPDATE_TIME:
builder.lastUpdateTime(parser.longValue());
break;
case TYPE:
// do nothing.
break;
}
}
builder.seqNo(seqNo);
builder.primaryTerm(primaryTerm);
return builder.build();
}

public static SessionModel initInteractiveSession(
String applicationId, String jobId, SessionId sid, String datasourceName) {
return builder()
Expand Down
Loading
Loading