Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abstract queryId generation #2695

Merged
merged 3 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public CreateAsyncQueryResponse createAsyncQuery(
.indexName(dispatchQueryResponse.getIndexName())
.build());
return new CreateAsyncQueryResponse(
dispatchQueryResponse.getQueryId().getId(), dispatchQueryResponse.getSessionId());
dispatchQueryResponse.getQueryId(), dispatchQueryResponse.getSessionId());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.asyncquery;
Expand All @@ -12,43 +10,46 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.spark.asyncquery.exceptions.AsyncQueryNotFoundException;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.execution.statestore.OpenSearchStateStoreUtil;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.execution.xcontent.AsyncQueryJobMetadataXContentSerializer;
import org.opensearch.sql.spark.utils.IDUtils;

/** Opensearch implementation of {@link AsyncQueryJobMetadataStorageService} */
/** OpenSearch implementation of {@link AsyncQueryJobMetadataStorageService} */
@RequiredArgsConstructor
public class OpensearchAsyncQueryJobMetadataStorageService
public class OpenSearchAsyncQueryJobMetadataStorageService
implements AsyncQueryJobMetadataStorageService {

private final StateStore stateStore;
private final AsyncQueryJobMetadataXContentSerializer asyncQueryJobMetadataXContentSerializer;

private static final Logger LOGGER =
LogManager.getLogger(OpensearchAsyncQueryJobMetadataStorageService.class);
LogManager.getLogger(OpenSearchAsyncQueryJobMetadataStorageService.class);

@Override
public void storeJobMetadata(AsyncQueryJobMetadata asyncQueryJobMetadata) {
AsyncQueryId queryId = asyncQueryJobMetadata.getQueryId();
stateStore.create(
mapIdToDocumentId(asyncQueryJobMetadata.getId()),
asyncQueryJobMetadata,
AsyncQueryJobMetadata::copy,
OpenSearchStateStoreUtil.getIndexName(queryId.getDataSourceName()));
OpenSearchStateStoreUtil.getIndexName(asyncQueryJobMetadata.getDatasourceName()));
}

private String mapIdToDocumentId(String id) {
return "qid" + id;
}

