Abstract queryId generation (#2695)
* Abstract queryId generation

Signed-off-by: Tomoyuki Morita <[email protected]>

* Remove OpenSearch specific id mapping from model classes

Signed-off-by: Tomoyuki Morita <[email protected]>

* Fix code style

Signed-off-by: Tomoyuki Morita <[email protected]>

---------

Signed-off-by: Tomoyuki Morita <[email protected]>
Authored by ykmr1224 on Jun 4, 2024
Parent: 3dd1729 · Commit: 03a5e4d
Showing 31 changed files with 227 additions and 209 deletions.
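At a high level, this change replaces the OpenSearch-specific AsyncQueryId wrapper with a plain String query ID produced through a new QueryIdProvider extension point. A minimal before/after sketch of a typical call site, using only types and calls that appear in the diff below:

// Before: the ID was wrapped in an OpenSearch-specific value object.
String id = asyncQueryJobMetadata.getQueryId().getId();

// After: getQueryId() returns the plain String generated by a QueryIdProvider.
String id = asyncQueryJobMetadata.getQueryId();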
@@ -63,7 +63,7 @@ public CreateAsyncQueryResponse createAsyncQuery(
.indexName(dispatchQueryResponse.getIndexName())
.build());
return new CreateAsyncQueryResponse(
dispatchQueryResponse.getQueryId().getId(), dispatchQueryResponse.getSessionId());
dispatchQueryResponse.getQueryId(), dispatchQueryResponse.getSessionId());
}

@Override
@@ -1,8 +1,6 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.asyncquery;
@@ -12,43 +10,46 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.spark.asyncquery.exceptions.AsyncQueryNotFoundException;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.execution.statestore.OpenSearchStateStoreUtil;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.execution.xcontent.AsyncQueryJobMetadataXContentSerializer;
import org.opensearch.sql.spark.utils.IDUtils;

/** Opensearch implementation of {@link AsyncQueryJobMetadataStorageService} */
/** OpenSearch implementation of {@link AsyncQueryJobMetadataStorageService} */
@RequiredArgsConstructor
public class OpensearchAsyncQueryJobMetadataStorageService
public class OpenSearchAsyncQueryJobMetadataStorageService
implements AsyncQueryJobMetadataStorageService {

private final StateStore stateStore;
private final AsyncQueryJobMetadataXContentSerializer asyncQueryJobMetadataXContentSerializer;

private static final Logger LOGGER =
LogManager.getLogger(OpensearchAsyncQueryJobMetadataStorageService.class);
LogManager.getLogger(OpenSearchAsyncQueryJobMetadataStorageService.class);

@Override
public void storeJobMetadata(AsyncQueryJobMetadata asyncQueryJobMetadata) {
AsyncQueryId queryId = asyncQueryJobMetadata.getQueryId();
stateStore.create(
mapIdToDocumentId(asyncQueryJobMetadata.getId()),
asyncQueryJobMetadata,
AsyncQueryJobMetadata::copy,
OpenSearchStateStoreUtil.getIndexName(queryId.getDataSourceName()));
OpenSearchStateStoreUtil.getIndexName(asyncQueryJobMetadata.getDatasourceName()));
}

private String mapIdToDocumentId(String id) {
return "qid" + id;
}

@Override
public Optional<AsyncQueryJobMetadata> getJobMetadata(String qid) {
public Optional<AsyncQueryJobMetadata> getJobMetadata(String queryId) {
try {
AsyncQueryId queryId = new AsyncQueryId(qid);
return stateStore.get(
queryId.docId(),
mapIdToDocumentId(queryId),
asyncQueryJobMetadataXContentSerializer::fromXContent,
OpenSearchStateStoreUtil.getIndexName(queryId.getDataSourceName()));
OpenSearchStateStoreUtil.getIndexName(IDUtils.decode(queryId)));
} catch (Exception e) {
LOGGER.error("Error while fetching the job metadata.", e);
throw new AsyncQueryNotFoundException(String.format("Invalid QueryId: %s", qid));
throw new AsyncQueryNotFoundException(String.format("Invalid QueryId: %s", queryId));
}
}
}
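Taken together, the storage service above now derives everything it needs from the String query ID itself, because the ID embeds the datasource name. A short sketch of the round trip, assuming only the IDUtils and OpenSearchStateStoreUtil calls already used in this file (the datasource name "my_glue" is a made-up example):

// Hypothetical usage sketch: no AsyncQueryId wrapper is needed any more.
String queryId = IDUtils.encode("my_glue");                                // opaque ID that embeds "my_glue"
String datasourceName = IDUtils.decode(queryId);                           // recovers "my_glue"
String indexName = OpenSearchStateStoreUtil.getIndexName(datasourceName);  // state-store index for that datasource
String documentId = "qid" + queryId;                                       // same prefix as mapIdToDocumentId above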
@@ -1,8 +1,6 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.asyncquery.model;
@@ -21,7 +19,7 @@
@SuperBuilder
@EqualsAndHashCode(callSuper = false)
public class AsyncQueryJobMetadata extends StateModel {
private final AsyncQueryId queryId;
private final String queryId;
private final String applicationId;
private final String jobId;
private final String resultIndex;
@@ -59,6 +57,6 @@ public static AsyncQueryJobMetadata copy(

@Override
public String getId() {
return queryId.docId();
return queryId;
}
}
@@ -63,7 +63,7 @@ protected JSONObject getResponseFromExecutor(AsyncQueryJobMetadata asyncQueryJob
public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
emrServerlessClient.cancelJobRun(
asyncQueryJobMetadata.getApplicationId(), asyncQueryJobMetadata.getJobId(), false);
return asyncQueryJobMetadata.getQueryId().getId();
return asyncQueryJobMetadata.getQueryId();
}

@Override
@@ -93,7 +93,12 @@ public DispatchQueryResponse submit(
dataSourceMetadata.getResultIndex());
String jobId = emrServerlessClient.startJobRun(startJobRequest);
MetricUtils.incrementNumericalMetric(MetricName.EMR_BATCH_QUERY_JOBS_CREATION_COUNT);
return new DispatchQueryResponse(
context.getQueryId(), jobId, dataSourceMetadata.getResultIndex(), null);
return DispatchQueryResponse.builder()
.queryId(context.getQueryId())
.jobId(jobId)
.resultIndex(dataSourceMetadata.getResultIndex())
.datasourceName(dataSourceMetadata.getName())
.jobType(JobType.INTERACTIVE)
.build();
}
}
@@ -0,0 +1,18 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.dispatcher;

import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.utils.IDUtils;

/** Generates QueryId by embedding Datasource name and random UUID */
public class DatasourceEmbeddedQueryIdProvider implements QueryIdProvider {

@Override
public String getQueryId(DispatchQueryRequest dispatchQueryRequest) {
return IDUtils.encode(dispatchQueryRequest.getDatasource());
}
}
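The actual IDUtils implementation is not included in this diff. Purely as an illustration of a scheme consistent with the Javadoc above ("embedding Datasource name and random UUID"), an encode/decode pair could Base64-encode a "uuid:datasource" string; the class below is a hypothetical, self-contained sketch, not the repository's IDUtils:

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.UUID;

// Hypothetical sketch of a datasource-embedded ID scheme (not the actual IDUtils).
public class DatasourceEmbeddedIdSketch {
  public static String encode(String datasourceName) {
    // Random UUID plus the datasource name, separated by ':' (UUIDs never contain ':').
    String raw = UUID.randomUUID() + ":" + datasourceName;
    return Base64.getUrlEncoder().withoutPadding().encodeToString(raw.getBytes(StandardCharsets.UTF_8));
  }

  public static String decode(String queryId) {
    String raw = new String(Base64.getUrlDecoder().decode(queryId), StandardCharsets.UTF_8);
    return raw.substring(raw.indexOf(':') + 1);  // recover the datasource name after the separator
  }
}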
@@ -16,13 +16,13 @@
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
import org.opensearch.sql.spark.dispatcher.model.IndexDMLResult;
import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails;
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.execution.statement.StatementState;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
@@ -65,47 +65,59 @@ public DispatchQueryResponse submit(

getIndexOp(dispatchQueryRequest, indexDetails).apply(indexMetadata);

AsyncQueryId asyncQueryId =
String asyncQueryId =
storeIndexDMLResult(
context.getQueryId(),
dispatchQueryRequest,
dataSourceMetadata,
JobRunState.SUCCESS.toString(),
StringUtils.EMPTY,
getElapsedTimeSince(startTime));
return new DispatchQueryResponse(
asyncQueryId, DML_QUERY_JOB_ID, dataSourceMetadata.getResultIndex(), null);
return DispatchQueryResponse.builder()
.queryId(asyncQueryId)
.jobId(DML_QUERY_JOB_ID)
.resultIndex(dataSourceMetadata.getResultIndex())
.datasourceName(dataSourceMetadata.getName())
.jobType(JobType.INTERACTIVE)
.build();
} catch (Exception e) {
LOG.error(e.getMessage());
AsyncQueryId asyncQueryId =
String asyncQueryId =
storeIndexDMLResult(
context.getQueryId(),
dispatchQueryRequest,
dataSourceMetadata,
JobRunState.FAILED.toString(),
e.getMessage(),
getElapsedTimeSince(startTime));
return new DispatchQueryResponse(
asyncQueryId, DML_QUERY_JOB_ID, dataSourceMetadata.getResultIndex(), null);
return DispatchQueryResponse.builder()
.queryId(asyncQueryId)
.jobId(DML_QUERY_JOB_ID)
.resultIndex(dataSourceMetadata.getResultIndex())
.datasourceName(dataSourceMetadata.getName())
.jobType(JobType.INTERACTIVE)
.build();
}
}

private AsyncQueryId storeIndexDMLResult(
private String storeIndexDMLResult(
String queryId,
DispatchQueryRequest dispatchQueryRequest,
DataSourceMetadata dataSourceMetadata,
String status,
String error,
long queryRunTime) {
AsyncQueryId asyncQueryId = AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName());
IndexDMLResult indexDMLResult =
IndexDMLResult.builder()
.queryId(asyncQueryId.getId())
.queryId(queryId)
.status(status)
.error(error)
.datasourceName(dispatchQueryRequest.getDatasource())
.queryRunTime(queryRunTime)
.updateTime(System.currentTimeMillis())
.build();
indexDMLResultStorageService.createIndexDMLResult(indexDMLResult);
return asyncQueryId;
return queryId;
}

private long getElapsedTimeSince(long startTime) {
@@ -143,7 +155,7 @@ private FlintIndexMetadata getFlintIndexMetadata(IndexQueryDetails indexDetails)

@Override
protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) {
String queryId = asyncQueryJobMetadata.getQueryId().getId();
String queryId = asyncQueryJobMetadata.getQueryId();
return jobExecutionResponseReader.getResultWithQueryId(
queryId, asyncQueryJobMetadata.getResultIndex());
}
@@ -49,15 +49,15 @@ public class InteractiveQueryHandler extends AsyncQueryHandler {

@Override
protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) {
String queryId = asyncQueryJobMetadata.getQueryId().getId();
String queryId = asyncQueryJobMetadata.getQueryId();
return jobExecutionResponseReader.getResultWithQueryId(
queryId, asyncQueryJobMetadata.getResultIndex());
}

@Override
protected JSONObject getResponseFromExecutor(AsyncQueryJobMetadata asyncQueryJobMetadata) {
JSONObject result = new JSONObject();
String queryId = asyncQueryJobMetadata.getQueryId().getId();
String queryId = asyncQueryJobMetadata.getQueryId();
Statement statement = getStatementByQueryId(asyncQueryJobMetadata.getSessionId(), queryId);
StatementState statementState = statement.getStatementState();
result.put(STATUS_FIELD, statementState.getState());
@@ -67,7 +67,7 @@ protected JSONObject getResponseFromExecutor(AsyncQueryJobMetadata asyncQueryJob

@Override
public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
String queryId = asyncQueryJobMetadata.getQueryId().getId();
String queryId = asyncQueryJobMetadata.getQueryId();
getStatementByQueryId(asyncQueryJobMetadata.getSessionId(), queryId).cancel();
return queryId;
}
@@ -118,11 +118,14 @@ public DispatchQueryResponse submit(
context.getQueryId(),
dispatchQueryRequest.getLangType(),
dispatchQueryRequest.getQuery()));
return new DispatchQueryResponse(
context.getQueryId(),
session.getSessionModel().getJobId(),
dataSourceMetadata.getResultIndex(),
session.getSessionId().getSessionId());
return DispatchQueryResponse.builder()
.queryId(context.getQueryId())
.jobId(session.getSessionModel().getJobId())
.resultIndex(dataSourceMetadata.getResultIndex())
.sessionId(session.getSessionId().getSessionId())
.datasourceName(dataSourceMetadata.getName())
.jobType(JobType.INTERACTIVE)
.build();
}

private Statement getStatementByQueryId(String sid, String qid) {
@@ -0,0 +1,13 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.dispatcher;

import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;

/** Interface for extension point to specify queryId. Called when new query is executed. */
public interface QueryIdProvider {
String getQueryId(DispatchQueryRequest dispatchQueryRequest);
}
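Since QueryIdProvider is now the single extension point for ID generation, an alternative scheme only needs to implement this one method. For instance, a plain-UUID provider (a hypothetical example, not part of this commit) could look like:

package org.opensearch.sql.spark.dispatcher;

import java.util.UUID;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;

// Hypothetical alternative provider that ignores the datasource and returns a bare UUID.
public class RandomUuidQueryIdProvider implements QueryIdProvider {
  @Override
  public String getQueryId(DispatchQueryRequest dispatchQueryRequest) {
    return UUID.randomUUID().toString();
  }
}

Note that such a provider would not work with code paths that decode the datasource name back out of the ID (for example, the getJobMetadata lookup shown earlier), so any replacement has to preserve that contract or change those call sites as well.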
@@ -53,21 +53,22 @@ public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
FlintIndexMetadata indexMetadata = indexMetadataMap.get(asyncQueryJobMetadata.getIndexName());
FlintIndexOp jobCancelOp = flintIndexOpFactory.getCancel(datasourceName);
jobCancelOp.apply(indexMetadata);
return asyncQueryJobMetadata.getQueryId().getId();
return asyncQueryJobMetadata.getQueryId();
}

@Override
public DispatchQueryResponse submit(
DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) {
DispatchQueryResponse resp = super.submit(dispatchQueryRequest, context);
DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata();
return new DispatchQueryResponse(
resp.getQueryId(),
resp.getJobId(),
resp.getResultIndex(),
resp.getSessionId(),
dataSourceMetadata.getName(),
JobType.BATCH,
context.getIndexQueryDetails().openSearchIndexName());
return DispatchQueryResponse.builder()
.queryId(resp.getQueryId())
.jobId(resp.getJobId())
.resultIndex(resp.getResultIndex())
.sessionId(resp.getSessionId())
.datasourceName(dataSourceMetadata.getName())
.jobType(JobType.BATCH)
.indexName(context.getIndexQueryDetails().openSearchIndexName())
.build();
}
}
@@ -12,7 +12,6 @@
import org.json.JSONObject;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
@@ -36,6 +35,7 @@ public class SparkQueryDispatcher {
private final DataSourceService dataSourceService;
private final SessionManager sessionManager;
private final QueryHandlerFactory queryHandlerFactory;
private final QueryIdProvider queryIdProvider;

public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest) {
DataSourceMetadata dataSourceMetadata =
@@ -59,12 +59,12 @@ public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest)
}
}

private static DispatchQueryContext.DispatchQueryContextBuilder getDefaultDispatchContextBuilder(
private DispatchQueryContext.DispatchQueryContextBuilder getDefaultDispatchContextBuilder(
DispatchQueryRequest dispatchQueryRequest, DataSourceMetadata dataSourceMetadata) {
return DispatchQueryContext.builder()
.dataSourceMetadata(dataSourceMetadata)
.tags(getDefaultTagsForJobSubmission(dispatchQueryRequest))
.queryId(AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName()));
.queryId(queryIdProvider.getQueryId(dispatchQueryRequest));
}

private AsyncQueryHandler getQueryHandlerForFlintExtensionQuery(
@@ -12,7 +12,6 @@
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.utils.MetricUtils;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters;
import org.opensearch.sql.spark.client.EMRServerlessClient;
@@ -82,13 +81,13 @@ public DispatchQueryResponse submit(
dataSourceMetadata.getResultIndex());
String jobId = emrServerlessClient.startJobRun(startJobRequest);
MetricUtils.incrementNumericalMetric(MetricName.EMR_STREAMING_QUERY_JOBS_CREATION_COUNT);
return new DispatchQueryResponse(
AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName()),
jobId,
dataSourceMetadata.getResultIndex(),
null,
dataSourceMetadata.getName(),
JobType.STREAMING,
indexQueryDetails.openSearchIndexName());
return DispatchQueryResponse.builder()
.queryId(context.getQueryId())
.jobId(jobId)
.resultIndex(dataSourceMetadata.getResultIndex())
.datasourceName(dataSourceMetadata.getName())
.jobType(JobType.STREAMING)
.indexName(indexQueryDetails.openSearchIndexName())
.build();
}
}