Skip to content

Commit

Permalink
Abstract queryId generation
Browse files Browse the repository at this point in the history
Signed-off-by: Tomoyuki Morita <[email protected]>
  • Loading branch information
ykmr1224 committed May 24, 2024
1 parent 5a2c199 commit 5f0452f
Show file tree
Hide file tree
Showing 24 changed files with 180 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public CreateAsyncQueryResponse createAsyncQuery(
.indexName(dispatchQueryResponse.getIndexName())
.build());
return new CreateAsyncQueryResponse(
dispatchQueryResponse.getQueryId().getId(), dispatchQueryResponse.getSessionId());
dispatchQueryResponse.getQueryId(), dispatchQueryResponse.getSessionId());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,10 @@ public class OpensearchAsyncQueryJobMetadataStorageService

@Override
public void storeJobMetadata(AsyncQueryJobMetadata asyncQueryJobMetadata) {
AsyncQueryId queryId = asyncQueryJobMetadata.getQueryId();
stateStore.create(
asyncQueryJobMetadata,
AsyncQueryJobMetadata::copy,
OpenSearchStateStoreUtil.getIndexName(queryId.getDataSourceName()));
OpenSearchStateStoreUtil.getIndexName(asyncQueryJobMetadata.getDatasourceName()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
@SuperBuilder
@EqualsAndHashCode(callSuper = false)
public class AsyncQueryJobMetadata extends StateModel {
private final AsyncQueryId queryId;
private final String queryId;
private final String applicationId;
private final String jobId;
private final String resultIndex;
Expand Down Expand Up @@ -59,6 +59,6 @@ public static AsyncQueryJobMetadata copy(

@Override
public String getId() {
return queryId.docId();
return "qid" + queryId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ protected JSONObject getResponseFromExecutor(AsyncQueryJobMetadata asyncQueryJob
public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
emrServerlessClient.cancelJobRun(
asyncQueryJobMetadata.getApplicationId(), asyncQueryJobMetadata.getJobId(), false);
return asyncQueryJobMetadata.getQueryId().getId();
return asyncQueryJobMetadata.getQueryId();
}

@Override
Expand Down Expand Up @@ -93,7 +93,12 @@ public DispatchQueryResponse submit(
dataSourceMetadata.getResultIndex());
String jobId = emrServerlessClient.startJobRun(startJobRequest);
MetricUtils.incrementNumericalMetric(MetricName.EMR_BATCH_QUERY_JOBS_CREATION_COUNT);
return new DispatchQueryResponse(
context.getQueryId(), jobId, dataSourceMetadata.getResultIndex(), null);
return DispatchQueryResponse.builder()
.queryId(context.getQueryId())
.jobId(jobId)
.resultIndex(dataSourceMetadata.getResultIndex())
.datasourceName(dataSourceMetadata.getName())
.jobType(JobType.INTERACTIVE)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.dispatcher;

import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;

/** Generates QueryId by embedding Datasource name and random UUID */
public class DatasourceEmbeddedQueryIdProvider implements QueryIdProvider {

@Override
public String getQueryId(DispatchQueryRequest dispatchQueryRequest) {
return AsyncQueryId.newAsyncQueryId(dispatchQueryRequest.getDatasource()).getId();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
import org.opensearch.sql.spark.dispatcher.model.IndexDMLResult;
import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails;
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.execution.statement.StatementState;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
Expand Down Expand Up @@ -65,47 +65,59 @@ public DispatchQueryResponse submit(

getIndexOp(dispatchQueryRequest, indexDetails).apply(indexMetadata);

AsyncQueryId asyncQueryId =
String asyncQueryId =
storeIndexDMLResult(
context.getQueryId(),
dispatchQueryRequest,
dataSourceMetadata,
JobRunState.SUCCESS.toString(),
StringUtils.EMPTY,
getElapsedTimeSince(startTime));
return new DispatchQueryResponse(
asyncQueryId, DML_QUERY_JOB_ID, dataSourceMetadata.getResultIndex(), null);
return DispatchQueryResponse.builder()
.queryId(asyncQueryId)
.jobId(DML_QUERY_JOB_ID)
.resultIndex(dataSourceMetadata.getResultIndex())
.datasourceName(dataSourceMetadata.getName())
.jobType(JobType.INTERACTIVE)
.build();
} catch (Exception e) {
LOG.error(e.getMessage());
AsyncQueryId asyncQueryId =
String asyncQueryId =
storeIndexDMLResult(
context.getQueryId(),
dispatchQueryRequest,
dataSourceMetadata,
JobRunState.FAILED.toString(),
e.getMessage(),
getElapsedTimeSince(startTime));
return new DispatchQueryResponse(
asyncQueryId, DML_QUERY_JOB_ID, dataSourceMetadata.getResultIndex(), null);
return DispatchQueryResponse.builder()
.queryId(asyncQueryId)
.jobId(DML_QUERY_JOB_ID)
.resultIndex(dataSourceMetadata.getResultIndex())
.datasourceName(dataSourceMetadata.getName())
.jobType(JobType.INTERACTIVE)
.build();
}
}

private AsyncQueryId storeIndexDMLResult(
private String storeIndexDMLResult(
String queryId,
DispatchQueryRequest dispatchQueryRequest,
DataSourceMetadata dataSourceMetadata,
String status,
String error,
long queryRunTime) {
AsyncQueryId asyncQueryId = AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName());
IndexDMLResult indexDMLResult =
IndexDMLResult.builder()
.queryId(asyncQueryId.getId())
.queryId(queryId)
.status(status)
.error(error)
.datasourceName(dispatchQueryRequest.getDatasource())
.queryRunTime(queryRunTime)
.updateTime(System.currentTimeMillis())
.build();
indexDMLResultStorageService.createIndexDMLResult(indexDMLResult);
return asyncQueryId;
return queryId;
}

private long getElapsedTimeSince(long startTime) {
Expand Down Expand Up @@ -143,7 +155,7 @@ private FlintIndexMetadata getFlintIndexMetadata(IndexQueryDetails indexDetails)

@Override
protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) {
String queryId = asyncQueryJobMetadata.getQueryId().getId();
String queryId = asyncQueryJobMetadata.getQueryId();
return jobExecutionResponseReader.getResultWithQueryId(
queryId, asyncQueryJobMetadata.getResultIndex());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@ public class InteractiveQueryHandler extends AsyncQueryHandler {

@Override
protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) {
String queryId = asyncQueryJobMetadata.getQueryId().getId();
String queryId = asyncQueryJobMetadata.getQueryId();
return jobExecutionResponseReader.getResultWithQueryId(
queryId, asyncQueryJobMetadata.getResultIndex());
}

@Override
protected JSONObject getResponseFromExecutor(AsyncQueryJobMetadata asyncQueryJobMetadata) {
JSONObject result = new JSONObject();
String queryId = asyncQueryJobMetadata.getQueryId().getId();
String queryId = asyncQueryJobMetadata.getQueryId();
Statement statement = getStatementByQueryId(asyncQueryJobMetadata.getSessionId(), queryId);
StatementState statementState = statement.getStatementState();
result.put(STATUS_FIELD, statementState.getState());
Expand All @@ -67,7 +67,7 @@ protected JSONObject getResponseFromExecutor(AsyncQueryJobMetadata asyncQueryJob

@Override
public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
String queryId = asyncQueryJobMetadata.getQueryId().getId();
String queryId = asyncQueryJobMetadata.getQueryId();
getStatementByQueryId(asyncQueryJobMetadata.getSessionId(), queryId).cancel();
return queryId;
}
Expand Down Expand Up @@ -118,11 +118,14 @@ public DispatchQueryResponse submit(
context.getQueryId(),
dispatchQueryRequest.getLangType(),
dispatchQueryRequest.getQuery()));
return new DispatchQueryResponse(
context.getQueryId(),
session.getSessionModel().getJobId(),
dataSourceMetadata.getResultIndex(),
session.getSessionId().getSessionId());
return DispatchQueryResponse.builder()
.queryId(context.getQueryId())
.jobId(session.getSessionModel().getJobId())
.resultIndex(dataSourceMetadata.getResultIndex())
.sessionId(session.getSessionId().getSessionId())
.datasourceName(dataSourceMetadata.getName())
.jobType(JobType.INTERACTIVE)
.build();
}

private Statement getStatementByQueryId(String sid, String qid) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.dispatcher;

import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;

/** Interface for extension point to specify queryId. Called when new query is executed. */
public interface QueryIdProvider {
String getQueryId(DispatchQueryRequest dispatchQueryRequest);
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,22 @@ public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
FlintIndexMetadata indexMetadata = indexMetadataMap.get(asyncQueryJobMetadata.getIndexName());
FlintIndexOp jobCancelOp = flintIndexOpFactory.getCancel(datasourceName);
jobCancelOp.apply(indexMetadata);
return asyncQueryJobMetadata.getQueryId().getId();
return asyncQueryJobMetadata.getQueryId();
}

@Override
public DispatchQueryResponse submit(
DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) {
DispatchQueryResponse resp = super.submit(dispatchQueryRequest, context);
DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata();
return new DispatchQueryResponse(
resp.getQueryId(),
resp.getJobId(),
resp.getResultIndex(),
resp.getSessionId(),
dataSourceMetadata.getName(),
JobType.BATCH,
context.getIndexQueryDetails().openSearchIndexName());
return DispatchQueryResponse.builder()
.queryId(resp.getQueryId())
.jobId(resp.getJobId())
.resultIndex(resp.getResultIndex())
.sessionId(resp.getSessionId())
.datasourceName(dataSourceMetadata.getName())
.jobType(JobType.BATCH)
.indexName(context.getIndexQueryDetails().openSearchIndexName())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.json.JSONObject;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
Expand All @@ -36,6 +35,7 @@ public class SparkQueryDispatcher {
private final DataSourceService dataSourceService;
private final SessionManager sessionManager;
private final QueryHandlerFactory queryHandlerFactory;
private final QueryIdProvider queryIdProvider;

public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest) {
DataSourceMetadata dataSourceMetadata =
Expand All @@ -59,12 +59,12 @@ public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest)
}
}

private static DispatchQueryContext.DispatchQueryContextBuilder getDefaultDispatchContextBuilder(
private DispatchQueryContext.DispatchQueryContextBuilder getDefaultDispatchContextBuilder(
DispatchQueryRequest dispatchQueryRequest, DataSourceMetadata dataSourceMetadata) {
return DispatchQueryContext.builder()
.dataSourceMetadata(dataSourceMetadata)
.tags(getDefaultTagsForJobSubmission(dispatchQueryRequest))
.queryId(AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName()));
.queryId(queryIdProvider.getQueryId(dispatchQueryRequest));
}

private AsyncQueryHandler getQueryHandlerForFlintExtensionQuery(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.utils.MetricUtils;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters;
import org.opensearch.sql.spark.client.EMRServerlessClient;
Expand Down Expand Up @@ -82,13 +81,13 @@ public DispatchQueryResponse submit(
dataSourceMetadata.getResultIndex());
String jobId = emrServerlessClient.startJobRun(startJobRequest);
MetricUtils.incrementNumericalMetric(MetricName.EMR_STREAMING_QUERY_JOBS_CREATION_COUNT);
return new DispatchQueryResponse(
AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName()),
jobId,
dataSourceMetadata.getResultIndex(),
null,
dataSourceMetadata.getName(),
JobType.STREAMING,
indexQueryDetails.openSearchIndexName());
return DispatchQueryResponse.builder()
.queryId(context.getQueryId())
.jobId(jobId)
.resultIndex(dataSourceMetadata.getResultIndex())
.datasourceName(dataSourceMetadata.getName())
.jobType(JobType.STREAMING)
.indexName(indexQueryDetails.openSearchIndexName())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@
import lombok.Builder;
import lombok.Getter;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;

@Getter
@Builder
public class DispatchQueryContext {
private final AsyncQueryId queryId;
private final String queryId;
private final DataSourceMetadata dataSourceMetadata;
private final Map<String, String> tags;
private final IndexQueryDetails indexQueryDetails;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,37 +1,16 @@
package org.opensearch.sql.spark.dispatcher.model;

import lombok.Builder;
import lombok.Getter;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;

@Getter
@Builder
public class DispatchQueryResponse {
private final AsyncQueryId queryId;
private final String queryId;
private final String jobId;
private final String resultIndex;
private final String sessionId;
private final String datasourceName;
private final JobType jobType;
private final String indexName;

public DispatchQueryResponse(
AsyncQueryId queryId, String jobId, String resultIndex, String sessionId) {
this(queryId, jobId, resultIndex, sessionId, null, JobType.INTERACTIVE, null);
}

public DispatchQueryResponse(
AsyncQueryId queryId,
String jobId,
String resultIndex,
String sessionId,
String datasourceName,
JobType jobType,
String indexName) {
this.queryId = queryId;
this.jobId = jobId;
this.resultIndex = resultIndex;
this.sessionId = sessionId;
this.datasourceName = datasourceName;
this.jobType = jobType;
this.indexName = indexName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public StatementId submit(QueryRequest request) {
} else {
sessionModel = model.get();
if (!END_STATE.contains(sessionModel.getSessionState())) {
String qid = request.getQueryId().getId();
String qid = request.getQueryId();
StatementId statementId = newStatementId(qid);
Statement st =
Statement.builder()
Expand Down
Loading

0 comments on commit 5f0452f

Please sign in to comment.