Skip to content

Commit

Permalink
Refactor data models to be generic to data storage (opensearch-projec…
Browse files Browse the repository at this point in the history
…t#2687) (opensearch-project#2690)

* Refactor data models to be generic to data storage



* Address review comments



* Reduce redundancy



---------


(cherry picked from commit 3a28d2a)

Signed-off-by: Tomoyuki Morita <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 9518517 commit 2753966
Show file tree
Hide file tree
Showing 31 changed files with 472 additions and 487 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,16 @@ public CreateAsyncQueryResponse createAsyncQuery(
sparkExecutionEngineConfig.getSparkSubmitParameters(),
createAsyncQueryRequest.getSessionId()));
asyncQueryJobMetadataStorageService.storeJobMetadata(
new AsyncQueryJobMetadata(
dispatchQueryResponse.getQueryId(),
sparkExecutionEngineConfig.getApplicationId(),
dispatchQueryResponse.getJobId(),
dispatchQueryResponse.getResultIndex(),
dispatchQueryResponse.getSessionId(),
dispatchQueryResponse.getDatasourceName(),
dispatchQueryResponse.getJobType(),
dispatchQueryResponse.getIndexName()));
AsyncQueryJobMetadata.builder()
.queryId(dispatchQueryResponse.getQueryId())
.applicationId(sparkExecutionEngineConfig.getApplicationId())
.jobId(dispatchQueryResponse.getJobId())
.resultIndex(dispatchQueryResponse.getResultIndex())
.sessionId(dispatchQueryResponse.getSessionId())
.datasourceName(dispatchQueryResponse.getDatasourceName())
.jobType(dispatchQueryResponse.getJobType())
.indexName(dispatchQueryResponse.getIndexName())
.build());
return new CreateAsyncQueryResponse(
dispatchQueryResponse.getQueryId().getId(), dispatchQueryResponse.getSessionId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,39 +7,45 @@

package org.opensearch.sql.spark.asyncquery;

import static org.opensearch.sql.spark.execution.statestore.StateStore.createJobMetaData;

import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.spark.asyncquery.exceptions.AsyncQueryNotFoundException;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.execution.statestore.OpenSearchStateStoreUtil;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.execution.xcontent.AsyncQueryJobMetadataXContentSerializer;

/** Opensearch implementation of {@link AsyncQueryJobMetadataStorageService} */
@RequiredArgsConstructor
public class OpensearchAsyncQueryJobMetadataStorageService
implements AsyncQueryJobMetadataStorageService {

private final StateStore stateStore;
private final AsyncQueryJobMetadataXContentSerializer asyncQueryJobMetadataXContentSerializer;

private static final Logger LOGGER =
LogManager.getLogger(OpensearchAsyncQueryJobMetadataStorageService.class);

@Override
public void storeJobMetadata(AsyncQueryJobMetadata asyncQueryJobMetadata) {
AsyncQueryId queryId = asyncQueryJobMetadata.getQueryId();
createJobMetaData(stateStore, queryId.getDataSourceName()).apply(asyncQueryJobMetadata);
stateStore.create(
asyncQueryJobMetadata,
AsyncQueryJobMetadata::copy,
OpenSearchStateStoreUtil.getIndexName(queryId.getDataSourceName()));
}

@Override
public Optional<AsyncQueryJobMetadata> getJobMetadata(String qid) {
try {
AsyncQueryId queryId = new AsyncQueryId(qid);
return StateStore.getJobMetaData(stateStore, queryId.getDataSourceName())
.apply(queryId.docId());
return stateStore.get(
queryId.docId(),
asyncQueryJobMetadataXContentSerializer::fromXContent,
OpenSearchStateStoreUtil.getIndexName(queryId.getDataSourceName()));
} catch (Exception e) {
LOGGER.error("Error while fetching the job metadata.", e);
throw new AsyncQueryNotFoundException(String.format("Invalid QueryId: %s", qid));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,18 @@

package org.opensearch.sql.spark.asyncquery.model;

import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
import lombok.Builder.Default;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.opensearch.index.seqno.SequenceNumbers;
import lombok.experimental.SuperBuilder;
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.execution.statestore.StateModel;

/** This class models all the metadata required for a job. */
@Data
@SuperBuilder
@EqualsAndHashCode(callSuper = false)
public class AsyncQueryJobMetadata extends StateModel {
private final AsyncQueryId queryId;
Expand All @@ -27,113 +30,31 @@ public class AsyncQueryJobMetadata extends StateModel {
// since 2.13
// jobType could be null before OpenSearch 2.12. SparkQueryDispatcher use jobType to choose
// cancel query handler. if jobType is null, it will invoke BatchQueryHandler.cancel().
private final JobType jobType;
@Default private final JobType jobType = JobType.INTERACTIVE;
// null if JobType is null
private final String datasourceName;
// null if JobType is INTERACTIVE or null
private final String indexName;

@EqualsAndHashCode.Exclude private final long seqNo;
@EqualsAndHashCode.Exclude private final long primaryTerm;

public AsyncQueryJobMetadata(
AsyncQueryId queryId, String applicationId, String jobId, String resultIndex) {
this(
queryId,
applicationId,
jobId,
resultIndex,
null,
null,
JobType.INTERACTIVE,
null,
SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
}

public AsyncQueryJobMetadata(
AsyncQueryId queryId,
String applicationId,
String jobId,
String resultIndex,
String sessionId) {
this(
queryId,
applicationId,
jobId,
resultIndex,
sessionId,
null,
JobType.INTERACTIVE,
null,
SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
}

public AsyncQueryJobMetadata(
AsyncQueryId queryId,
String applicationId,
String jobId,
String resultIndex,
String sessionId,
String datasourceName,
JobType jobType,
String indexName) {
this(
queryId,
applicationId,
jobId,
resultIndex,
sessionId,
datasourceName,
jobType,
indexName,
SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
}

public AsyncQueryJobMetadata(
AsyncQueryId queryId,
String applicationId,
String jobId,
String resultIndex,
String sessionId,
String datasourceName,
JobType jobType,
String indexName,
long seqNo,
long primaryTerm) {
this.queryId = queryId;
this.applicationId = applicationId;
this.jobId = jobId;
this.resultIndex = resultIndex;
this.sessionId = sessionId;
this.datasourceName = datasourceName;
this.jobType = jobType;
this.indexName = indexName;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
}

@Override
public String toString() {
return new Gson().toJson(this);
}

/** copy builder. update seqNo and primaryTerm */
public static AsyncQueryJobMetadata copy(
AsyncQueryJobMetadata copy, long seqNo, long primaryTerm) {
return new AsyncQueryJobMetadata(
copy.getQueryId(),
copy.getApplicationId(),
copy.getJobId(),
copy.getResultIndex(),
copy.getSessionId(),
copy.datasourceName,
copy.jobType,
copy.indexName,
seqNo,
primaryTerm);
AsyncQueryJobMetadata copy, ImmutableMap<String, Object> metadata) {
return builder()
.queryId(copy.queryId)
.applicationId(copy.getApplicationId())
.jobId(copy.getJobId())
.resultIndex(copy.getResultIndex())
.sessionId(copy.getSessionId())
.datasourceName(copy.datasourceName)
.jobType(copy.jobType)
.indexName(copy.indexName)
.metadata(metadata)
.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,14 @@ private AsyncQueryId storeIndexDMLResult(
long queryRunTime) {
AsyncQueryId asyncQueryId = AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName());
IndexDMLResult indexDMLResult =
new IndexDMLResult(
asyncQueryId.getId(),
status,
error,
dispatchQueryRequest.getDatasource(),
queryRunTime,
System.currentTimeMillis());
IndexDMLResult.builder()
.queryId(asyncQueryId.getId())
.status(status)
.error(error)
.datasourceName(dispatchQueryRequest.getDatasource())
.queryRunTime(queryRunTime)
.updateTime(System.currentTimeMillis())
.build();
indexDMLResultStorageService.createIndexDMLResult(indexDMLResult);
return asyncQueryId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@

package org.opensearch.sql.spark.dispatcher.model;

import com.google.common.collect.ImmutableMap;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.opensearch.index.seqno.SequenceNumbers;
import lombok.experimental.SuperBuilder;
import org.opensearch.sql.spark.execution.statestore.StateModel;

/** Plugin create Index DML result. */
@Data
@SuperBuilder
@EqualsAndHashCode(callSuper = false)
public class IndexDMLResult extends StateModel {
public static final String DOC_ID_PREFIX = "index";
Expand All @@ -23,28 +25,20 @@ public class IndexDMLResult extends StateModel {
private final Long queryRunTime;
private final Long updateTime;

public static IndexDMLResult copy(IndexDMLResult copy, long seqNo, long primaryTerm) {
return new IndexDMLResult(
copy.queryId,
copy.status,
copy.error,
copy.datasourceName,
copy.queryRunTime,
copy.updateTime);
public static IndexDMLResult copy(IndexDMLResult copy, ImmutableMap<String, Object> metadata) {
return builder()
.queryId(copy.queryId)
.status(copy.status)
.error(copy.error)
.datasourceName(copy.datasourceName)
.queryRunTime(copy.queryRunTime)
.updateTime(copy.updateTime)
.metadata(metadata)
.build();
}

@Override
public String getId() {
return DOC_ID_PREFIX + queryId;
}

@Override
public long getSeqNo() {
return SequenceNumbers.UNASSIGNED_SEQ_NO;
}

@Override
public long getPrimaryTerm() {
return SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
import static org.opensearch.sql.spark.execution.session.SessionState.NOT_STARTED;
import static org.opensearch.sql.spark.execution.session.SessionType.INTERACTIVE;

import lombok.Builder;
import com.google.common.collect.ImmutableMap;
import lombok.Data;
import org.opensearch.index.seqno.SequenceNumbers;
import lombok.experimental.SuperBuilder;
import org.opensearch.sql.spark.execution.statestore.StateModel;

/** Session data in flint.ql.sessions index. */
@Data
@Builder
@SuperBuilder
public class SessionModel extends StateModel {

public static final String UNKNOWN = "unknown";
Expand All @@ -30,10 +30,7 @@ public class SessionModel extends StateModel {
private final String error;
private final long lastUpdateTime;

private final long seqNo;
private final long primaryTerm;

public static SessionModel of(SessionModel copy, long seqNo, long primaryTerm) {
public static SessionModel of(SessionModel copy, ImmutableMap<String, Object> metadata) {
return builder()
.version(copy.version)
.sessionType(copy.sessionType)
Expand All @@ -44,13 +41,12 @@ public static SessionModel of(SessionModel copy, long seqNo, long primaryTerm) {
.jobId(copy.jobId)
.error(UNKNOWN)
.lastUpdateTime(copy.getLastUpdateTime())
.seqNo(seqNo)
.primaryTerm(primaryTerm)
.metadata(metadata)
.build();
}

public static SessionModel copyWithState(
SessionModel copy, SessionState state, long seqNo, long primaryTerm) {
SessionModel copy, SessionState state, ImmutableMap<String, Object> metadata) {
return builder()
.version(copy.version)
.sessionType(copy.sessionType)
Expand All @@ -61,8 +57,7 @@ public static SessionModel copyWithState(
.jobId(copy.jobId)
.error(UNKNOWN)
.lastUpdateTime(copy.getLastUpdateTime())
.seqNo(seqNo)
.primaryTerm(primaryTerm)
.metadata(metadata)
.build();
}

Expand All @@ -78,8 +73,6 @@ public static SessionModel initInteractiveSession(
.jobId(jobId)
.error(UNKNOWN)
.lastUpdateTime(System.currentTimeMillis())
.seqNo(SequenceNumbers.UNASSIGNED_SEQ_NO)
.primaryTerm(SequenceNumbers.UNASSIGNED_PRIMARY_TERM)
.build();
}

Expand Down
Loading

0 comments on commit 2753966

Please sign in to comment.