Skip to content

Commit

Permalink
Async query get result bug fix (#2443)
Browse files Browse the repository at this point in the history
* Add IT to reproduce the bug first

Signed-off-by: Chen Dai <[email protected]>

* Modify IT as temporary reproduce commit

Signed-off-by: Chen Dai <[email protected]>

* Fix issue by preferred option and modify IT

Signed-off-by: Chen Dai <[email protected]>

* Refactor IT with fluent assertion

Signed-off-by: Chen Dai <[email protected]>

* Add IT for batch query handler

Signed-off-by: Chen Dai <[email protected]>

* Add IT for streaming query handler

Signed-off-by: Chen Dai <[email protected]>

* Add more IT for normal case

Signed-off-by: Chen Dai <[email protected]>

* Add IT for drop index

Signed-off-by: Chen Dai <[email protected]>

* Consider drop statement running if result doc unavailable

Signed-off-by: Chen Dai <[email protected]>

* Fix broken UT

Signed-off-by: Chen Dai <[email protected]>

* Address PR comments

Signed-off-by: Chen Dai <[email protected]>

* Address PR comments

Signed-off-by: Chen Dai <[email protected]>

---------

Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen authored Dec 1, 2023
1 parent 2c230be commit 60058ce
Show file tree
Hide file tree
Showing 6 changed files with 471 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
import org.opensearch.sql.spark.execution.statement.StatementState;

/** Process async query request. */
public abstract class AsyncQueryHandler {
Expand All @@ -33,10 +34,20 @@ public JSONObject getQueryResponse(AsyncQueryJobMetadata asyncQueryJobMetadata)
result.put(ERROR_FIELD, error);
return result;
} else {
return getResponseFromExecutor(asyncQueryJobMetadata);
JSONObject statement = getResponseFromExecutor(asyncQueryJobMetadata);

// Consider statement still running if state is success but query result unavailable
if (isSuccessState(statement)) {
statement.put(STATUS_FIELD, StatementState.RUNNING.getState());
}
return statement;
}
}

private boolean isSuccessState(JSONObject statement) {
return StatementState.SUCCESS.getState().equalsIgnoreCase(statement.optString(STATUS_FIELD));
}

protected abstract JSONObject getResponseFromResultIndex(
AsyncQueryJobMetadata asyncQueryJobMetadata);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.sql.spark.dispatcher;

import static org.opensearch.sql.spark.data.constants.SparkConstants.ERROR_FIELD;
import static org.opensearch.sql.spark.data.constants.SparkConstants.STATUS_FIELD;
import static org.opensearch.sql.spark.execution.statestore.StateStore.createIndexDMLResult;

import com.amazonaws.services.emrserverless.model.JobRunState;
Expand All @@ -24,6 +26,7 @@
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
import org.opensearch.sql.spark.dispatcher.model.IndexDMLResult;
import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails;
import org.opensearch.sql.spark.execution.statement.StatementState;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexMetadataReader;
Expand Down Expand Up @@ -106,7 +109,11 @@ protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQuery

