diff --git a/docs/user/interfaces/asyncqueryinterface.rst b/docs/user/interfaces/asyncqueryinterface.rst index 8cc7c6fec9..af49a59838 100644 --- a/docs/user/interfaces/asyncqueryinterface.rst +++ b/docs/user/interfaces/asyncqueryinterface.rst @@ -185,6 +185,9 @@ Async Query Cancellation API ====================================== If security plugin is enabled, this API can only be invoked by users with permission ``cluster:admin/opensearch/ql/jobs/delete``. +Limitation: a Flint index creation statement with ``auto_refresh = true`` cannot be cancelled. Users can submit an ALTER statement to stop the auto-refresh query. +- Flint index creation statements include CREATE SKIPPING INDEX, CREATE INDEX, and CREATE MATERIALIZED VIEW. + HTTP URI: ``_plugins/_async_query/{queryId}`` HTTP VERB: ``DELETE`` diff --git a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImpl.java b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImpl.java index eb77725052..4f9dfdc033 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImpl.java +++ b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImpl.java @@ -56,7 +56,10 @@ public CreateAsyncQueryResponse createAsyncQuery( sparkExecutionEngineConfig.getApplicationId(), dispatchQueryResponse.getJobId(), dispatchQueryResponse.getResultIndex(), - dispatchQueryResponse.getSessionId())); + dispatchQueryResponse.getSessionId(), + dispatchQueryResponse.getDatasourceName(), + dispatchQueryResponse.getJobType(), + dispatchQueryResponse.getIndexName())); return new CreateAsyncQueryResponse( dispatchQueryResponse.getQueryId().getId(), dispatchQueryResponse.getSessionId()); } diff --git a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryJobMetadata.java b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryJobMetadata.java index d1357f364d..1c7fd35c5e 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryJobMetadata.java +++ b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryJobMetadata.java @@ -8,16 +8,20 @@ package org.opensearch.sql.spark.asyncquery.model; import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; +import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME; import static org.opensearch.sql.spark.execution.statement.StatementModel.QUERY_ID; import com.google.gson.Gson; import java.io.IOException; +import java.util.Locale; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.SneakyThrows; +import org.opensearch.core.common.Strings; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.sql.spark.dispatcher.model.JobType; import org.opensearch.sql.spark.execution.statestore.StateModel; /** This class models all the metadata required for a job. */ @@ -25,6 +29,8 @@ @EqualsAndHashCode(callSuper = false) public class AsyncQueryJobMetadata extends StateModel { public static final String TYPE_JOBMETA = "jobmeta"; + public static final String JOB_TYPE = "jobType"; + public static final String INDEX_NAME = "indexName"; private final AsyncQueryId queryId; private final String applicationId; @@ -32,6 +38,14 @@ public class AsyncQueryJobMetadata extends StateModel { private final String resultIndex; // optional sessionId.
private final String sessionId; + // since 2.13 + // jobType could be null before OpenSearch 2.12. SparkQueryDispatcher uses jobType to choose + // the cancel query handler; if jobType is null, it invokes BatchQueryHandler.cancel(). + private final JobType jobType; + // null if jobType is null + private final String datasourceName; + // null if jobType is INTERACTIVE or null + private final String indexName; @EqualsAndHashCode.Exclude private final long seqNo; @EqualsAndHashCode.Exclude private final long primaryTerm; @@ -44,6 +58,9 @@ public AsyncQueryJobMetadata( jobId, resultIndex, null, + null, + JobType.INTERACTIVE, + null, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM); } @@ -60,6 +77,31 @@ public AsyncQueryJobMetadata( jobId, resultIndex, sessionId, + null, + JobType.INTERACTIVE, + null, + SequenceNumbers.UNASSIGNED_SEQ_NO, + SequenceNumbers.UNASSIGNED_PRIMARY_TERM); + } + + public AsyncQueryJobMetadata( + AsyncQueryId queryId, + String applicationId, + String jobId, + String resultIndex, + String sessionId, + String datasourceName, + JobType jobType, + String indexName) { + this( + queryId, + applicationId, + jobId, + resultIndex, + sessionId, + datasourceName, + jobType, + indexName, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM); } @@ -70,6 +112,9 @@ public AsyncQueryJobMetadata( String jobId, String resultIndex, String sessionId, + String datasourceName, + JobType jobType, + String indexName, long seqNo, long primaryTerm) { this.queryId = queryId; @@ -77,6 +122,9 @@ this.jobId = jobId; this.resultIndex = resultIndex; this.sessionId = sessionId; + this.datasourceName = datasourceName; + this.jobType = jobType; + this.indexName = indexName; this.seqNo = seqNo; this.primaryTerm = primaryTerm; } @@ -102,6 +150,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws .field("applicationId", applicationId) .field("resultIndex", resultIndex) .field("sessionId", sessionId) + .field(DATASOURCE_NAME, datasourceName) + .field(JOB_TYPE, jobType.getText().toLowerCase(Locale.ROOT)) + .field(INDEX_NAME, indexName) .endObject(); return builder; } @@ -115,6 +166,9 @@ public static AsyncQueryJobMetadata copy( copy.getJobId(), copy.getResultIndex(), copy.getSessionId(), + copy.datasourceName, + copy.jobType, + copy.indexName, seqNo, primaryTerm); } @@ -132,9 +186,11 @@ public static AsyncQueryJobMetadata fromXContent( AsyncQueryId queryId = null; String jobId = null; String applicationId = null; - boolean isDropIndexQuery = false; String resultIndex = null; String sessionId = null; + String datasourceName = null; + String jobTypeStr = null; + String indexName = null; ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) { String fieldName = parser.currentName(); @@ -149,15 +205,21 @@ public static AsyncQueryJobMetadata fromXContent( case "applicationId": applicationId = parser.textOrNull(); break; - case "isDropIndexQuery": - isDropIndexQuery = parser.booleanValue(); - break; case "resultIndex": resultIndex = parser.textOrNull(); break; case "sessionId": sessionId = parser.textOrNull(); break; + case DATASOURCE_NAME: + datasourceName = parser.textOrNull(); + break; + case JOB_TYPE: + jobTypeStr = parser.textOrNull(); + break; + case INDEX_NAME: + indexName = parser.textOrNull(); + break; case "type": break; default: @@ -168,7 +230,16 @@ throw new
IllegalArgumentException("jobId and applicationId are required fields."); } return new AsyncQueryJobMetadata( - queryId, applicationId, jobId, resultIndex, sessionId, seqNo, primaryTerm); + queryId, + applicationId, + jobId, + resultIndex, + sessionId, + datasourceName, + Strings.isNullOrEmpty(jobTypeStr) ? null : JobType.fromString(jobTypeStr), + indexName, + seqNo, + primaryTerm); } @Override diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java new file mode 100644 index 0000000000..0528a189f0 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java @@ -0,0 +1,67 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.dispatcher; + +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; +import org.opensearch.sql.spark.client.EMRServerlessClient; +import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext; +import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; +import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse; +import org.opensearch.sql.spark.dispatcher.model.JobType; +import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.flint.FlintIndexMetadata; +import org.opensearch.sql.spark.flint.FlintIndexMetadataReader; +import org.opensearch.sql.spark.flint.operation.FlintIndexOp; +import org.opensearch.sql.spark.flint.operation.FlintIndexOpCancel; +import org.opensearch.sql.spark.leasemanager.LeaseManager; +import org.opensearch.sql.spark.response.JobExecutionResponseReader; + +/** Handle Refresh Query. 
*/ +public class RefreshQueryHandler extends BatchQueryHandler { + + private final FlintIndexMetadataReader flintIndexMetadataReader; + private final StateStore stateStore; + private final EMRServerlessClient emrServerlessClient; + + public RefreshQueryHandler( + EMRServerlessClient emrServerlessClient, + JobExecutionResponseReader jobExecutionResponseReader, + FlintIndexMetadataReader flintIndexMetadataReader, + StateStore stateStore, + LeaseManager leaseManager) { + super(emrServerlessClient, jobExecutionResponseReader, leaseManager); + this.flintIndexMetadataReader = flintIndexMetadataReader; + this.stateStore = stateStore; + this.emrServerlessClient = emrServerlessClient; + } + + @Override + public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) { + String datasourceName = asyncQueryJobMetadata.getDatasourceName(); + FlintIndexMetadata indexMetadata = + flintIndexMetadataReader.getFlintIndexMetadata(asyncQueryJobMetadata.getIndexName()); + FlintIndexOp jobCancelOp = + new FlintIndexOpCancel(stateStore, datasourceName, emrServerlessClient); + jobCancelOp.apply(indexMetadata); + return asyncQueryJobMetadata.getQueryId().getId(); + } + + @Override + public DispatchQueryResponse submit( + DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) { + DispatchQueryResponse resp = super.submit(dispatchQueryRequest, context); + DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata(); + return new DispatchQueryResponse( + resp.getQueryId(), + resp.getJobId(), + resp.getResultIndex(), + resp.getSessionId(), + dataSourceMetadata.getName(), + JobType.BATCH, + context.getIndexQueryDetails().openSearchIndexName()); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java index cd4177a0f0..2d6a456a61 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java @@ -22,6 +22,7 @@ import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse; import org.opensearch.sql.spark.dispatcher.model.IndexQueryActionType; import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails; +import org.opensearch.sql.spark.dispatcher.model.JobType; import org.opensearch.sql.spark.execution.session.SessionManager; import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexMetadataReader; @@ -90,7 +91,12 @@ public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest) } else if (IndexQueryActionType.REFRESH.equals(indexQueryDetails.getIndexQueryActionType())) { // manual refresh should be handled by batch handler asyncQueryHandler = - new BatchQueryHandler(emrServerlessClient, jobExecutionResponseReader, leaseManager); + new RefreshQueryHandler( + emrServerlessClient, + jobExecutionResponseReader, + flintIndexMetadataReader, + stateStore, + leaseManager); } } return asyncQueryHandler.submit(dispatchQueryRequest, contextBuilder.build()); @@ -117,6 +123,17 @@ public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) { new InteractiveQueryHandler(sessionManager, jobExecutionResponseReader, leaseManager); } else if (IndexDMLHandler.isIndexDMLQuery(asyncQueryJobMetadata.getJobId())) { queryHandler = createIndexDMLHandler(emrServerlessClient); + } else if (asyncQueryJobMetadata.getJobType() == JobType.BATCH) { + queryHandler = + new 
RefreshQueryHandler( + emrServerlessClient, + jobExecutionResponseReader, + flintIndexMetadataReader, + stateStore, + leaseManager); + } else if (asyncQueryJobMetadata.getJobType() == JobType.STREAMING) { + queryHandler = + new StreamingQueryHandler(emrServerlessClient, jobExecutionResponseReader, leaseManager); } else { queryHandler = new BatchQueryHandler(emrServerlessClient, jobExecutionResponseReader, leaseManager); diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java index b64c4ffc8d..97f2f5efc1 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java @@ -13,6 +13,7 @@ import org.opensearch.sql.legacy.metrics.MetricName; import org.opensearch.sql.legacy.utils.MetricUtils; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId; +import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters; import org.opensearch.sql.spark.client.EMRServerlessClient; import org.opensearch.sql.spark.client.StartJobRequest; @@ -37,6 +38,13 @@ public StreamingQueryHandler( this.emrServerlessClient = emrServerlessClient; } + @Override + public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) { + throw new IllegalArgumentException( + "can't cancel an auto-refresh index query; use an ALTER auto_refresh=off statement to stop the" + + " job, or a VACUUM statement to stop the job and delete data"); + } + @Override public DispatchQueryResponse submit( DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) { @@ -77,6 +85,9 @@ public DispatchQueryResponse submit( AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName()), jobId, dataSourceMetadata.getResultIndex(), - null); + null, + dataSourceMetadata.getName(), + JobType.STREAMING, + indexQueryDetails.openSearchIndexName()); } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/DispatchQueryResponse.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/DispatchQueryResponse.java index b20648cdfd..2c39aab1d4 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/DispatchQueryResponse.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/DispatchQueryResponse.java @@ -1,14 +1,37 @@ package org.opensearch.sql.spark.dispatcher.model; -import lombok.AllArgsConstructor; -import lombok.Data; +import lombok.Getter; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId; -@Data -@AllArgsConstructor +@Getter public class DispatchQueryResponse { - private AsyncQueryId queryId; - private String jobId; - private String resultIndex; - private String sessionId; + private final AsyncQueryId queryId; + private final String jobId; + private final String resultIndex; + private final String sessionId; + private final String datasourceName; + private final JobType jobType; + private final String indexName; + + public DispatchQueryResponse( + AsyncQueryId queryId, String jobId, String resultIndex, String sessionId) { + this(queryId, jobId, resultIndex, sessionId, null, JobType.INTERACTIVE, null); + } + + public DispatchQueryResponse( + AsyncQueryId queryId, + String jobId, + String resultIndex, + String sessionId, + String datasourceName, + JobType jobType, + String indexName) { + this.queryId = queryId; + this.jobId = jobId; + this.resultIndex =
resultIndex; + this.sessionId = sessionId; + this.datasourceName = datasourceName; + this.jobType = jobType; + this.indexName = indexName; + } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReader.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReader.java index d4a8e7ddbf..8833665570 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReader.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReader.java @@ -12,4 +12,12 @@ public interface FlintIndexMetadataReader { * @return FlintIndexMetadata. */ FlintIndexMetadata getFlintIndexMetadata(IndexQueryDetails indexQueryDetails); + + /** + * Given an index name, get the Flint index metadata. + * + * @param indexName index name. + * @return FlintIndexMetadata. + */ + FlintIndexMetadata getFlintIndexMetadata(String indexName); } diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java index a16d0b9138..d6e07fba8a 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java @@ -15,7 +15,11 @@ public class FlintIndexMetadataReaderImpl implements FlintIndexMetadataReader { @Override public FlintIndexMetadata getFlintIndexMetadata(IndexQueryDetails indexQueryDetails) { - String indexName = indexQueryDetails.openSearchIndexName(); + return getFlintIndexMetadata(indexQueryDetails.openSearchIndexName()); + } + + @Override + public FlintIndexMetadata getFlintIndexMetadata(String indexName) { GetMappingsResponse mappingsResponse = client.admin().indices().prepareGetMappings(indexName).get(); try { diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java index e176a2b828..725080bbcd 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java @@ -391,6 +391,7 @@ void assertState(FlintIndexState expected) { @RequiredArgsConstructor public class FlintDatasetMock { final String query; + final String refreshQuery; final FlintIndexType indexType; final String indexName; boolean isLegacy = false; diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java index ab6439492a..4ec5d4d80b 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java @@ -43,6 +43,7 @@ public class AsyncQueryGetResultSpecTest extends AsyncQueryExecutorServiceSpec { private final FlintDatasetMock mockIndex = new FlintDatasetMock( "DROP SKIPPING INDEX ON mys3.default.http_logs", + "REFRESH SKIPPING INDEX ON mys3.default.http_logs", FlintIndexType.SKIPPING, "flint_mys3_default_http_logs_skipping_index") .latestId("skippingindexid"); diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java index 844567f4f5..9ba15c250e 100644 ---
a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java @@ -22,40 +22,58 @@ import org.opensearch.sql.spark.rest.model.LangType; public class IndexQuerySpecTest extends AsyncQueryExecutorServiceSpec { + public final String REFRESH_SI = "REFRESH SKIPPING INDEX on mys3.default.http_logs"; + public final String REFRESH_CI = "REFRESH INDEX covering ON mys3.default.http_logs"; + public final String REFRESH_MV = "REFRESH MATERIALIZED VIEW mv"; public final FlintDatasetMock LEGACY_SKIPPING = new FlintDatasetMock( "DROP SKIPPING INDEX ON mys3.default.http_logs", + REFRESH_SI, FlintIndexType.SKIPPING, "flint_mys3_default_http_logs_skipping_index") .isLegacy(true); public final FlintDatasetMock LEGACY_COVERING = new FlintDatasetMock( "DROP INDEX covering ON mys3.default.http_logs", + REFRESH_CI, FlintIndexType.COVERING, "flint_mys3_default_http_logs_covering_index") .isLegacy(true); public final FlintDatasetMock LEGACY_MV = new FlintDatasetMock( - "DROP MATERIALIZED VIEW mv", FlintIndexType.MATERIALIZED_VIEW, "flint_mv") + "DROP MATERIALIZED VIEW mv", REFRESH_MV, FlintIndexType.MATERIALIZED_VIEW, "flint_mv") .isLegacy(true); public final FlintDatasetMock SKIPPING = new FlintDatasetMock( "DROP SKIPPING INDEX ON mys3.default.http_logs", + REFRESH_SI, FlintIndexType.SKIPPING, "flint_mys3_default_http_logs_skipping_index") .latestId("skippingindexid"); public final FlintDatasetMock COVERING = new FlintDatasetMock( "DROP INDEX covering ON mys3.default.http_logs", + REFRESH_CI, FlintIndexType.COVERING, "flint_mys3_default_http_logs_covering_index") .latestId("coveringid"); public final FlintDatasetMock MV = new FlintDatasetMock( - "DROP MATERIALIZED VIEW mv", FlintIndexType.MATERIALIZED_VIEW, "flint_mv") + "DROP MATERIALIZED VIEW mv", REFRESH_MV, FlintIndexType.MATERIALIZED_VIEW, "flint_mv") .latestId("mvid"); + public final String CREATE_SI_AUTO = + "CREATE SKIPPING INDEX ON mys3.default.http_logs" + + "(l_orderkey VALUE_SET) WITH (auto_refresh = true)"; + + public final String CREATE_CI_AUTO = + "CREATE INDEX covering ON mys3.default.http_logs " + + "(l_orderkey, l_quantity) WITH (auto_refresh = true)"; + + public final String CREATE_MV_AUTO = + "CREATE MATERIALIZED VIEW mv AS select * " + + "from mys3.default.https WITH (auto_refresh = true)"; /** * Happy case. expectation is @@ -762,4 +780,87 @@ public void concurrentRefreshJobLimitNotAppliedToDDL() { new CreateAsyncQueryRequest(query, DATASOURCE, LangType.SQL, null)); assertNotNull(asyncQueryResponse.getSessionId()); } + + /** Cancelling a create Flint index statement with auto_refresh=true should throw an exception. */ + @Test + public void cancelAutoRefreshCreateFlintIndexShouldThrowException() { + ImmutableList.of(CREATE_SI_AUTO, CREATE_CI_AUTO, CREATE_MV_AUTO) + .forEach( + query -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { + Assert.fail("should not call cancelJobRun"); + return null; + } + + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + Assert.fail("should not call getJobRunResult"); + return null; + } + }; + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrServerlessClientFactory); + + // 1.
submit create index query with auto_refresh = true + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest(query, DATASOURCE, LangType.SQL, null)); + + // 2. cancel query + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> asyncQueryExecutorService.cancelQuery(response.getQueryId())); + assertEquals( + "can't cancel an auto-refresh index query; use an ALTER auto_refresh=off statement to stop" + + " the job, or a VACUUM statement to stop the job and delete data", + exception.getMessage()); + }); + } + + /** Cancelling a REFRESH statement should succeed. */ + @Test + public void cancelRefreshStatement() { + ImmutableList.of(SKIPPING, COVERING, MV) + .forEach( + mockDS -> { + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService( + () -> + new LocalEMRSClient() { + @Override + public GetJobRunResult getJobRunResult( String applicationId, String jobId) { + return new GetJobRunResult() + .withJobRun(new JobRun().withState("Cancelled")); + } + }); + + // Mock flint index + mockDS.createIndex(); + // Mock index state + MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(mockDS.latestId); + + // 1. Submit REFRESH statement + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( mockDS.refreshQuery, DATASOURCE, LangType.SQL, null)); + // move the index state to refreshing + flintIndexJob.refreshing(); + + // 2. Cancel query + String cancelResponse = asyncQueryExecutorService.cancelQuery(response.getQueryId()); + + assertNotNull(cancelResponse); + assertTrue(clusterService.state().routingTable().hasIndex(mockDS.indexName)); + + // assert state is active + flintIndexJob.assertState(FlintIndexState.ACTIVE); + }); + } }
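Reviewer note: the cancel routing in SparkQueryDispatcher keys off the persisted jobType field, which is absent from metadata documents written before this change. Below is a minimal, self-contained sketch of that fallback behavior; the enum constants and handler names mirror the classes in this patch, while the stand-alone class, the parseJobType helper, and the string return values are illustrative only and not part of the change.

// Illustrative sketch; not part of this patch.
public class CancelHandlerSelectionSketch {
  enum JobType { INTERACTIVE, STREAMING, BATCH }

  // Mirrors fromXContent: a missing or empty jobType field parses to null.
  static JobType parseJobType(String text) {
    return (text == null || text.isEmpty())
        ? null
        : JobType.valueOf(text.toUpperCase(java.util.Locale.ROOT));
  }

  // Mirrors the jobType branch of SparkQueryDispatcher.cancelJob(): BATCH (manual
  // REFRESH) cancels via RefreshQueryHandler, STREAMING is rejected by
  // StreamingQueryHandler, and a null jobType (pre-upgrade metadata) falls back to
  // BatchQueryHandler. Session and index-DML routing happen before this branch
  // and are omitted here.
  static String chooseCancelHandler(JobType jobType) {
    if (jobType == JobType.BATCH) {
      return "RefreshQueryHandler";
    } else if (jobType == JobType.STREAMING) {
      return "StreamingQueryHandler";
    }
    return "BatchQueryHandler";
  }

  public static void main(String[] args) {
    System.out.println(chooseCancelHandler(parseJobType(null)));        // BatchQueryHandler
    System.out.println(chooseCancelHandler(parseJobType("batch")));     // RefreshQueryHandler
    System.out.println(chooseCancelHandler(parseJobType("streaming"))); // StreamingQueryHandler
  }
}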