@Override
public Optional<AsyncQueryJobMetadata> getJobMetadata(String qid) {
public Optional<AsyncQueryJobMetadata> getJobMetadata(String queryId) {
try {
AsyncQueryId queryId = new AsyncQueryId(qid);
return stateStore.get(
queryId.docId(),
mapIdToDocumentId(queryId),
asyncQueryJobMetadataXContentSerializer::fromXContent,
OpenSearchStateStoreUtil.getIndexName(queryId.getDataSourceName()));
OpenSearchStateStoreUtil.getIndexName(IDUtils.decode(queryId)));
} catch (Exception e) {
LOGGER.error("Error while fetching the job metadata.", e);
throw new AsyncQueryNotFoundException(String.format("Invalid QueryId: %s", qid));
throw new AsyncQueryNotFoundException(String.format("Invalid QueryId: %s", queryId));
}
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.asyncquery.model;
Expand All @@ -21,7 +19,7 @@
@SuperBuilder
@EqualsAndHashCode(callSuper = false)
public class AsyncQueryJobMetadata extends StateModel {
private final AsyncQueryId queryId;
private final String queryId;
private final String applicationId;
private final String jobId;
private final String resultIndex;
Expand Down Expand Up @@ -59,6 +57,6 @@ public static AsyncQueryJobMetadata copy(

@Override
public String getId() {
return queryId.docId();
return queryId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ protected JSONObject getResponseFromExecutor(AsyncQueryJobMetadata asyncQueryJob
public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
emrServerlessClient.cancelJobRun(
asyncQueryJobMetadata.getApplicationId(), asyncQueryJobMetadata.getJobId(), false);
return asyncQueryJobMetadata.getQueryId().getId();
return asyncQueryJobMetadata.getQueryId();
}

@Override
Expand Down Expand Up @@ -93,7 +93,12 @@ public DispatchQueryResponse submit(
dataSourceMetadata.getResultIndex());
String jobId = emrServerlessClient.startJobRun(startJobRequest);
MetricUtils.incrementNumericalMetric(MetricName.EMR_BATCH_QUERY_JOBS_CREATION_COUNT);
return new DispatchQueryResponse(
context.getQueryId(), jobId, dataSourceMetadata.getResultIndex(), null);
return DispatchQueryResponse.builder()
.queryId(context.getQueryId())
.jobId(jobId)
.resultIndex(dataSourceMetadata.getResultIndex())
.datasourceName(dataSourceMetadata.getName())
.jobType(JobType.INTERACTIVE)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.dispatcher;

import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.utils.IDUtils;

/** Generates QueryId by embedding Datasource name and random UUID */
public class DatasourceEmbeddedQueryIdProvider implements QueryIdProvider {

@Override
public String getQueryId(DispatchQueryRequest dispatchQueryRequest) {
return IDUtils.encode(dispatchQueryRequest.getDatasource());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
import org.opensearch.sql.spark.dispatcher.model.IndexDMLResult;
import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails;
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.execution.statement.StatementState;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
Expand Down Expand Up @@ -65,47 +65,59 @@ public DispatchQueryResponse submit(

getIndexOp(dispatchQueryRequest, indexDetails).apply(indexMetadata);

AsyncQueryId asyncQueryId =
String asyncQueryId =
storeIndexDMLResult(
context.getQueryId(),
dispatchQueryRequest,
dataSourceMetadata,
JobRunState.SUCCESS.toString(),
StringUtils.EMPTY,
getElapsedTimeSince(startTime));
return new DispatchQueryResponse(
asyncQueryId, DML_QUERY_JOB_ID, dataSourceMetadata.getResultIndex(), null);
return DispatchQueryResponse.builder()
.queryId(asyncQueryId)
.jobId(DML_QUERY_JOB_ID)
.resultIndex(dataSourceMetadata.getResultIndex())
.datasourceName(dataSourceMetadata.getName())
.jobType(JobType.INTERACTIVE)
.build();
} catch (Exception e) {
LOG.error(e.getMessage());
AsyncQueryId asyncQueryId =
String asyncQueryId =
storeIndexDMLResult(
context.getQueryId(),
dispatchQueryRequest,
dataSourceMetadata,
JobRunState.FAILED.toString(),
e.getMessage(),
getElapsedTimeSince(startTime));
return new DispatchQueryResponse(
asyncQueryId, DML_QUERY_JOB_ID, dataSourceMetadata.getResultIndex(), null);
return DispatchQueryResponse.builder()
.queryId(asyncQueryId)
.jobId(DML_QUERY_JOB_ID)
.resultIndex(dataSourceMetadata.getResultIndex())
.datasourceName(dataSourceMetadata.getName())
.jobType(JobType.INTERACTIVE)
.build();
}
}

private AsyncQueryId storeIndexDMLResult(
private String storeIndexDMLResult(
String queryId,
DispatchQueryRequest dispatchQueryRequest,
DataSourceMetadata dataSourceMetadata,
String status,
String error,
long queryRunTime) {
AsyncQueryId asyncQueryId = AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName());
IndexDMLResult indexDMLResult =
IndexDMLResult.builder()
.queryId(asyncQueryId.getId())
.queryId(queryId)
.status(status)
.error(error)
.datasourceName(dispatchQueryRequest.getDatasource())
.queryRunTime(queryRunTime)
.updateTime(System.currentTimeMillis())
.build();
indexDMLResultStorageService.createIndexDMLResult(indexDMLResult);
return asyncQueryId;
return queryId;
}

private long getElapsedTimeSince(long startTime) {
Expand Down Expand Up @@ -143,7 +155,7 @@ private FlintIndexMetadata getFlintIndexMetadata(IndexQueryDetails indexDetails)

@Override
protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) {
String queryId = asyncQueryJobMetadata.getQueryId().getId();
String queryId = asyncQueryJobMetadata.getQueryId();
return jobExecutionResponseReader.getResultWithQueryId(
queryId, asyncQueryJobMetadata.getResultIndex());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@ public class InteractiveQueryHandler extends AsyncQueryHandler {

@Override
protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) {
String queryId = asyncQueryJobMetadata.getQueryId().getId();
String queryId = asyncQueryJobMetadata.getQueryId();
return jobExecutionResponseReader.getResultWithQueryId(
queryId, asyncQueryJobMetadata.getResultIndex());
}

@Override
protected JSONObject getResponseFromExecutor(AsyncQueryJobMetadata asyncQueryJobMetadata) {
JSONObject result = new JSONObject();
String queryId = asyncQueryJobMetadata.getQueryId().getId();
String queryId = asyncQueryJobMetadata.getQueryId();
Statement statement = getStatementByQueryId(asyncQueryJobMetadata.getSessionId(), queryId);
StatementState statementState = statement.getStatementState();
result.put(STATUS_FIELD, statementState.getState());
Expand All @@ -67,7 +67,7 @@ protected JSONObject getResponseFromExecutor(AsyncQueryJobMetadata asyncQueryJob

@Override
public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
String queryId = asyncQueryJobMetadata.getQueryId().getId();
String queryId = asyncQueryJobMetadata.getQueryId();
getStatementByQueryId(asyncQueryJobMetadata.getSessionId(), queryId).cancel();
return queryId;
}
Expand Down Expand Up @@ -118,11 +118,14 @@ public DispatchQueryResponse submit(
context.getQueryId(),
dispatchQueryRequest.getLangType(),
dispatchQueryRequest.getQuery()));
return new DispatchQueryResponse(
context.getQueryId(),
session.getSessionModel().getJobId(),
dataSourceMetadata.getResultIndex(),
session.getSessionId().getSessionId());
return DispatchQueryResponse.builder()
.queryId(context.getQueryId())
.jobId(session.getSessionModel().getJobId())
.resultIndex(dataSourceMetadata.getResultIndex())
.sessionId(session.getSessionId().getSessionId())
.datasourceName(dataSourceMetadata.getName())
.jobType(JobType.INTERACTIVE)
.build();
}

private Statement getStatementByQueryId(String sid, String qid) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.dispatcher;

import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;

/** Interface for extension point to specify queryId. Called when new query is executed. */
public interface QueryIdProvider {
String getQueryId(DispatchQueryRequest dispatchQueryRequest);
seankao-az marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,22 @@ public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
FlintIndexMetadata indexMetadata = indexMetadataMap.get(asyncQueryJobMetadata.getIndexName());
FlintIndexOp jobCancelOp = flintIndexOpFactory.getCancel(datasourceName);
jobCancelOp.apply(indexMetadata);
return asyncQueryJobMetadata.getQueryId().getId();
return asyncQueryJobMetadata.getQueryId();
}

@Override
public DispatchQueryResponse submit(
DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) {
DispatchQueryResponse resp = super.submit(dispatchQueryRequest, context);
DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata();
return new DispatchQueryResponse(
resp.getQueryId(),
resp.getJobId(),
resp.getResultIndex(),
resp.getSessionId(),
dataSourceMetadata.getName(),
JobType.BATCH,
context.getIndexQueryDetails().openSearchIndexName());
return DispatchQueryResponse.builder()
.queryId(resp.getQueryId())
.jobId(resp.getJobId())
.resultIndex(resp.getResultIndex())
.sessionId(resp.getSessionId())
.datasourceName(dataSourceMetadata.getName())
.jobType(JobType.BATCH)
.indexName(context.getIndexQueryDetails().openSearchIndexName())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.json.JSONObject;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
Expand All @@ -36,6 +35,7 @@ public class SparkQueryDispatcher {
private final DataSourceService dataSourceService;
private final SessionManager sessionManager;
private final QueryHandlerFactory queryHandlerFactory;
private final QueryIdProvider queryIdProvider;

public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest) {
DataSourceMetadata dataSourceMetadata =
Expand All @@ -59,12 +59,12 @@ public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest)
}
}

private static DispatchQueryContext.DispatchQueryContextBuilder getDefaultDispatchContextBuilder(
private DispatchQueryContext.DispatchQueryContextBuilder getDefaultDispatchContextBuilder(
DispatchQueryRequest dispatchQueryRequest, DataSourceMetadata dataSourceMetadata) {
return DispatchQueryContext.builder()
.dataSourceMetadata(dataSourceMetadata)
.tags(getDefaultTagsForJobSubmission(dispatchQueryRequest))
.queryId(AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName()));
.queryId(queryIdProvider.getQueryId(dispatchQueryRequest));
}

private AsyncQueryHandler getQueryHandlerForFlintExtensionQuery(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.utils.MetricUtils;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters;
import org.opensearch.sql.spark.client.EMRServerlessClient;
Expand Down Expand Up @@ -82,13 +81,13 @@ public DispatchQueryResponse submit(
dataSourceMetadata.getResultIndex());
String jobId = emrServerlessClient.startJobRun(startJobRequest);
MetricUtils.incrementNumericalMetric(MetricName.EMR_STREAMING_QUERY_JOBS_CREATION_COUNT);
return new DispatchQueryResponse(
AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName()),
jobId,
dataSourceMetadata.getResultIndex(),
null,
dataSourceMetadata.getName(),
JobType.STREAMING,
indexQueryDetails.openSearchIndexName());
return DispatchQueryResponse.builder()
.queryId(context.getQueryId())
.jobId(jobId)
.resultIndex(dataSourceMetadata.getResultIndex())
.datasourceName(dataSourceMetadata.getName())
.jobType(JobType.STREAMING)
.indexName(indexQueryDetails.openSearchIndexName())
.build();
}
}
Loading
Loading