@Override
protected JSONObject getResponseFromExecutor(AsyncQueryJobMetadata asyncQueryJobMetadata) {
throw new IllegalStateException("[BUG] can't fetch result of index DML query form server");
// Consider statement still running if result doc created in submit() is not available yet
JSONObject result = new JSONObject();
result.put(STATUS_FIELD, StatementState.RUNNING.getState());
result.put(ERROR_FIELD, "");
return result;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.amazonaws.services.emrserverless.model.CancelJobRunResult;
import com.amazonaws.services.emrserverless.model.GetJobRunResult;
import com.amazonaws.services.emrserverless.model.JobRun;
import com.amazonaws.services.emrserverless.model.JobRunState;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand All @@ -27,11 +28,13 @@
import java.util.List;
import java.util.Optional;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils;
import org.junit.After;
import org.junit.Before;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.node.NodeClient;
Expand All @@ -41,6 +44,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.plugins.Plugin;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
Expand All @@ -63,6 +67,9 @@
import org.opensearch.sql.spark.execution.session.SessionState;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadataReaderImpl;
import org.opensearch.sql.spark.flint.FlintIndexState;
import org.opensearch.sql.spark.flint.FlintIndexStateModel;
import org.opensearch.sql.spark.flint.FlintIndexType;
import org.opensearch.sql.spark.leasemanager.DefaultLeaseManager;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;
import org.opensearch.sql.storage.DataSourceFactory;
Expand Down Expand Up @@ -189,10 +196,17 @@ private DataSourceServiceImpl createDataSourceService() {

protected AsyncQueryExecutorService createAsyncQueryExecutorService(
EMRServerlessClient emrServerlessClient) {
return createAsyncQueryExecutorService(
emrServerlessClient, new JobExecutionResponseReader(client));
}

/** Pass a custom response reader which can mock interaction between PPL plugin and EMR-S job. */
protected AsyncQueryExecutorService createAsyncQueryExecutorService(
EMRServerlessClient emrServerlessClient,
JobExecutionResponseReader jobExecutionResponseReader) {
StateStore stateStore = new StateStore(client, clusterService);
AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService =
new OpensearchAsyncQueryJobMetadataStorageService(stateStore);
JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client);
SparkQueryDispatcher sparkQueryDispatcher =
new SparkQueryDispatcher(
emrServerlessClient,
Expand All @@ -215,6 +229,7 @@ public static class LocalEMRSClient implements EMRServerlessClient {
private int startJobRunCalled = 0;
private int cancelJobRunCalled = 0;
private int getJobResult = 0;
private JobRunState jobState = JobRunState.RUNNING;

@Getter private StartJobRequest jobRequest;

Expand All @@ -229,7 +244,7 @@ public String startJobRun(StartJobRequest startJobRequest) {
public GetJobRunResult getJobRunResult(String applicationId, String jobId) {
getJobResult++;
JobRun jobRun = new JobRun();
jobRun.setState("RUNNING");
jobRun.setState(jobState.toString());
return new GetJobRunResult().withJobRun(jobRun);
}

Expand All @@ -250,6 +265,10 @@ public void cancelJobRunCalled(int expectedTimes) {
public void getJobRunResultCalled(int expectedTimes) {
assertEquals(expectedTimes, getJobResult);
}

public void setJobState(JobRunState jobState) {
this.jobState = jobState;
}
}

public SparkExecutionEngineConfig sparkExecutionEngineConfig() {
Expand Down Expand Up @@ -306,6 +325,111 @@ public String loadResultIndexMappings() {
return Resources.toString(url, Charsets.UTF_8);
}

public class MockFlintSparkJob {

private FlintIndexStateModel stateModel;

public MockFlintSparkJob(String latestId) {
assertNotNull(latestId);
stateModel =
new FlintIndexStateModel(
FlintIndexState.EMPTY,
"mockAppId",
"mockJobId",
latestId,
DATASOURCE,
System.currentTimeMillis(),
"",
SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
stateModel = StateStore.createFlintIndexState(stateStore, DATASOURCE).apply(stateModel);
}

public void refreshing() {
stateModel =
StateStore.updateFlintIndexState(stateStore, DATASOURCE)
.apply(stateModel, FlintIndexState.REFRESHING);
}

public void cancelling() {
stateModel =
StateStore.updateFlintIndexState(stateStore, DATASOURCE)
.apply(stateModel, FlintIndexState.CANCELLING);
}

public void active() {
stateModel =
StateStore.updateFlintIndexState(stateStore, DATASOURCE)
.apply(stateModel, FlintIndexState.ACTIVE);
}

public void deleting() {
stateModel =
StateStore.updateFlintIndexState(stateStore, DATASOURCE)
.apply(stateModel, FlintIndexState.DELETING);
}

public void deleted() {
stateModel =
StateStore.updateFlintIndexState(stateStore, DATASOURCE)
.apply(stateModel, FlintIndexState.DELETED);
}

void assertState(FlintIndexState expected) {
Optional<FlintIndexStateModel> stateModelOpt =
StateStore.getFlintIndexState(stateStore, DATASOURCE).apply(stateModel.getId());
assertTrue((stateModelOpt.isPresent()));
assertEquals(expected, stateModelOpt.get().getIndexState());
}
}

@RequiredArgsConstructor
public class FlintDatasetMock {
final String query;
final FlintIndexType indexType;
final String indexName;
boolean isLegacy = false;
String latestId;

FlintDatasetMock isLegacy(boolean isLegacy) {
this.isLegacy = isLegacy;
return this;
}

FlintDatasetMock latestId(String latestId) {
this.latestId = latestId;
return this;
}

public void createIndex() {
String pathPrefix = isLegacy ? "flint-index-mappings" : "flint-index-mappings/0.1.1";
switch (indexType) {
case SKIPPING:
createIndexWithMappings(
indexName, loadMappings(pathPrefix + "/" + "flint_skipping_index.json"));
break;
case COVERING:
createIndexWithMappings(
indexName, loadMappings(pathPrefix + "/" + "flint_covering_index.json"));
break;
case MATERIALIZED_VIEW:
createIndexWithMappings(indexName, loadMappings(pathPrefix + "/" + "flint_mv.json"));
break;
}
}

@SneakyThrows
public void deleteIndex() {
client().admin().indices().delete(new DeleteIndexRequest().indices(indexName)).get();
}
}

@SneakyThrows
public static String loadMappings(String path) {
URL url = Resources.getResource(path);
return Resources.toString(url, Charsets.UTF_8);
}

public void createIndexWithMappings(String indexName, String metadata) {
CreateIndexRequest request = new CreateIndexRequest(indexName);
request.mapping(metadata, XContentType.JSON);
Expand Down
Loading

0 comments on commit 60058ce

Please sign in to comment.