From a3d34d048944f262989e22bc219661f85fa04e15 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Mon, 30 Oct 2023 21:23:53 +0000 Subject: [PATCH] Redefine Drop Index as logical delete (#2386) * Redefine Drop Index as logical delete Signed-off-by: Peng Huo * merge from 2.x Signed-off-by: Peng Huo * add refresh_job limit and disable batch query Signed-off-by: Peng Huo * update doc Signed-off-by: Peng Huo --------- Signed-off-by: Peng Huo (cherry picked from commit cb8d95346cb5348b670d43991a0bb41857e8e533) Signed-off-by: github-actions[bot] --- .../sql/common/setting/Settings.java | 9 +- docs/user/admin/settings.rst | 26 +- .../setting/OpenSearchSettings.java | 28 +- .../org/opensearch/sql/plugin/SQLPlugin.java | 3 +- spark/build.gradle | 1 + .../AsyncQueryExecutorServiceImpl.java | 1 - .../model/AsyncQueryJobMetadata.java | 17 +- .../spark/dispatcher/AsyncQueryHandler.java | 5 - .../spark/dispatcher/BatchQueryHandler.java | 7 +- .../sql/spark/dispatcher/IndexDMLHandler.java | 116 +++ .../dispatcher/InteractiveQueryHandler.java | 1 - .../dispatcher/SparkQueryDispatcher.java | 125 +-- .../dispatcher/StreamingQueryHandler.java | 11 +- .../model/DispatchQueryResponse.java | 1 - .../dispatcher/model/IndexDMLResult.java | 74 ++ .../execution/session/SessionManager.java | 4 +- .../execution/statestore/StateModel.java | 4 + .../execution/statestore/StateStore.java | 70 +- .../sql/spark/flint/FlintIndexMetadata.java | 14 +- .../sql/spark/flint/FlintIndexState.java | 54 ++ .../sql/spark/flint/FlintIndexStateModel.java | 150 ++++ .../spark/flint/operation/FlintIndexOp.java | 111 +++ .../flint/operation/FlintIndexOpCancel.java | 76 ++ .../flint/operation/FlintIndexOpDelete.java | 39 + ...DefaultSparkSqlFunctionResponseHandle.java | 1 - .../leasemanager/DefaultLeaseManager.java | 42 +- ...AsyncQueryExecutorServiceImplSpecTest.java | 277 +----- .../AsyncQueryExecutorServiceImplTest.java | 4 +- .../AsyncQueryExecutorServiceSpec.java | 291 +++++++ .../spark/asyncquery/IndexQuerySpecTest.java | 793 ++++++++++++++++++ ...yncQueryJobMetadataStorageServiceTest.java | 4 +- .../spark/dispatcher/DropIndexResultTest.java | 51 -- .../spark/dispatcher/IndexDMLHandlerTest.java | 21 + .../dispatcher/SparkQueryDispatcherTest.java | 275 +----- .../execution/session/SessionManagerTest.java | 3 - .../sql/spark/flint/FlintIndexStateTest.java | 18 + .../flint/operation/FlintIndexOpTest.java | 61 ++ .../leasemanager/DefaultLeaseManagerTest.java | 16 +- .../0.1.1/flint_covering_index.json | 37 + .../flint-index-mappings/0.1.1/flint_mv.json | 30 + .../0.1.1/flint_skipping_index.json | 23 + .../flint_covering_index.json | 36 + .../flint-index-mappings/flint_mv.json | 42 + .../flint_skipping_index.json | 22 + .../query_execution_result_mapping.json | 44 + 45 files changed, 2269 insertions(+), 769 deletions(-) create mode 100644 spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java create mode 100644 spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDMLResult.java create mode 100644 spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexState.java create mode 100644 spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexStateModel.java create mode 100644 spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOp.java create mode 100644 spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpCancel.java create mode 100644 spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpDelete.java create mode 100644 spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java create mode 100644 spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java delete mode 100644 spark/src/test/java/org/opensearch/sql/spark/dispatcher/DropIndexResultTest.java create mode 100644 spark/src/test/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandlerTest.java create mode 100644 spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexStateTest.java create mode 100644 spark/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpTest.java create mode 100644 spark/src/test/resources/flint-index-mappings/0.1.1/flint_covering_index.json create mode 100644 spark/src/test/resources/flint-index-mappings/0.1.1/flint_mv.json create mode 100644 spark/src/test/resources/flint-index-mappings/0.1.1/flint_skipping_index.json create mode 100644 spark/src/test/resources/flint-index-mappings/flint_covering_index.json create mode 100644 spark/src/test/resources/flint-index-mappings/flint_mv.json create mode 100644 spark/src/test/resources/flint-index-mappings/flint_skipping_index.json create mode 100644 spark/src/test/resources/query_execution_result_mapping.json diff --git a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java index 61d23a1a34..c9e348dbd4 100644 --- a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java +++ b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java @@ -5,8 +5,6 @@ package org.opensearch.sql.common.setting; -import static org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_SESSION_ENABLED; - import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; import java.util.List; @@ -40,8 +38,8 @@ public enum Key { METRICS_ROLLING_INTERVAL("plugins.query.metrics.rolling_interval"), SPARK_EXECUTION_ENGINE_CONFIG("plugins.query.executionengine.spark.config"), CLUSTER_NAME("cluster.name"), - SPARK_EXECUTION_SESSION_ENABLED("plugins.query.executionengine.spark.session.enabled"), SPARK_EXECUTION_SESSION_LIMIT("plugins.query.executionengine.spark.session.limit"), + SPARK_EXECUTION_REFRESH_JOB_LIMIT("plugins.query.executionengine.spark.refresh_job.limit"), SESSION_INDEX_TTL("plugins.query.executionengine.spark.session.index.ttl"), RESULT_INDEX_TTL("plugins.query.executionengine.spark.result.index.ttl"), AUTO_INDEX_MANAGEMENT_ENABLED( @@ -69,9 +67,4 @@ public static Optional of(String keyValue) { public abstract T getSettingValue(Key key); public abstract List getSettings(); - - /** Helper class */ - public static boolean isSparkExecutionSessionEnabled(Settings settings) { - return settings.getSettingValue(SPARK_EXECUTION_SESSION_ENABLED); - } } diff --git a/docs/user/admin/settings.rst b/docs/user/admin/settings.rst index 3acb005c12..f3e8070a23 100644 --- a/docs/user/admin/settings.rst +++ b/docs/user/admin/settings.rst @@ -311,15 +311,16 @@ SQL query:: "status": 400 } -plugins.query.executionengine.spark.session.enabled -=================================================== + +plugins.query.executionengine.spark.session.limit +================================================== Description ----------- -By default, execution engine is executed in session mode. You can disable session mode by this setting. +Each cluster can have maximum 100 sessions running in parallel by default. You can increase limit by this setting. -1. The default value is true. +1. The default value is 100. 2. This setting is node scope. 3. This setting can be updated dynamically. @@ -328,7 +329,7 @@ You can update the setting with a new value like this. SQL query:: sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_plugins/_query/settings \ - ... -d '{"transient":{"plugins.query.executionengine.spark.session.enabled":"false"}}' + ... -d '{"transient":{"plugins.query.executionengine.spark.session.limit":200}}' { "acknowledged": true, "persistent": {}, @@ -338,7 +339,7 @@ SQL query:: "executionengine": { "spark": { "session": { - "enabled": "false" + "limit": "200" } } } @@ -347,15 +348,16 @@ SQL query:: } } -plugins.query.executionengine.spark.session.limit -================================================== + +plugins.query.executionengine.spark.refresh_job.limit +===================================================== Description ----------- -Each cluster can have maximum 100 sessions running in parallel by default. You can increase limit by this setting. +Each cluster can have maximum 20 datasources. You can increase limit by this setting. -1. The default value is 100. +1. The default value is 20. 2. This setting is node scope. 3. This setting can be updated dynamically. @@ -364,7 +366,7 @@ You can update the setting with a new value like this. SQL query:: sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_plugins/_query/settings \ - ... -d '{"transient":{"plugins.query.executionengine.spark.session.limit":200}}' + ... -d '{"transient":{"plugins.query.executionengine.spark.refresh_job.limit":200}}' { "acknowledged": true, "persistent": {}, @@ -373,7 +375,7 @@ SQL query:: "query": { "executionengine": { "spark": { - "session": { + "refresh_job": { "limit": "200" } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java index 6b5f3cf0f1..02b28d58ce 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java @@ -137,13 +137,6 @@ public class OpenSearchSettings extends Settings { Setting.Property.NodeScope, Setting.Property.Dynamic); - public static final Setting SPARK_EXECUTION_SESSION_ENABLED_SETTING = - Setting.boolSetting( - Key.SPARK_EXECUTION_SESSION_ENABLED.getKeyValue(), - true, - Setting.Property.NodeScope, - Setting.Property.Dynamic); - public static final Setting SPARK_EXECUTION_SESSION_LIMIT_SETTING = Setting.intSetting( Key.SPARK_EXECUTION_SESSION_LIMIT.getKeyValue(), @@ -151,6 +144,13 @@ public class OpenSearchSettings extends Settings { Setting.Property.NodeScope, Setting.Property.Dynamic); + public static final Setting SPARK_EXECUTION_REFRESH_JOB_LIMIT_SETTING = + Setting.intSetting( + Key.SPARK_EXECUTION_REFRESH_JOB_LIMIT.getKeyValue(), + 50, + Setting.Property.NodeScope, + Setting.Property.Dynamic); + public static final Setting SESSION_INDEX_TTL_SETTING = Setting.positiveTimeSetting( Key.SESSION_INDEX_TTL.getKeyValue(), @@ -249,18 +249,18 @@ public OpenSearchSettings(ClusterSettings clusterSettings) { Key.SPARK_EXECUTION_ENGINE_CONFIG, SPARK_EXECUTION_ENGINE_CONFIG, new Updater(Key.SPARK_EXECUTION_ENGINE_CONFIG)); - register( - settingBuilder, - clusterSettings, - Key.SPARK_EXECUTION_SESSION_ENABLED, - SPARK_EXECUTION_SESSION_ENABLED_SETTING, - new Updater(Key.SPARK_EXECUTION_SESSION_ENABLED)); register( settingBuilder, clusterSettings, Key.SPARK_EXECUTION_SESSION_LIMIT, SPARK_EXECUTION_SESSION_LIMIT_SETTING, new Updater(Key.SPARK_EXECUTION_SESSION_LIMIT)); + register( + settingBuilder, + clusterSettings, + Key.SPARK_EXECUTION_REFRESH_JOB_LIMIT, + SPARK_EXECUTION_REFRESH_JOB_LIMIT_SETTING, + new Updater(Key.SPARK_EXECUTION_REFRESH_JOB_LIMIT)); register( settingBuilder, clusterSettings, @@ -350,8 +350,8 @@ public static List> pluginSettings() { .add(METRICS_ROLLING_INTERVAL_SETTING) .add(DATASOURCE_URI_HOSTS_DENY_LIST) .add(SPARK_EXECUTION_ENGINE_CONFIG) - .add(SPARK_EXECUTION_SESSION_ENABLED_SETTING) .add(SPARK_EXECUTION_SESSION_LIMIT_SETTING) + .add(SPARK_EXECUTION_REFRESH_JOB_LIMIT_SETTING) .add(SESSION_INDEX_TTL_SETTING) .add(RESULT_INDEX_TTL_SETTING) .add(AUTO_INDEX_MANAGEMENT_ENABLED_SETTING) diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index 9d37fe28d0..63c07de032 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -335,7 +335,8 @@ private AsyncQueryExecutorService createAsyncQueryExecutorService( new FlintIndexMetadataReaderImpl(client), client, new SessionManager(stateStore, emrServerlessClient, pluginSettings), - new DefaultLeaseManager(pluginSettings, stateStore)); + new DefaultLeaseManager(pluginSettings, stateStore), + stateStore); return new AsyncQueryExecutorServiceImpl( asyncQueryJobMetadataStorageService, sparkQueryDispatcher, diff --git a/spark/build.gradle b/spark/build.gradle index ed91b9820b..d703f6b24d 100644 --- a/spark/build.gradle +++ b/spark/build.gradle @@ -123,6 +123,7 @@ jacocoTestCoverageVerification { 'org.opensearch.sql.spark.execution.statestore.StateStore', 'org.opensearch.sql.spark.execution.session.SessionModel', 'org.opensearch.sql.spark.execution.statement.StatementModel', + 'org.opensearch.sql.spark.flint.FlintIndexStateModel', // TODO: add tests for purging flint indices 'org.opensearch.sql.spark.cluster.ClusterManagerEventListener*', 'org.opensearch.sql.spark.cluster.FlintIndexRetention', diff --git a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImpl.java b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImpl.java index 18ae47c2b9..1c0979dffb 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImpl.java +++ b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImpl.java @@ -72,7 +72,6 @@ public CreateAsyncQueryResponse createAsyncQuery( dispatchQueryResponse.getQueryId(), sparkExecutionEngineConfig.getApplicationId(), dispatchQueryResponse.getJobId(), - dispatchQueryResponse.isDropIndexQuery(), dispatchQueryResponse.getResultIndex(), dispatchQueryResponse.getSessionId())); return new CreateAsyncQueryResponse( diff --git a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryJobMetadata.java b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryJobMetadata.java index 3c59403661..d1357f364d 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryJobMetadata.java +++ b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryJobMetadata.java @@ -29,7 +29,6 @@ public class AsyncQueryJobMetadata extends StateModel { private final AsyncQueryId queryId; private final String applicationId; private final String jobId; - private final boolean isDropIndexQuery; private final String resultIndex; // optional sessionId. private final String sessionId; @@ -43,7 +42,6 @@ public AsyncQueryJobMetadata( queryId, applicationId, jobId, - false, resultIndex, null, SequenceNumbers.UNASSIGNED_SEQ_NO, @@ -54,14 +52,12 @@ public AsyncQueryJobMetadata( AsyncQueryId queryId, String applicationId, String jobId, - boolean isDropIndexQuery, String resultIndex, String sessionId) { this( queryId, applicationId, jobId, - isDropIndexQuery, resultIndex, sessionId, SequenceNumbers.UNASSIGNED_SEQ_NO, @@ -72,7 +68,6 @@ public AsyncQueryJobMetadata( AsyncQueryId queryId, String applicationId, String jobId, - boolean isDropIndexQuery, String resultIndex, String sessionId, long seqNo, @@ -80,7 +75,6 @@ public AsyncQueryJobMetadata( this.queryId = queryId; this.applicationId = applicationId; this.jobId = jobId; - this.isDropIndexQuery = isDropIndexQuery; this.resultIndex = resultIndex; this.sessionId = sessionId; this.seqNo = seqNo; @@ -106,7 +100,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws .field("type", TYPE_JOBMETA) .field("jobId", jobId) .field("applicationId", applicationId) - .field("isDropIndexQuery", isDropIndexQuery) .field("resultIndex", resultIndex) .field("sessionId", sessionId) .endObject(); @@ -120,7 +113,6 @@ public static AsyncQueryJobMetadata copy( copy.getQueryId(), copy.getApplicationId(), copy.getJobId(), - copy.isDropIndexQuery(), copy.getResultIndex(), copy.getSessionId(), seqNo, @@ -176,14 +168,7 @@ public static AsyncQueryJobMetadata fromXContent( throw new IllegalArgumentException("jobId and applicationId are required fields."); } return new AsyncQueryJobMetadata( - queryId, - applicationId, - jobId, - isDropIndexQuery, - resultIndex, - sessionId, - seqNo, - primaryTerm); + queryId, applicationId, jobId, resultIndex, sessionId, seqNo, primaryTerm); } @Override diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/AsyncQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/AsyncQueryHandler.java index 2823e64af7..b3d2cdd289 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/AsyncQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/AsyncQueryHandler.java @@ -20,11 +20,6 @@ public abstract class AsyncQueryHandler { public JSONObject getQueryResponse(AsyncQueryJobMetadata asyncQueryJobMetadata) { - if (asyncQueryJobMetadata.isDropIndexQuery()) { - return SparkQueryDispatcher.DropIndexResult.fromJobId(asyncQueryJobMetadata.getJobId()) - .result(); - } - JSONObject result = getResponseFromResultIndex(asyncQueryJobMetadata); if (result.has(DATA_FIELD)) { JSONObject items = result.getJSONObject(DATA_FIELD); diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java index c6bac9b288..9e59fb707c 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java @@ -22,12 +22,15 @@ import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse; import org.opensearch.sql.spark.dispatcher.model.JobType; +import org.opensearch.sql.spark.leasemanager.LeaseManager; +import org.opensearch.sql.spark.leasemanager.model.LeaseRequest; import org.opensearch.sql.spark.response.JobExecutionResponseReader; @RequiredArgsConstructor public class BatchQueryHandler extends AsyncQueryHandler { private final EMRServerlessClient emrServerlessClient; private final JobExecutionResponseReader jobExecutionResponseReader; + protected final LeaseManager leaseManager; @Override protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) { @@ -60,6 +63,8 @@ public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) { @Override public DispatchQueryResponse submit( DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) { + leaseManager.borrow(new LeaseRequest(JobType.BATCH, dispatchQueryRequest.getDatasource())); + String jobName = dispatchQueryRequest.getClusterName() + ":" + "non-index-query"; Map tags = context.getTags(); DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata(); @@ -81,6 +86,6 @@ public DispatchQueryResponse submit( dataSourceMetadata.getResultIndex()); String jobId = emrServerlessClient.startJobRun(startJobRequest); return new DispatchQueryResponse( - context.getQueryId(), jobId, false, dataSourceMetadata.getResultIndex(), null); + context.getQueryId(), jobId, dataSourceMetadata.getResultIndex(), null); } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java new file mode 100644 index 0000000000..3ab5439ad5 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java @@ -0,0 +1,116 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.dispatcher; + +import static org.opensearch.sql.spark.execution.statestore.StateStore.createIndexDMLResult; + +import com.amazonaws.services.emrserverless.model.JobRunState; +import lombok.RequiredArgsConstructor; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.json.JSONObject; +import org.opensearch.client.Client; +import org.opensearch.sql.datasource.DataSourceService; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl; +import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId; +import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; +import org.opensearch.sql.spark.client.EMRServerlessClient; +import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext; +import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; +import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse; +import org.opensearch.sql.spark.dispatcher.model.IndexDMLResult; +import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails; +import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.flint.FlintIndexMetadata; +import org.opensearch.sql.spark.flint.FlintIndexMetadataReader; +import org.opensearch.sql.spark.flint.operation.FlintIndexOp; +import org.opensearch.sql.spark.flint.operation.FlintIndexOpCancel; +import org.opensearch.sql.spark.flint.operation.FlintIndexOpDelete; +import org.opensearch.sql.spark.response.JobExecutionResponseReader; + +/** Handle Index DML query. includes * DROP * ALT? */ +@RequiredArgsConstructor +public class IndexDMLHandler extends AsyncQueryHandler { + private static final Logger LOG = LogManager.getLogger(); + + public static final String DROP_INDEX_JOB_ID = "dropIndexJobId"; + + private final EMRServerlessClient emrServerlessClient; + + private final DataSourceService dataSourceService; + + private final DataSourceUserAuthorizationHelperImpl dataSourceUserAuthorizationHelper; + + private final JobExecutionResponseReader jobExecutionResponseReader; + + private final FlintIndexMetadataReader flintIndexMetadataReader; + + private final Client client; + + private final StateStore stateStore; + + public static boolean isIndexDMLQuery(String jobId) { + return DROP_INDEX_JOB_ID.equalsIgnoreCase(jobId); + } + + @Override + public DispatchQueryResponse submit( + DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) { + DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata(); + IndexQueryDetails indexDetails = context.getIndexQueryDetails(); + FlintIndexMetadata indexMetadata = flintIndexMetadataReader.getFlintIndexMetadata(indexDetails); + // if index is created without auto refresh. there is no job to cancel. + String status = JobRunState.FAILED.toString(); + String error = ""; + long startTime = 0L; + try { + FlintIndexOp jobCancelOp = + new FlintIndexOpCancel( + stateStore, dispatchQueryRequest.getDatasource(), emrServerlessClient); + jobCancelOp.apply(indexMetadata); + + FlintIndexOp indexDeleteOp = + new FlintIndexOpDelete(stateStore, dispatchQueryRequest.getDatasource()); + indexDeleteOp.apply(indexMetadata); + status = JobRunState.SUCCESS.toString(); + } catch (Exception e) { + error = e.getMessage(); + LOG.error(e); + } + + AsyncQueryId asyncQueryId = AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName()); + IndexDMLResult indexDMLResult = + new IndexDMLResult( + asyncQueryId.getId(), + status, + error, + dispatchQueryRequest.getDatasource(), + System.currentTimeMillis() - startTime, + System.currentTimeMillis()); + String resultIndex = dataSourceMetadata.getResultIndex(); + createIndexDMLResult(stateStore, resultIndex).apply(indexDMLResult); + + return new DispatchQueryResponse(asyncQueryId, DROP_INDEX_JOB_ID, resultIndex, null); + } + + @Override + protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) { + String queryId = asyncQueryJobMetadata.getQueryId().getId(); + return jobExecutionResponseReader.getResultWithQueryId( + queryId, asyncQueryJobMetadata.getResultIndex()); + } + + @Override + protected JSONObject getResponseFromExecutor(AsyncQueryJobMetadata asyncQueryJobMetadata) { + throw new IllegalStateException("[BUG] can't fetch result of index DML query form server"); + } + + @Override + public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) { + throw new IllegalArgumentException("can't cancel index DML query"); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java index d75f568275..d6ca83e52a 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java @@ -109,7 +109,6 @@ public DispatchQueryResponse submit( return new DispatchQueryResponse( context.getQueryId(), session.getSessionModel().getJobId(), - false, dataSourceMetadata.getResultIndex(), session.getSessionId().getSessionId()); } diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java index a800e45dd6..0aa183335e 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java @@ -5,26 +5,12 @@ package org.opensearch.sql.spark.dispatcher; -import static org.opensearch.sql.spark.data.constants.SparkConstants.DATA_FIELD; -import static org.opensearch.sql.spark.data.constants.SparkConstants.ERROR_FIELD; -import static org.opensearch.sql.spark.data.constants.SparkConstants.STATUS_FIELD; - -import com.amazonaws.services.emrserverless.model.JobRunState; -import java.nio.charset.StandardCharsets; -import java.util.Base64; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ExecutionException; import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.RequiredArgsConstructor; -import org.apache.commons.lang3.RandomStringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.json.JSONArray; import org.json.JSONObject; -import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; -import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.client.Client; import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.datasource.model.DataSourceMetadata; @@ -38,7 +24,7 @@ import org.opensearch.sql.spark.dispatcher.model.IndexQueryActionType; import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails; import org.opensearch.sql.spark.execution.session.SessionManager; -import org.opensearch.sql.spark.flint.FlintIndexMetadata; +import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexMetadataReader; import org.opensearch.sql.spark.leasemanager.LeaseManager; import org.opensearch.sql.spark.response.JobExecutionResponseReader; @@ -71,6 +57,8 @@ public class SparkQueryDispatcher { private LeaseManager leaseManager; + private StateStore stateStore; + public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest) { DataSourceMetadata dataSourceMetadata = this.dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource()); @@ -79,7 +67,7 @@ public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest) AsyncQueryHandler asyncQueryHandler = sessionManager.isEnabled() ? new InteractiveQueryHandler(sessionManager, jobExecutionResponseReader, leaseManager) - : new BatchQueryHandler(emrServerlessClient, jobExecutionResponseReader); + : new BatchQueryHandler(emrServerlessClient, jobExecutionResponseReader, leaseManager); DispatchQueryContext.DispatchQueryContextBuilder contextBuilder = DispatchQueryContext.builder() .dataSourceMetadata(dataSourceMetadata) @@ -95,15 +83,16 @@ public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest) contextBuilder.indexQueryDetails(indexQueryDetails); if (IndexQueryActionType.DROP.equals(indexQueryDetails.getIndexQueryActionType())) { - // todo, fix in DROP INDEX PR. - return handleDropIndexQuery(dispatchQueryRequest, indexQueryDetails); + asyncQueryHandler = createIndexDMLHandler(); } else if (IndexQueryActionType.CREATE.equals(indexQueryDetails.getIndexQueryActionType()) && indexQueryDetails.isAutoRefresh()) { asyncQueryHandler = - new StreamingQueryHandler(emrServerlessClient, jobExecutionResponseReader); + new StreamingQueryHandler( + emrServerlessClient, jobExecutionResponseReader, leaseManager); } else if (IndexQueryActionType.REFRESH.equals(indexQueryDetails.getIndexQueryActionType())) { // manual refresh should be handled by batch handler - asyncQueryHandler = new BatchQueryHandler(emrServerlessClient, jobExecutionResponseReader); + asyncQueryHandler = + new BatchQueryHandler(emrServerlessClient, jobExecutionResponseReader, leaseManager); } } return asyncQueryHandler.submit(dispatchQueryRequest, contextBuilder.build()); @@ -113,20 +102,37 @@ public JSONObject getQueryResponse(AsyncQueryJobMetadata asyncQueryJobMetadata) if (asyncQueryJobMetadata.getSessionId() != null) { return new InteractiveQueryHandler(sessionManager, jobExecutionResponseReader, leaseManager) .getQueryResponse(asyncQueryJobMetadata); + } else if (IndexDMLHandler.isIndexDMLQuery(asyncQueryJobMetadata.getJobId())) { + return createIndexDMLHandler().getQueryResponse(asyncQueryJobMetadata); } else { - return new BatchQueryHandler(emrServerlessClient, jobExecutionResponseReader) + return new BatchQueryHandler(emrServerlessClient, jobExecutionResponseReader, leaseManager) .getQueryResponse(asyncQueryJobMetadata); } } public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) { + AsyncQueryHandler queryHandler; if (asyncQueryJobMetadata.getSessionId() != null) { - return new InteractiveQueryHandler(sessionManager, jobExecutionResponseReader, leaseManager) - .cancelJob(asyncQueryJobMetadata); + queryHandler = + new InteractiveQueryHandler(sessionManager, jobExecutionResponseReader, leaseManager); + } else if (IndexDMLHandler.isIndexDMLQuery(asyncQueryJobMetadata.getJobId())) { + queryHandler = createIndexDMLHandler(); } else { - return new BatchQueryHandler(emrServerlessClient, jobExecutionResponseReader) - .cancelJob(asyncQueryJobMetadata); + queryHandler = + new BatchQueryHandler(emrServerlessClient, jobExecutionResponseReader, leaseManager); } + return queryHandler.cancelJob(asyncQueryJobMetadata); + } + + private IndexDMLHandler createIndexDMLHandler() { + return new IndexDMLHandler( + emrServerlessClient, + dataSourceService, + dataSourceUserAuthorizationHelper, + jobExecutionResponseReader, + flintIndexMetadataReader, + client, + stateStore); } // TODO: Revisit this logic. @@ -143,40 +149,6 @@ private static void fillMissingDetails( } } - private DispatchQueryResponse handleDropIndexQuery( - DispatchQueryRequest dispatchQueryRequest, IndexQueryDetails indexQueryDetails) { - DataSourceMetadata dataSourceMetadata = - this.dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource()); - FlintIndexMetadata indexMetadata = - flintIndexMetadataReader.getFlintIndexMetadata(indexQueryDetails); - // if index is created without auto refresh. there is no job to cancel. - String status = JobRunState.FAILED.toString(); - try { - if (indexMetadata.isAutoRefresh()) { - emrServerlessClient.cancelJobRun( - dispatchQueryRequest.getApplicationId(), indexMetadata.getJobId()); - } - } finally { - String indexName = indexQueryDetails.openSearchIndexName(); - try { - AcknowledgedResponse response = - client.admin().indices().delete(new DeleteIndexRequest().indices(indexName)).get(); - if (!response.isAcknowledged()) { - LOG.error("failed to delete index"); - } - status = JobRunState.SUCCESS.toString(); - } catch (InterruptedException | ExecutionException e) { - LOG.error("failed to delete index"); - } - } - return new DispatchQueryResponse( - AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName()), - new DropIndexResult(status).toJobId(), - true, - dataSourceMetadata.getResultIndex(), - null); - } - private static Map getDefaultTagsForJobSubmission( DispatchQueryRequest dispatchQueryRequest) { Map tags = new HashMap<>(); @@ -184,39 +156,4 @@ private static Map getDefaultTagsForJobSubmission( tags.put(DATASOURCE_TAG_KEY, dispatchQueryRequest.getDatasource()); return tags; } - - @Getter - @RequiredArgsConstructor - public static class DropIndexResult { - private static final int PREFIX_LEN = 10; - - private final String status; - - public static DropIndexResult fromJobId(String jobId) { - String status = new String(Base64.getDecoder().decode(jobId)).substring(PREFIX_LEN); - return new DropIndexResult(status); - } - - public String toJobId() { - String queryId = RandomStringUtils.randomAlphanumeric(PREFIX_LEN) + status; - return Base64.getEncoder().encodeToString(queryId.getBytes(StandardCharsets.UTF_8)); - } - - public JSONObject result() { - JSONObject result = new JSONObject(); - if (JobRunState.SUCCESS.toString().equalsIgnoreCase(status)) { - result.put(STATUS_FIELD, status); - // todo. refactor response handling. - JSONObject dummyData = new JSONObject(); - dummyData.put("result", new JSONArray()); - dummyData.put("schema", new JSONArray()); - dummyData.put("applicationId", "fakeDropIndexApplicationId"); - result.put(DATA_FIELD, dummyData); - } else { - result.put(STATUS_FIELD, status); - result.put(ERROR_FIELD, "failed to drop index"); - } - return result; - } - } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java index 81c3438532..ac5c878c28 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java @@ -19,6 +19,8 @@ import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse; import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails; import org.opensearch.sql.spark.dispatcher.model.JobType; +import org.opensearch.sql.spark.leasemanager.LeaseManager; +import org.opensearch.sql.spark.leasemanager.model.LeaseRequest; import org.opensearch.sql.spark.response.JobExecutionResponseReader; /** Handle Streaming Query. */ @@ -27,14 +29,18 @@ public class StreamingQueryHandler extends BatchQueryHandler { public StreamingQueryHandler( EMRServerlessClient emrServerlessClient, - JobExecutionResponseReader jobExecutionResponseReader) { - super(emrServerlessClient, jobExecutionResponseReader); + JobExecutionResponseReader jobExecutionResponseReader, + LeaseManager leaseManager) { + super(emrServerlessClient, jobExecutionResponseReader, leaseManager); this.emrServerlessClient = emrServerlessClient; } @Override public DispatchQueryResponse submit( DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) { + + leaseManager.borrow(new LeaseRequest(JobType.STREAMING, dispatchQueryRequest.getDatasource())); + String jobName = dispatchQueryRequest.getClusterName() + ":" + "index-query"; IndexQueryDetails indexQueryDetails = context.getIndexQueryDetails(); Map tags = context.getTags(); @@ -60,7 +66,6 @@ public DispatchQueryResponse submit( return new DispatchQueryResponse( AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName()), jobId, - false, dataSourceMetadata.getResultIndex(), null); } diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/DispatchQueryResponse.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/DispatchQueryResponse.java index e44379daff..b20648cdfd 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/DispatchQueryResponse.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/DispatchQueryResponse.java @@ -9,7 +9,6 @@ public class DispatchQueryResponse { private AsyncQueryId queryId; private String jobId; - private boolean isDropIndexQuery; private String resultIndex; private String sessionId; } diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDMLResult.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDMLResult.java new file mode 100644 index 0000000000..b01ecf55ba --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDMLResult.java @@ -0,0 +1,74 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.dispatcher.model; + +import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME; + +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import lombok.Data; +import lombok.EqualsAndHashCode; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.sql.spark.execution.statestore.StateModel; + +/** Plugin create Index DML result. */ +@Data +@EqualsAndHashCode(callSuper = false) +public class IndexDMLResult extends StateModel { + private static final String QUERY_ID = "queryId"; + private static final String QUERY_RUNTIME = "queryRunTime"; + private static final String UPDATE_TIME = "updateTime"; + private static final String DOC_ID_PREFIX = "index"; + + private final String queryId; + private final String status; + private final String error; + private final String datasourceName; + private final Long queryRunTime; + private final Long updateTime; + + public static IndexDMLResult copy(IndexDMLResult copy, long seqNo, long primaryTerm) { + return new IndexDMLResult( + copy.queryId, + copy.status, + copy.error, + copy.datasourceName, + copy.queryRunTime, + copy.updateTime); + } + + @Override + public String getId() { + return DOC_ID_PREFIX + queryId; + } + + @Override + public long getSeqNo() { + return SequenceNumbers.UNASSIGNED_SEQ_NO; + } + + @Override + public long getPrimaryTerm() { + return SequenceNumbers.UNASSIGNED_PRIMARY_TERM; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder + .startObject() + .field(QUERY_ID, queryId) + .field("status", status) + .field("error", error) + .field(DATASOURCE_NAME, datasourceName) + .field(QUERY_RUNTIME, queryRunTime) + .field(UPDATE_TIME, updateTime) + .field("result", ImmutableList.of()) + .field("schema", ImmutableList.of()) + .endObject(); + return builder; + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java index c0f7bbcde8..0f0a4ce373 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java @@ -5,7 +5,6 @@ package org.opensearch.sql.spark.execution.session; -import static org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_SESSION_ENABLED; import static org.opensearch.sql.spark.execution.session.SessionId.newSessionId; import java.util.Optional; @@ -52,7 +51,8 @@ public Optional getSession(SessionId sid) { return Optional.empty(); } + // todo, keep it only for testing, will remove it later. public boolean isEnabled() { - return settings.getSettingValue(SPARK_EXECUTION_SESSION_ENABLED); + return true; } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateModel.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateModel.java index b5bf31a6ba..fe105cc8e4 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateModel.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateModel.java @@ -9,6 +9,10 @@ import org.opensearch.core.xcontent.XContentParser; public abstract class StateModel implements ToXContentObject { + public static final String VERSION_1_0 = "1.0"; + public static final String TYPE = "type"; + public static final String STATE = "state"; + public static final String LAST_UPDATE_TIME = "lastUpdateTime"; public abstract String getId(); diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java index 86d15a7036..f36cbba32c 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java @@ -6,7 +6,9 @@ package org.opensearch.sql.spark.execution.statestore; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_REQUEST_BUFFER_INDEX_NAME; +import static org.opensearch.sql.spark.execution.statestore.StateModel.STATE; +import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; @@ -45,11 +47,14 @@ import org.opensearch.index.query.QueryBuilders; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; +import org.opensearch.sql.spark.dispatcher.model.IndexDMLResult; import org.opensearch.sql.spark.execution.session.SessionModel; import org.opensearch.sql.spark.execution.session.SessionState; import org.opensearch.sql.spark.execution.session.SessionType; import org.opensearch.sql.spark.execution.statement.StatementModel; import org.opensearch.sql.spark.execution.statement.StatementState; +import org.opensearch.sql.spark.flint.FlintIndexState; +import org.opensearch.sql.spark.flint.FlintIndexStateModel; /** * State Store maintain the state of Session and Statement. State State create/update/get doc on @@ -70,7 +75,8 @@ public class StateStore { private final Client client; private final ClusterService clusterService; - protected T create( + @VisibleForTesting + public T create( T st, StateModel.CopyBuilder builder, String indexName) { try { if (!this.clusterService.state().routingTable().hasIndex(indexName)) { @@ -104,7 +110,8 @@ protected T create( } } - protected Optional get( + @VisibleForTesting + public Optional get( String sid, StateModel.FromXContent builder, String indexName) { try { if (!this.clusterService.state().routingTable().hasIndex(indexName)) { @@ -135,7 +142,8 @@ protected Optional get( } } - protected T updateState( + @VisibleForTesting + public T updateState( T st, S state, StateModel.StateCopyBuilder builder, String indexName) { try { T model = builder.of(st, state, st.getSeqNo(), st.getPrimaryTerm()); @@ -151,18 +159,8 @@ protected T updateState( try (ThreadContext.StoredContext ignored = client.threadPool().getThreadContext().stashContext()) { UpdateResponse updateResponse = client.update(updateRequest).actionGet(); - if (updateResponse.getResult().equals(DocWriteResponse.Result.UPDATED)) { - LOG.debug("Successfully update doc. id: {}", st.getId()); - return builder.of( - model, state, updateResponse.getSeqNo(), updateResponse.getPrimaryTerm()); - } else { - throw new RuntimeException( - String.format( - Locale.ROOT, - "Failed update doc. id: %s, error: %s", - st.getId(), - updateResponse.getResult().getLowercase())); - } + LOG.debug("Successfully update doc. id: {}", st.getId()); + return builder.of(model, state, updateResponse.getSeqNo(), updateResponse.getPrimaryTerm()); } } catch (IOException e) { throw new RuntimeException(e); @@ -303,4 +301,46 @@ public static Supplier activeSessionsCount(StateStore stateStore, String d QueryBuilders.termQuery( SessionModel.SESSION_STATE, SessionState.RUNNING.getSessionState()))); } + + public static BiFunction + updateFlintIndexState(StateStore stateStore, String datasourceName) { + return (old, state) -> + stateStore.updateState( + old, + state, + FlintIndexStateModel::copyWithState, + DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName)); + } + + public static Function> getFlintIndexState( + StateStore stateStore, String datasourceName) { + return (docId) -> + stateStore.get( + docId, + FlintIndexStateModel::fromXContent, + DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName)); + } + + public static Function createFlintIndexState( + StateStore stateStore, String datasourceName) { + return (st) -> + stateStore.create( + st, FlintIndexStateModel::copy, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName)); + } + + public static Function createIndexDMLResult( + StateStore stateStore, String indexName) { + return (result) -> stateStore.create(result, IndexDMLResult::copy, indexName); + } + + public static Supplier activeRefreshJobCount(StateStore stateStore, String datasourceName) { + return () -> + stateStore.count( + DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName), + QueryBuilders.boolQuery() + .must( + QueryBuilders.termQuery( + SessionModel.TYPE, FlintIndexStateModel.FLINT_INDEX_DOC_TYPE)) + .must(QueryBuilders.termQuery(STATE, FlintIndexState.REFRESHING.getState()))); + } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadata.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadata.java index 81b7fa1693..1721263bf8 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadata.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadata.java @@ -7,6 +7,7 @@ import java.util.Locale; import java.util.Map; +import java.util.Optional; import lombok.Data; @Data @@ -19,8 +20,13 @@ public class FlintIndexMetadata { public static final String AUTO_REFRESH = "auto_refresh"; public static final String AUTO_REFRESH_DEFAULT = "false"; + public static final String APP_ID = "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID"; + public static final String FLINT_INDEX_STATE_DOC_ID = "latestId"; + private final String jobId; private final boolean autoRefresh; + private final String appId; + private final String latestId; public static FlintIndexMetadata fromMetatdata(Map metaMap) { Map propertiesMap = (Map) metaMap.get(PROPERTIES_KEY); @@ -32,6 +38,12 @@ public static FlintIndexMetadata fromMetatdata(Map metaMap) { !((String) options.getOrDefault(AUTO_REFRESH, AUTO_REFRESH_DEFAULT)) .toLowerCase(Locale.ROOT) .equalsIgnoreCase(AUTO_REFRESH_DEFAULT); - return new FlintIndexMetadata(jobId, autoRefresh); + String appId = (String) envMap.getOrDefault(APP_ID, null); + String latestId = (String) metaMap.getOrDefault(FLINT_INDEX_STATE_DOC_ID, null); + return new FlintIndexMetadata(jobId, autoRefresh, appId, latestId); + } + + public Optional getLatestId() { + return Optional.ofNullable(latestId); } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexState.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexState.java new file mode 100644 index 0000000000..0ab4d92c17 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexState.java @@ -0,0 +1,54 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.flint; + +import java.util.Arrays; +import java.util.Locale; +import java.util.Map; +import java.util.stream.Collectors; +import lombok.Getter; + +/** Flint index state. */ +@Getter +public enum FlintIndexState { + // stable state + EMPTY("empty"), + // transitioning state + CREATING("creating"), + // transitioning state + REFRESHING("refreshing"), + // transitioning state + CANCELLING("cancelling"), + // stable state + ACTIVE("active"), + // transitioning state + DELETING("deleting"), + // stable state + DELETED("deleted"), + // stable state + FAILED("failed"), + // unknown state, if some state update in Spark side, not reflect in here. + UNKNOWN("unknown"); + + private final String state; + + FlintIndexState(String state) { + this.state = state; + } + + private static Map STATES = + Arrays.stream(FlintIndexState.values()) + .collect(Collectors.toMap(t -> t.name().toLowerCase(), t -> t)); + + public static FlintIndexState fromString(String key) { + for (FlintIndexState ss : FlintIndexState.values()) { + if (ss.getState().toLowerCase(Locale.ROOT).equals(key)) { + return ss; + } + } + return UNKNOWN; + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexStateModel.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexStateModel.java new file mode 100644 index 0000000000..bb73f439a2 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexStateModel.java @@ -0,0 +1,150 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.flint; + +import static org.opensearch.sql.spark.execution.session.SessionModel.APPLICATION_ID; +import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME; +import static org.opensearch.sql.spark.execution.session.SessionModel.JOB_ID; +import static org.opensearch.sql.spark.execution.statement.StatementModel.ERROR; +import static org.opensearch.sql.spark.execution.statement.StatementModel.VERSION; + +import java.io.IOException; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.SneakyThrows; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.core.xcontent.XContentParserUtils; +import org.opensearch.sql.spark.execution.statestore.StateModel; + +/** Flint Index Model maintain the index state. */ +@Getter +@Builder +@EqualsAndHashCode(callSuper = false) +public class FlintIndexStateModel extends StateModel { + public static final String FLINT_INDEX_DOC_TYPE = "flintindexstate"; + public static final String LATEST_ID = "latestId"; + public static final String DOC_ID_PREFIX = "flint"; + + private final FlintIndexState indexState; + private final String applicationId; + private final String jobId; + private final String latestId; + private final String datasourceName; + private final long lastUpdateTime; + private final String error; + + @EqualsAndHashCode.Exclude private final long seqNo; + @EqualsAndHashCode.Exclude private final long primaryTerm; + + public FlintIndexStateModel( + FlintIndexState indexState, + String applicationId, + String jobId, + String latestId, + String datasourceName, + long lastUpdateTime, + String error, + long seqNo, + long primaryTerm) { + this.indexState = indexState; + this.applicationId = applicationId; + this.jobId = jobId; + this.latestId = latestId; + this.datasourceName = datasourceName; + this.lastUpdateTime = lastUpdateTime; + this.error = error; + this.seqNo = seqNo; + this.primaryTerm = primaryTerm; + } + + public static FlintIndexStateModel copy(FlintIndexStateModel copy, long seqNo, long primaryTerm) { + return new FlintIndexStateModel( + copy.indexState, + copy.applicationId, + copy.jobId, + copy.latestId, + copy.datasourceName, + copy.lastUpdateTime, + copy.error, + seqNo, + primaryTerm); + } + + public static FlintIndexStateModel copyWithState( + FlintIndexStateModel copy, FlintIndexState state, long seqNo, long primaryTerm) { + return new FlintIndexStateModel( + state, + copy.applicationId, + copy.jobId, + copy.latestId, + copy.datasourceName, + copy.lastUpdateTime, + copy.error, + seqNo, + primaryTerm); + } + + @SneakyThrows + public static FlintIndexStateModel fromXContent( + XContentParser parser, long seqNo, long primaryTerm) { + FlintIndexStateModelBuilder builder = FlintIndexStateModel.builder(); + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) { + String fieldName = parser.currentName(); + parser.nextToken(); + switch (fieldName) { + case STATE: + builder.indexState(FlintIndexState.fromString(parser.text())); + case APPLICATION_ID: + builder.applicationId(parser.text()); + break; + case JOB_ID: + builder.jobId(parser.text()); + break; + case LATEST_ID: + builder.latestId(parser.text()); + break; + case DATASOURCE_NAME: + builder.datasourceName(parser.text()); + break; + case LAST_UPDATE_TIME: + builder.lastUpdateTime(parser.longValue()); + break; + case ERROR: + builder.error(parser.text()); + break; + } + } + builder.seqNo(seqNo); + builder.primaryTerm(primaryTerm); + return builder.build(); + } + + @Override + public String getId() { + return latestId; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder + .startObject() + .field(VERSION, VERSION_1_0) + .field(TYPE, FLINT_INDEX_DOC_TYPE) + .field(STATE, indexState.getState()) + .field(APPLICATION_ID, applicationId) + .field(JOB_ID, jobId) + .field(LATEST_ID, latestId) + .field(DATASOURCE_NAME, datasourceName) + .field(LAST_UPDATE_TIME, lastUpdateTime) + .field(ERROR, error) + .endObject(); + return builder; + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOp.java b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOp.java new file mode 100644 index 0000000000..fb44b27568 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOp.java @@ -0,0 +1,111 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.flint.operation; + +import static org.opensearch.sql.spark.execution.statestore.StateStore.getFlintIndexState; +import static org.opensearch.sql.spark.execution.statestore.StateStore.updateFlintIndexState; + +import java.util.Locale; +import java.util.Optional; +import lombok.RequiredArgsConstructor; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.flint.FlintIndexMetadata; +import org.opensearch.sql.spark.flint.FlintIndexState; +import org.opensearch.sql.spark.flint.FlintIndexStateModel; + +/** Flint Index Operation. */ +@RequiredArgsConstructor +public abstract class FlintIndexOp { + private static final Logger LOG = LogManager.getLogger(); + + private final StateStore stateStore; + private final String datasourceName; + + /** Apply operation on {@link FlintIndexMetadata} */ + public void apply(FlintIndexMetadata metadata) { + // todo, remove this logic after IndexState feature is enabled in Flint. + Optional latestId = metadata.getLatestId(); + if (latestId.isEmpty()) { + // take action without occ. + FlintIndexStateModel fakeModel = + new FlintIndexStateModel( + FlintIndexState.REFRESHING, + metadata.getAppId(), + metadata.getJobId(), + "", + datasourceName, + System.currentTimeMillis(), + "", + SequenceNumbers.UNASSIGNED_SEQ_NO, + SequenceNumbers.UNASSIGNED_PRIMARY_TERM); + runOp(fakeModel); + } else { + Optional flintIndexOptional = + getFlintIndexState(stateStore, datasourceName).apply(latestId.get()); + if (flintIndexOptional.isEmpty()) { + String errorMsg = String.format(Locale.ROOT, "no state found. docId: %s", latestId.get()); + LOG.error(errorMsg); + throw new IllegalStateException(errorMsg); + } + FlintIndexStateModel flintIndex = flintIndexOptional.get(); + + // 1.validate state. + FlintIndexState currentState = flintIndex.getIndexState(); + if (!validate(currentState)) { + String errorMsg = + String.format(Locale.ROOT, "validate failed. unexpected state: [%s]", currentState); + LOG.debug(errorMsg); + return; + } + + // 2.begin, move to transitioning state + FlintIndexState transitioningState = transitioningState(); + try { + flintIndex = + updateFlintIndexState(stateStore, datasourceName) + .apply(flintIndex, transitioningState()); + } catch (Exception e) { + String errorMsg = + String.format( + Locale.ROOT, "begin failed. target transitioning state: [%s]", transitioningState); + LOG.error(errorMsg, e); + throw new IllegalStateException(errorMsg, e); + } + + // 3.runOp + runOp(flintIndex); + + // 4.commit, move to stable state + FlintIndexState stableState = stableState(); + try { + updateFlintIndexState(stateStore, datasourceName).apply(flintIndex, stableState); + } catch (Exception e) { + String errorMsg = + String.format(Locale.ROOT, "commit failed. target stable state: [%s]", stableState); + LOG.error(errorMsg, e); + throw new IllegalStateException(errorMsg, e); + } + } + } + + /** + * Validate expected state. + * + *

return true if validate. + */ + abstract boolean validate(FlintIndexState state); + + /** get transitioningState */ + abstract FlintIndexState transitioningState(); + + abstract void runOp(FlintIndexStateModel flintIndex); + + /** get stableState */ + abstract FlintIndexState stableState(); +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpCancel.java b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpCancel.java new file mode 100644 index 0000000000..ba067e5c03 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpCancel.java @@ -0,0 +1,76 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.flint.operation; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import lombok.SneakyThrows; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.sql.spark.client.EMRServerlessClient; +import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.flint.FlintIndexState; +import org.opensearch.sql.spark.flint.FlintIndexStateModel; + +/** Cancel refreshing job. */ +public class FlintIndexOpCancel extends FlintIndexOp { + private static final Logger LOG = LogManager.getLogger(); + + private final EMRServerlessClient emrServerlessClient; + + public FlintIndexOpCancel( + StateStore stateStore, String datasourceName, EMRServerlessClient emrServerlessClient) { + super(stateStore, datasourceName); + this.emrServerlessClient = emrServerlessClient; + } + + public boolean validate(FlintIndexState state) { + return state == FlintIndexState.REFRESHING || state == FlintIndexState.CANCELLING; + } + + @Override + FlintIndexState transitioningState() { + return FlintIndexState.CANCELLING; + } + + /** cancel EMR-S job, wait cancelled state upto 15s. */ + @SneakyThrows + @Override + void runOp(FlintIndexStateModel flintIndexStateModel) { + String applicationId = flintIndexStateModel.getApplicationId(); + String jobId = flintIndexStateModel.getJobId(); + try { + emrServerlessClient.cancelJobRun( + flintIndexStateModel.getApplicationId(), flintIndexStateModel.getJobId()); + } catch (IllegalArgumentException e) { + // handle job does not exist case. + LOG.error(e); + return; + } + + // pull job state until timeout or cancelled. + String jobRunState = ""; + int count = 3; + while (count-- != 0) { + jobRunState = + emrServerlessClient.getJobRunResult(applicationId, jobId).getJobRun().getState(); + if (jobRunState.equalsIgnoreCase("Cancelled")) { + break; + } + TimeUnit.SECONDS.sleep(1); + } + if (!jobRunState.equalsIgnoreCase("Cancelled")) { + String errMsg = "cancel job timeout"; + LOG.error(errMsg); + throw new TimeoutException(errMsg); + } + } + + @Override + FlintIndexState stableState() { + return FlintIndexState.ACTIVE; + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpDelete.java b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpDelete.java new file mode 100644 index 0000000000..d8b275c621 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpDelete.java @@ -0,0 +1,39 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.flint.operation; + +import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.flint.FlintIndexState; +import org.opensearch.sql.spark.flint.FlintIndexStateModel; + +/** Flint Index Logical delete operation. Change state to DELETED. */ +public class FlintIndexOpDelete extends FlintIndexOp { + + public FlintIndexOpDelete(StateStore stateStore, String datasourceName) { + super(stateStore, datasourceName); + } + + public boolean validate(FlintIndexState state) { + return state == FlintIndexState.ACTIVE + || state == FlintIndexState.EMPTY + || state == FlintIndexState.DELETING; + } + + @Override + FlintIndexState transitioningState() { + return FlintIndexState.DELETING; + } + + @Override + void runOp(FlintIndexStateModel flintIndex) { + // logically delete, do nothing. + } + + @Override + FlintIndexState stableState() { + return FlintIndexState.DELETED; + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/functions/response/DefaultSparkSqlFunctionResponseHandle.java b/spark/src/main/java/org/opensearch/sql/spark/functions/response/DefaultSparkSqlFunctionResponseHandle.java index 8fc417cd80..422d1caaf1 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/functions/response/DefaultSparkSqlFunctionResponseHandle.java +++ b/spark/src/main/java/org/opensearch/sql/spark/functions/response/DefaultSparkSqlFunctionResponseHandle.java @@ -48,7 +48,6 @@ private void constructIteratorAndSchema(JSONObject responseObject) { List result = new ArrayList<>(); List columnList; JSONObject items = responseObject.getJSONObject("data"); - logger.info("Spark Application ID: " + items.getString("applicationId")); columnList = getColumnList(items.getJSONArray("schema")); for (int i = 0; i < items.getJSONArray("result").length(); i++) { JSONObject row = diff --git a/spark/src/main/java/org/opensearch/sql/spark/leasemanager/DefaultLeaseManager.java b/spark/src/main/java/org/opensearch/sql/spark/leasemanager/DefaultLeaseManager.java index 1635a1801b..375fa7b11e 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/leasemanager/DefaultLeaseManager.java +++ b/spark/src/main/java/org/opensearch/sql/spark/leasemanager/DefaultLeaseManager.java @@ -5,13 +5,17 @@ package org.opensearch.sql.spark.leasemanager; +import static org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_REFRESH_JOB_LIMIT; import static org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_SESSION_LIMIT; import static org.opensearch.sql.spark.execution.statestore.StateStore.ALL_DATASOURCE; +import static org.opensearch.sql.spark.execution.statestore.StateStore.activeRefreshJobCount; import static org.opensearch.sql.spark.execution.statestore.StateStore.activeSessionsCount; import java.util.Arrays; import java.util.List; +import java.util.Locale; import java.util.function.Predicate; +import lombok.RequiredArgsConstructor; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.spark.dispatcher.model.JobType; import org.opensearch.sql.spark.execution.statestore.StateStore; @@ -32,7 +36,10 @@ public class DefaultLeaseManager implements LeaseManager { public DefaultLeaseManager(Settings settings, StateStore stateStore) { this.settings = settings; this.stateStore = stateStore; - this.concurrentLimitRules = Arrays.asList(new ConcurrentSessionRule()); + this.concurrentLimitRules = + Arrays.asList( + new ConcurrentSessionRule(settings, stateStore), + new ConcurrentRefreshJobRule(settings, stateStore)); } @Override @@ -48,10 +55,15 @@ interface Rule extends Predicate { String description(); } - public class ConcurrentSessionRule implements Rule { + @RequiredArgsConstructor + public static class ConcurrentSessionRule implements Rule { + private final Settings settings; + private final StateStore stateStore; + @Override public String description() { - return String.format("domain concurrent active session can not exceed %d", sessionMaxLimit()); + return String.format( + Locale.ROOT, "domain concurrent active session can not exceed %d", sessionMaxLimit()); } @Override @@ -66,4 +78,28 @@ public int sessionMaxLimit() { return settings.getSettingValue(SPARK_EXECUTION_SESSION_LIMIT); } } + + @RequiredArgsConstructor + public static class ConcurrentRefreshJobRule implements Rule { + private final Settings settings; + private final StateStore stateStore; + + @Override + public String description() { + return String.format( + Locale.ROOT, "domain concurrent refresh job can not exceed %d", refreshJobLimit()); + } + + @Override + public boolean test(LeaseRequest leaseRequest) { + if (leaseRequest.getJobType() == JobType.INTERACTIVE) { + return true; + } + return activeRefreshJobCount(stateStore, ALL_DATASOURCE).get() < refreshJobLimit(); + } + + public int refreshJobLimit() { + return settings.getSettingValue(SPARK_EXECUTION_REFRESH_JOB_LIMIT); + } + } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java index 862da697d1..56ee56ea5e 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java @@ -5,7 +5,6 @@ package org.opensearch.sql.spark.asyncquery; -import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.*; import static org.opensearch.sql.spark.data.constants.SparkConstants.DEFAULT_CLASS_NAME; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_JOB_REQUEST_INDEX; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_JOB_SESSION_ID; @@ -14,172 +13,35 @@ import static org.opensearch.sql.spark.execution.session.SessionModel.SESSION_DOC_TYPE; import static org.opensearch.sql.spark.execution.statement.StatementModel.SESSION_ID; import static org.opensearch.sql.spark.execution.statement.StatementModel.STATEMENT_DOC_TYPE; -import static org.opensearch.sql.spark.execution.statestore.StateStore.DATASOURCE_TO_REQUEST_INDEX; -import static org.opensearch.sql.spark.execution.statestore.StateStore.getSession; import static org.opensearch.sql.spark.execution.statestore.StateStore.getStatement; -import static org.opensearch.sql.spark.execution.statestore.StateStore.updateSessionState; import static org.opensearch.sql.spark.execution.statestore.StateStore.updateStatementState; -import com.amazonaws.services.emrserverless.model.CancelJobRunResult; -import com.amazonaws.services.emrserverless.model.GetJobRunResult; -import com.amazonaws.services.emrserverless.model.JobRun; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import java.util.*; -import lombok.Getter; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; import org.apache.commons.lang3.StringUtils; -import org.junit.After; -import org.junit.Before; import org.junit.Ignore; import org.junit.Test; -import org.opensearch.action.search.SearchRequest; -import org.opensearch.action.search.SearchResponse; -import org.opensearch.client.node.NodeClient; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.settings.ClusterSettings; -import org.opensearch.common.settings.Setting; -import org.opensearch.common.settings.Settings; +import org.junit.jupiter.api.Disabled; import org.opensearch.core.common.Strings; -import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryBuilders; -import org.opensearch.plugins.Plugin; -import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceType; -import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl; -import org.opensearch.sql.datasources.encryptor.EncryptorImpl; -import org.opensearch.sql.datasources.glue.GlueDataSourceFactory; -import org.opensearch.sql.datasources.service.DataSourceMetadataStorage; -import org.opensearch.sql.datasources.service.DataSourceServiceImpl; -import org.opensearch.sql.datasources.storage.OpenSearchDataSourceMetadataStorage; -import org.opensearch.sql.opensearch.setting.OpenSearchSettings; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse; -import org.opensearch.sql.spark.client.EMRServerlessClient; -import org.opensearch.sql.spark.client.StartJobRequest; -import org.opensearch.sql.spark.config.SparkExecutionEngineConfig; -import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher; import org.opensearch.sql.spark.execution.session.SessionId; -import org.opensearch.sql.spark.execution.session.SessionManager; -import org.opensearch.sql.spark.execution.session.SessionModel; import org.opensearch.sql.spark.execution.session.SessionState; import org.opensearch.sql.spark.execution.statement.StatementModel; import org.opensearch.sql.spark.execution.statement.StatementState; -import org.opensearch.sql.spark.execution.statestore.StateStore; -import org.opensearch.sql.spark.flint.FlintIndexMetadataReaderImpl; import org.opensearch.sql.spark.leasemanager.ConcurrencyLimitExceededException; -import org.opensearch.sql.spark.leasemanager.DefaultLeaseManager; -import org.opensearch.sql.spark.response.JobExecutionResponseReader; import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest; import org.opensearch.sql.spark.rest.model.CreateAsyncQueryResponse; import org.opensearch.sql.spark.rest.model.LangType; -import org.opensearch.sql.storage.DataSourceFactory; -import org.opensearch.test.OpenSearchIntegTestCase; - -public class AsyncQueryExecutorServiceImplSpecTest extends OpenSearchIntegTestCase { - public static final String DATASOURCE = "mys3"; - public static final String DSOTHER = "mytest"; - - private ClusterService clusterService; - private org.opensearch.sql.common.setting.Settings pluginSettings; - private NodeClient client; - private DataSourceServiceImpl dataSourceService; - private StateStore stateStore; - private ClusterSettings clusterSettings; - - @Override - protected Collection> nodePlugins() { - return Arrays.asList(TestSettingPlugin.class); - } - public static class TestSettingPlugin extends Plugin { - @Override - public List> getSettings() { - return OpenSearchSettings.pluginSettings(); - } - } +public class AsyncQueryExecutorServiceImplSpecTest extends AsyncQueryExecutorServiceSpec { - @Before - public void setup() { - clusterService = clusterService(); - client = (NodeClient) cluster().client(); - client - .admin() - .cluster() - .prepareUpdateSettings() - .setTransientSettings( - Settings.builder() - .putList(DATASOURCE_URI_HOSTS_DENY_LIST.getKey(), Collections.emptyList()) - .build()) - .get(); - clusterSettings = clusterService.getClusterSettings(); - pluginSettings = new OpenSearchSettings(clusterSettings); - dataSourceService = createDataSourceService(); - DataSourceMetadata dm = - new DataSourceMetadata( - DATASOURCE, - StringUtils.EMPTY, - DataSourceType.S3GLUE, - ImmutableList.of(), - ImmutableMap.of( - "glue.auth.type", - "iam_role", - "glue.auth.role_arn", - "arn:aws:iam::924196221507:role/FlintOpensearchServiceRole", - "glue.indexstore.opensearch.uri", - "http://localhost:9200", - "glue.indexstore.opensearch.auth", - "noauth"), - null); - dataSourceService.createDataSource(dm); - DataSourceMetadata otherDm = - new DataSourceMetadata( - DSOTHER, - StringUtils.EMPTY, - DataSourceType.S3GLUE, - ImmutableList.of(), - ImmutableMap.of( - "glue.auth.type", - "iam_role", - "glue.auth.role_arn", - "arn:aws:iam::924196221507:role/FlintOpensearchServiceRole", - "glue.indexstore.opensearch.uri", - "http://localhost:9200", - "glue.indexstore.opensearch.auth", - "noauth"), - null); - dataSourceService.createDataSource(otherDm); - stateStore = new StateStore(client, clusterService); - createIndex(dm.fromNameToCustomResultIndex()); - createIndex(otherDm.fromNameToCustomResultIndex()); - } - - @After - public void clean() { - client - .admin() - .cluster() - .prepareUpdateSettings() - .setTransientSettings( - Settings.builder().putNull(SPARK_EXECUTION_SESSION_ENABLED_SETTING.getKey()).build()) - .get(); - client - .admin() - .cluster() - .prepareUpdateSettings() - .setTransientSettings( - Settings.builder().putNull(SPARK_EXECUTION_SESSION_LIMIT_SETTING.getKey()).build()) - .get(); - client - .admin() - .cluster() - .prepareUpdateSettings() - .setTransientSettings( - Settings.builder().putNull(DATASOURCE_URI_HOSTS_DENY_LIST.getKey()).build()) - .get(); - } - - @Test + @Disabled("batch query is unsupported") public void withoutSessionCreateAsyncQueryThenGetResultThenCancel() { LocalEMRSClient emrsClient = new LocalEMRSClient(); AsyncQueryExecutorService asyncQueryExecutorService = @@ -207,7 +69,7 @@ public void withoutSessionCreateAsyncQueryThenGetResultThenCancel() { emrsClient.cancelJobRunCalled(1); } - @Test + @Disabled("batch query is unsupported") public void sessionLimitNotImpactBatchQuery() { LocalEMRSClient emrsClient = new LocalEMRSClient(); AsyncQueryExecutorService asyncQueryExecutorService = @@ -229,7 +91,7 @@ public void sessionLimitNotImpactBatchQuery() { emrsClient.startJobRunCalled(2); } - @Test + @Disabled("batch query is unsupported") public void createAsyncQueryCreateJobWithCorrectParameters() { LocalEMRSClient emrsClient = new LocalEMRSClient(); AsyncQueryExecutorService asyncQueryExecutorService = @@ -340,7 +202,7 @@ public void reuseSessionWhenCreateAsyncQuery() { assertEquals(second.getQueryId(), secondModel.get().getQueryId()); } - @Test + @Disabled("batch query is unsupported") public void batchQueryHasTimeout() { LocalEMRSClient emrsClient = new LocalEMRSClient(); AsyncQueryExecutorService asyncQueryExecutorService = @@ -602,125 +464,4 @@ public void concurrentSessionLimitIsDomainLevel() { new CreateAsyncQueryRequest("select 1", DSOTHER, LangType.SQL, null))); assertEquals("domain concurrent active session can not exceed 1", exception.getMessage()); } - - private DataSourceServiceImpl createDataSourceService() { - String masterKey = "a57d991d9b573f75b9bba1df"; - DataSourceMetadataStorage dataSourceMetadataStorage = - new OpenSearchDataSourceMetadataStorage( - client, clusterService, new EncryptorImpl(masterKey)); - return new DataSourceServiceImpl( - new ImmutableSet.Builder() - .add(new GlueDataSourceFactory(pluginSettings)) - .build(), - dataSourceMetadataStorage, - meta -> {}); - } - - private AsyncQueryExecutorService createAsyncQueryExecutorService( - EMRServerlessClient emrServerlessClient) { - StateStore stateStore = new StateStore(client, clusterService); - AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService = - new OpensearchAsyncQueryJobMetadataStorageService(stateStore); - JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client); - SparkQueryDispatcher sparkQueryDispatcher = - new SparkQueryDispatcher( - emrServerlessClient, - this.dataSourceService, - new DataSourceUserAuthorizationHelperImpl(client), - jobExecutionResponseReader, - new FlintIndexMetadataReaderImpl(client), - client, - new SessionManager(stateStore, emrServerlessClient, pluginSettings), - new DefaultLeaseManager(pluginSettings, stateStore)); - return new AsyncQueryExecutorServiceImpl( - asyncQueryJobMetadataStorageService, - sparkQueryDispatcher, - this::sparkExecutionEngineConfig); - } - - public static class LocalEMRSClient implements EMRServerlessClient { - - private int startJobRunCalled = 0; - private int cancelJobRunCalled = 0; - private int getJobResult = 0; - - @Getter private StartJobRequest jobRequest; - - @Override - public String startJobRun(StartJobRequest startJobRequest) { - jobRequest = startJobRequest; - startJobRunCalled++; - return "jobId"; - } - - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - getJobResult++; - JobRun jobRun = new JobRun(); - jobRun.setState("RUNNING"); - return new GetJobRunResult().withJobRun(jobRun); - } - - @Override - public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { - cancelJobRunCalled++; - return new CancelJobRunResult().withJobRunId(jobId); - } - - public void startJobRunCalled(int expectedTimes) { - assertEquals(expectedTimes, startJobRunCalled); - } - - public void cancelJobRunCalled(int expectedTimes) { - assertEquals(expectedTimes, cancelJobRunCalled); - } - - public void getJobRunResultCalled(int expectedTimes) { - assertEquals(expectedTimes, getJobResult); - } - } - - public SparkExecutionEngineConfig sparkExecutionEngineConfig() { - return new SparkExecutionEngineConfig("appId", "us-west-2", "roleArn", "", "myCluster"); - } - - public void enableSession(boolean enabled) { - client - .admin() - .cluster() - .prepareUpdateSettings() - .setTransientSettings( - Settings.builder() - .put(SPARK_EXECUTION_SESSION_ENABLED_SETTING.getKey(), enabled) - .build()) - .get(); - } - - public void setSessionLimit(long limit) { - client - .admin() - .cluster() - .prepareUpdateSettings() - .setTransientSettings( - Settings.builder().put(SPARK_EXECUTION_SESSION_LIMIT_SETTING.getKey(), limit).build()) - .get(); - } - - int search(QueryBuilder query) { - SearchRequest searchRequest = new SearchRequest(); - searchRequest.indices(DATASOURCE_TO_REQUEST_INDEX.apply(DATASOURCE)); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(query); - searchRequest.source(searchSourceBuilder); - SearchResponse searchResponse = client.search(searchRequest).actionGet(); - - return searchResponse.getHits().getHits().length; - } - - void setSessionState(String sessionId, SessionState sessionState) { - Optional model = getSession(stateStore, DATASOURCE).apply(sessionId); - SessionModel updated = - updateSessionState(stateStore, DATASOURCE).apply(model.get(), sessionState); - assertEquals(sessionState, updated.getSessionState()); - } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplTest.java index 2ed316795f..efb965e9f3 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplTest.java @@ -81,7 +81,7 @@ void testCreateAsyncQuery() { LangType.SQL, "arn:aws:iam::270824043731:role/emr-job-execution-role", TEST_CLUSTER_NAME))) - .thenReturn(new DispatchQueryResponse(QUERY_ID, EMR_JOB_ID, false, null, null)); + .thenReturn(new DispatchQueryResponse(QUERY_ID, EMR_JOB_ID, null, null)); CreateAsyncQueryResponse createAsyncQueryResponse = jobExecutorService.createAsyncQuery(createAsyncQueryRequest); verify(asyncQueryJobMetadataStorageService, times(1)) @@ -111,7 +111,7 @@ void testCreateAsyncQueryWithExtraSparkSubmitParameter() { "--conf spark.dynamicAllocation.enabled=false", TEST_CLUSTER_NAME)); when(sparkQueryDispatcher.dispatch(any())) - .thenReturn(new DispatchQueryResponse(QUERY_ID, EMR_JOB_ID, false, null, null)); + .thenReturn(new DispatchQueryResponse(QUERY_ID, EMR_JOB_ID, null, null)); jobExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java new file mode 100644 index 0000000000..35ec778c8e --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java @@ -0,0 +1,291 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.asyncquery; + +import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.SPARK_EXECUTION_REFRESH_JOB_LIMIT_SETTING; +import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.SPARK_EXECUTION_SESSION_LIMIT_SETTING; +import static org.opensearch.sql.spark.execution.statestore.StateStore.DATASOURCE_TO_REQUEST_INDEX; +import static org.opensearch.sql.spark.execution.statestore.StateStore.getSession; +import static org.opensearch.sql.spark.execution.statestore.StateStore.updateSessionState; + +import com.amazonaws.services.emrserverless.model.CancelJobRunResult; +import com.amazonaws.services.emrserverless.model.GetJobRunResult; +import com.amazonaws.services.emrserverless.model.JobRun; +import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.io.Resources; +import java.net.URL; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import lombok.Getter; +import lombok.SneakyThrows; +import org.apache.commons.lang3.StringUtils; +import org.junit.After; +import org.junit.Before; +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.plugins.Plugin; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceType; +import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl; +import org.opensearch.sql.datasources.encryptor.EncryptorImpl; +import org.opensearch.sql.datasources.glue.GlueDataSourceFactory; +import org.opensearch.sql.datasources.service.DataSourceMetadataStorage; +import org.opensearch.sql.datasources.service.DataSourceServiceImpl; +import org.opensearch.sql.datasources.storage.OpenSearchDataSourceMetadataStorage; +import org.opensearch.sql.opensearch.setting.OpenSearchSettings; +import org.opensearch.sql.spark.client.EMRServerlessClient; +import org.opensearch.sql.spark.client.StartJobRequest; +import org.opensearch.sql.spark.config.SparkExecutionEngineConfig; +import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher; +import org.opensearch.sql.spark.execution.session.SessionManager; +import org.opensearch.sql.spark.execution.session.SessionModel; +import org.opensearch.sql.spark.execution.session.SessionState; +import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.flint.FlintIndexMetadataReaderImpl; +import org.opensearch.sql.spark.leasemanager.DefaultLeaseManager; +import org.opensearch.sql.spark.response.JobExecutionResponseReader; +import org.opensearch.sql.storage.DataSourceFactory; +import org.opensearch.test.OpenSearchIntegTestCase; + +public class AsyncQueryExecutorServiceSpec extends OpenSearchIntegTestCase { + public static final String DATASOURCE = "mys3"; + public static final String DSOTHER = "mytest"; + + protected ClusterService clusterService; + protected org.opensearch.sql.common.setting.Settings pluginSettings; + protected NodeClient client; + protected DataSourceServiceImpl dataSourceService; + protected StateStore stateStore; + protected ClusterSettings clusterSettings; + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(TestSettingPlugin.class); + } + + public static class TestSettingPlugin extends Plugin { + @Override + public List> getSettings() { + return OpenSearchSettings.pluginSettings(); + } + } + + @Before + public void setup() { + clusterService = clusterService(); + clusterSettings = clusterService.getClusterSettings(); + pluginSettings = new OpenSearchSettings(clusterSettings); + client = (NodeClient) cluster().client(); + dataSourceService = createDataSourceService(); + DataSourceMetadata dm = + new DataSourceMetadata( + DATASOURCE, + StringUtils.EMPTY, + DataSourceType.S3GLUE, + ImmutableList.of(), + ImmutableMap.of( + "glue.auth.type", + "iam_role", + "glue.auth.role_arn", + "arn:aws:iam::924196221507:role/FlintOpensearchServiceRole", + "glue.indexstore.opensearch.uri", + "http://localhost:9200", + "glue.indexstore.opensearch.auth", + "noauth"), + null); + dataSourceService.createDataSource(dm); + DataSourceMetadata otherDm = + new DataSourceMetadata( + DSOTHER, + StringUtils.EMPTY, + DataSourceType.S3GLUE, + ImmutableList.of(), + ImmutableMap.of( + "glue.auth.type", + "iam_role", + "glue.auth.role_arn", + "arn:aws:iam::924196221507:role/FlintOpensearchServiceRole", + "glue.indexstore.opensearch.uri", + "http://localhost:9200", + "glue.indexstore.opensearch.auth", + "noauth"), + null); + dataSourceService.createDataSource(otherDm); + stateStore = new StateStore(client, clusterService); + createIndexWithMappings(dm.getResultIndex(), loadResultIndexMappings()); + createIndexWithMappings(otherDm.getResultIndex(), loadResultIndexMappings()); + } + + @After + public void clean() { + client + .admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings( + Settings.builder().putNull(SPARK_EXECUTION_SESSION_LIMIT_SETTING.getKey()).build()) + .get(); + client + .admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings( + Settings.builder().putNull(SPARK_EXECUTION_REFRESH_JOB_LIMIT_SETTING.getKey()).build()) + .get(); + } + + private DataSourceServiceImpl createDataSourceService() { + String masterKey = "a57d991d9b573f75b9bba1df"; + DataSourceMetadataStorage dataSourceMetadataStorage = + new OpenSearchDataSourceMetadataStorage( + client, clusterService, new EncryptorImpl(masterKey)); + return new DataSourceServiceImpl( + new ImmutableSet.Builder() + .add(new GlueDataSourceFactory(pluginSettings)) + .build(), + dataSourceMetadataStorage, + meta -> {}); + } + + protected AsyncQueryExecutorService createAsyncQueryExecutorService( + EMRServerlessClient emrServerlessClient) { + StateStore stateStore = new StateStore(client, clusterService); + AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService = + new OpensearchAsyncQueryJobMetadataStorageService(stateStore); + JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client); + SparkQueryDispatcher sparkQueryDispatcher = + new SparkQueryDispatcher( + emrServerlessClient, + this.dataSourceService, + new DataSourceUserAuthorizationHelperImpl(client), + jobExecutionResponseReader, + new FlintIndexMetadataReaderImpl(client), + client, + new SessionManager(stateStore, emrServerlessClient, pluginSettings), + new DefaultLeaseManager(pluginSettings, stateStore), + stateStore); + return new AsyncQueryExecutorServiceImpl( + asyncQueryJobMetadataStorageService, + sparkQueryDispatcher, + this::sparkExecutionEngineConfig); + } + + public static class LocalEMRSClient implements EMRServerlessClient { + + private int startJobRunCalled = 0; + private int cancelJobRunCalled = 0; + private int getJobResult = 0; + + @Getter private StartJobRequest jobRequest; + + @Override + public String startJobRun(StartJobRequest startJobRequest) { + jobRequest = startJobRequest; + startJobRunCalled++; + return "jobId"; + } + + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + getJobResult++; + JobRun jobRun = new JobRun(); + jobRun.setState("RUNNING"); + return new GetJobRunResult().withJobRun(jobRun); + } + + @Override + public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { + cancelJobRunCalled++; + return new CancelJobRunResult().withJobRunId(jobId); + } + + public void startJobRunCalled(int expectedTimes) { + assertEquals(expectedTimes, startJobRunCalled); + } + + public void cancelJobRunCalled(int expectedTimes) { + assertEquals(expectedTimes, cancelJobRunCalled); + } + + public void getJobRunResultCalled(int expectedTimes) { + assertEquals(expectedTimes, getJobResult); + } + } + + public SparkExecutionEngineConfig sparkExecutionEngineConfig() { + return new SparkExecutionEngineConfig("appId", "us-west-2", "roleArn", "", "myCluster"); + } + + public void enableSession(boolean enabled) { + // doNothing + } + + public void setSessionLimit(long limit) { + client + .admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings( + Settings.builder().put(SPARK_EXECUTION_SESSION_LIMIT_SETTING.getKey(), limit).build()) + .get(); + } + + public void setConcurrentRefreshJob(long limit) { + client + .admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings( + Settings.builder() + .put(SPARK_EXECUTION_REFRESH_JOB_LIMIT_SETTING.getKey(), limit) + .build()) + .get(); + } + + int search(QueryBuilder query) { + SearchRequest searchRequest = new SearchRequest(); + searchRequest.indices(DATASOURCE_TO_REQUEST_INDEX.apply(DATASOURCE)); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(query); + searchRequest.source(searchSourceBuilder); + SearchResponse searchResponse = client.search(searchRequest).actionGet(); + + return searchResponse.getHits().getHits().length; + } + + void setSessionState(String sessionId, SessionState sessionState) { + Optional model = getSession(stateStore, DATASOURCE).apply(sessionId); + SessionModel updated = + updateSessionState(stateStore, DATASOURCE).apply(model.get(), sessionState); + assertEquals(sessionState, updated.getSessionState()); + } + + @SneakyThrows + public String loadResultIndexMappings() { + URL url = Resources.getResource("query_execution_result_mapping.json"); + return Resources.toString(url, Charsets.UTF_8); + } + + public void createIndexWithMappings(String indexName, String metadata) { + CreateIndexRequest request = new CreateIndexRequest(indexName); + request.mapping(metadata, XContentType.JSON); + client().admin().indices().create(request).actionGet(); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java new file mode 100644 index 0000000000..45a83b3296 --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java @@ -0,0 +1,793 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.asyncquery; + +import com.amazonaws.services.emrserverless.model.CancelJobRunResult; +import com.amazonaws.services.emrserverless.model.GetJobRunResult; +import com.amazonaws.services.emrserverless.model.JobRun; +import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableList; +import com.google.common.io.Resources; +import java.net.URL; +import java.util.Optional; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import org.junit.Assert; +import org.junit.Test; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse; +import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.flint.FlintIndexState; +import org.opensearch.sql.spark.flint.FlintIndexStateModel; +import org.opensearch.sql.spark.flint.FlintIndexType; +import org.opensearch.sql.spark.leasemanager.ConcurrencyLimitExceededException; +import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest; +import org.opensearch.sql.spark.rest.model.CreateAsyncQueryResponse; +import org.opensearch.sql.spark.rest.model.LangType; + +public class IndexQuerySpecTest extends AsyncQueryExecutorServiceSpec { + + public final FlintDatasetMock LEGACY_SKIPPING = + new FlintDatasetMock( + "DROP SKIPPING INDEX ON mys3.default.http_logs", + FlintIndexType.SKIPPING, + "flint_mys3_default_http_logs_skipping_index") + .isLegacy(true); + public final FlintDatasetMock LEGACY_COVERING = + new FlintDatasetMock( + "DROP INDEX covering ON mys3.default.http_logs", + FlintIndexType.COVERING, + "flint_mys3_default_http_logs_covering_index") + .isLegacy(true); + public final FlintDatasetMock LEGACY_MV = + new FlintDatasetMock( + "DROP MATERIALIZED VIEW mv", FlintIndexType.MATERIALIZED_VIEW, "flint_mv") + .isLegacy(true); + + public final FlintDatasetMock SKIPPING = + new FlintDatasetMock( + "DROP SKIPPING INDEX ON mys3.default.http_logs", + FlintIndexType.SKIPPING, + "flint_mys3_default_http_logs_skipping_index") + .latestId("skippingindexid"); + public final FlintDatasetMock COVERING = + new FlintDatasetMock( + "DROP INDEX covering ON mys3.default.http_logs", + FlintIndexType.COVERING, + "flint_mys3_default_http_logs_covering_index") + .latestId("coveringid"); + public final FlintDatasetMock MV = + new FlintDatasetMock( + "DROP MATERIALIZED VIEW mv", FlintIndexType.MATERIALIZED_VIEW, "flint_mv") + .latestId("mvid"); + + /** + * Happy case. expectation is + * + *

(1) Drop Index response is SUCCESS + */ + @Test + public void legacyBasicDropAndFetchAndCancel() { + ImmutableList.of(LEGACY_SKIPPING, LEGACY_COVERING) + .forEach( + mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + return new GetJobRunResult().withJobRun(new JobRun().withState("Cancelled")); + } + }; + + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrsClient); + + // Mock flint index + mockDS.createIndex(); + + // 1.drop index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + + assertNotNull(response.getQueryId()); + assertTrue(clusterService.state().routingTable().hasIndex(mockDS.indexName)); + + // 2.fetch result + AsyncQueryExecutionResponse asyncQueryResults = + asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertEquals("SUCCESS", asyncQueryResults.getStatus()); + assertNull(asyncQueryResults.getError()); + emrsClient.cancelJobRunCalled(1); + + // 3.cancel + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> asyncQueryExecutorService.cancelQuery(response.getQueryId())); + assertEquals("can't cancel index DML query", exception.getMessage()); + }); + } + + /** + * Legacy Test, without state index support. Not EMR-S job running. expectation is + * + *

(1) Drop Index response is SUCCESS + */ + @Test + public void legacyDropIndexNoJobRunning() { + ImmutableList.of(LEGACY_SKIPPING, LEGACY_COVERING, LEGACY_MV) + .forEach( + mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { + throw new IllegalArgumentException("Job run is not in a cancellable state"); + } + }; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrsClient); + + // Mock flint index + mockDS.createIndex(); + + // 1.drop index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + + // 2.fetch result. + AsyncQueryExecutionResponse asyncQueryResults = + asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertEquals("SUCCESS", asyncQueryResults.getStatus()); + assertNull(asyncQueryResults.getError()); + }); + } + + /** + * Legacy Test, without state index support. Cancel EMR-S job call timeout. expectation is + * + *

(1) Drop Index response is FAILED + */ + @Test + public void legacyDropIndexCancelJobTimeout() { + ImmutableList.of(LEGACY_SKIPPING, LEGACY_COVERING, LEGACY_MV) + .forEach( + mockDS -> { + // Mock EMR-S always return running. + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + return new GetJobRunResult().withJobRun(new JobRun().withState("Running")); + } + }; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrsClient); + + // Mock flint index + mockDS.createIndex(); + + // 1. drop index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + + // 2. fetch result + AsyncQueryExecutionResponse asyncQueryResults = + asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertEquals("FAILED", asyncQueryResults.getStatus()); + assertEquals("cancel job timeout", asyncQueryResults.getError()); + }); + } + + /** + * Happy case. expectation is + * + *

(1) Drop Index response is SUCCESS (2) change index state to: DELETED + */ + @Test + public void dropAndFetchAndCancel() { + ImmutableList.of(SKIPPING, COVERING, MV) + .forEach( + mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + return new GetJobRunResult().withJobRun(new JobRun().withState("Cancelled")); + } + }; + + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrsClient); + + // Mock flint index + mockDS.createIndex(); + // Mock index state + MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(mockDS.latestId); + flintIndexJob.refreshing(); + + // 1.drop index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + + assertNotNull(response.getQueryId()); + assertTrue(clusterService.state().routingTable().hasIndex(mockDS.indexName)); + + // assert state is DELETED + flintIndexJob.assertState(FlintIndexState.DELETED); + + // 2.fetch result + AsyncQueryExecutionResponse asyncQueryResults = + asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertEquals("SUCCESS", asyncQueryResults.getStatus()); + assertNull(asyncQueryResults.getError()); + emrsClient.cancelJobRunCalled(1); + + // 3.cancel + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> asyncQueryExecutorService.cancelQuery(response.getQueryId())); + assertEquals("can't cancel index DML query", exception.getMessage()); + }); + } + + /** + * Cancel EMR-S job, but not job running. expectation is + * + *

(1) Drop Index response is SUCCESS (2) change index state to: DELETED + */ + @Test + public void dropIndexNoJobRunning() { + ImmutableList.of(SKIPPING, COVERING, MV) + .forEach( + mockDS -> { + // Mock EMR-S job is not running + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { + throw new IllegalArgumentException("Job run is not in a cancellable state"); + } + }; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrsClient); + + // Mock flint index + mockDS.createIndex(); + // Mock index state in refresh state. + MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(mockDS.latestId); + flintIndexJob.refreshing(); + + // 1.drop index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + + // 2.fetch result. + AsyncQueryExecutionResponse asyncQueryResults = + asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertEquals("SUCCESS", asyncQueryResults.getStatus()); + assertNull(asyncQueryResults.getError()); + + flintIndexJob.assertState(FlintIndexState.DELETED); + }); + } + + /** + * Cancel EMR-S job call timeout, expectation is + * + *

(1) Drop Index response is failed, (2) change index state to: CANCELLING + */ + @Test + public void dropIndexCancelJobTimeout() { + ImmutableList.of(SKIPPING, COVERING, MV) + .forEach( + mockDS -> { + // Mock EMR-S always return running. + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + return new GetJobRunResult().withJobRun(new JobRun().withState("Running")); + } + }; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrsClient); + + // Mock flint index + mockDS.createIndex(); + // Mock index state + MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(mockDS.latestId); + flintIndexJob.refreshing(); + + // 1. drop index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + + // 2. fetch result + AsyncQueryExecutionResponse asyncQueryResults = + asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertEquals("FAILED", asyncQueryResults.getStatus()); + assertEquals("cancel job timeout", asyncQueryResults.getError()); + + flintIndexJob.assertState(FlintIndexState.CANCELLING); + }); + } + + /** + * Drop Index operation is retryable, expectation is + * + *

(1) call EMR-S (2) change index state to: DELETED + */ + @Test + public void dropIndexWithIndexInCancellingState() { + ImmutableList.of(SKIPPING, COVERING, MV) + .forEach( + mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + return new GetJobRunResult().withJobRun(new JobRun().withState("Cancelled")); + } + }; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrsClient); + + // Mock flint index + mockDS.createIndex(); + // Mock index state + MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(mockDS.latestId); + flintIndexJob.cancelling(); + + // 1. drop index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + + // 2. fetch result + assertEquals( + "SUCCESS", + asyncQueryExecutorService + .getAsyncQueryResults(response.getQueryId()) + .getStatus()); + + flintIndexJob.assertState(FlintIndexState.DELETED); + }); + } + + /** + * No Job running, expectation is + * + *

(1) not call EMR-S (2) change index state to: DELETED + */ + @Test + public void dropIndexWithIndexInActiveState() { + ImmutableList.of(SKIPPING, COVERING, MV) + .forEach( + mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { + Assert.fail("should not call cancelJobRun"); + return null; + } + + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + Assert.fail("should not call getJobRunResult"); + return null; + } + }; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrsClient); + + // Mock flint index + mockDS.createIndex(); + // Mock index state + MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(mockDS.latestId); + flintIndexJob.active(); + + // 1. drop index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + + // 2. fetch result + assertEquals( + "SUCCESS", + asyncQueryExecutorService + .getAsyncQueryResults(response.getQueryId()) + .getStatus()); + + flintIndexJob.assertState(FlintIndexState.DELETED); + }); + } + + @Test + public void dropIndexWithIndexInDeletingState() { + ImmutableList.of(SKIPPING, COVERING, MV) + .forEach( + mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { + Assert.fail("should not call cancelJobRun"); + return null; + } + + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + Assert.fail("should not call getJobRunResult"); + return null; + } + }; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrsClient); + + // Mock flint index + mockDS.createIndex(); + // Mock index state + MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(mockDS.latestId); + flintIndexJob.deleted(); + + // 1. drop index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + + // 2. fetch result + assertEquals( + "SUCCESS", + asyncQueryExecutorService + .getAsyncQueryResults(response.getQueryId()) + .getStatus()); + + flintIndexJob.assertState(FlintIndexState.DELETED); + }); + } + + @Test + public void dropIndexWithIndexInDeletedState() { + ImmutableList.of(SKIPPING, COVERING, MV) + .forEach( + mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { + Assert.fail("should not call cancelJobRun"); + return null; + } + + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + Assert.fail("should not call getJobRunResult"); + return null; + } + }; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrsClient); + + // Mock flint index + mockDS.createIndex(); + // Mock index state + MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(mockDS.latestId); + flintIndexJob.deleting(); + + // 1. drop index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + + // 2. fetch result + assertEquals( + "SUCCESS", + asyncQueryExecutorService + .getAsyncQueryResults(response.getQueryId()) + .getStatus()); + + flintIndexJob.assertState(FlintIndexState.DELETED); + }); + } + + /** + * No Job running, expectation is + * + *

(1) not call EMR-S (2) change index state to: DELETED + */ + @Test + public void dropIndexWithIndexInEmptyState() { + ImmutableList.of(SKIPPING, COVERING, MV) + .forEach( + mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { + Assert.fail("should not call cancelJobRun"); + return null; + } + + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + Assert.fail("should not call getJobRunResult"); + return null; + } + }; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrsClient); + + // Mock flint index + mockDS.createIndex(); + // Mock index state + MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(mockDS.latestId); + + // 1. drop index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + + // 2. fetch result + assertEquals( + "SUCCESS", + asyncQueryExecutorService + .getAsyncQueryResults(response.getQueryId()) + .getStatus()); + + flintIndexJob.assertState(FlintIndexState.DELETED); + }); + } + + /** + * No Job running, expectation is + * + *

(1) not call EMR-S (2) change index state to: DELETED + */ + @Test + public void edgeCaseNoIndexStateDoc() { + ImmutableList.of(SKIPPING, COVERING, MV) + .forEach( + mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { + Assert.fail("should not call cancelJobRun"); + return null; + } + + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + Assert.fail("should not call getJobRunResult"); + return null; + } + }; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrsClient); + + // Mock flint index + mockDS.createIndex(); + + // 1. drop index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + + // 2. fetch result + AsyncQueryExecutionResponse asyncQueryResults = + asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertEquals("FAILED", asyncQueryResults.getStatus()); + assertTrue(asyncQueryResults.getError().contains("no state found")); + }); + } + + @Test + public void concurrentRefreshJobLimitNotApplied() { + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(new LocalEMRSClient()); + + // Mock flint index + COVERING.createIndex(); + // Mock index state + MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(COVERING.latestId); + flintIndexJob.refreshing(); + + // query with auto refresh + String query = + "CREATE INDEX covering ON mys3.default.http_logs(l_orderkey, " + + "l_quantity) WITH (auto_refresh = true)"; + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest(query, DATASOURCE, LangType.SQL, null)); + assertNull(response.getSessionId()); + } + + @Test + public void concurrentRefreshJobLimitAppliedToDDLWithAuthRefresh() { + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(new LocalEMRSClient()); + + setConcurrentRefreshJob(1); + + // Mock flint index + COVERING.createIndex(); + // Mock index state + MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(COVERING.latestId); + flintIndexJob.refreshing(); + + // query with auto_refresh = true. + String query = + "CREATE INDEX covering ON mys3.default.http_logs(l_orderkey, " + + "l_quantity) WITH (auto_refresh = true)"; + ConcurrencyLimitExceededException exception = + assertThrows( + ConcurrencyLimitExceededException.class, + () -> + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest(query, DATASOURCE, LangType.SQL, null))); + assertEquals("domain concurrent refresh job can not exceed 1", exception.getMessage()); + } + + @Test + public void concurrentRefreshJobLimitAppliedToRefresh() { + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(new LocalEMRSClient()); + + setConcurrentRefreshJob(1); + + // Mock flint index + COVERING.createIndex(); + // Mock index state + MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(COVERING.latestId); + flintIndexJob.refreshing(); + + // query with auto_refresh = true. + String query = "REFRESH INDEX covering ON mys3.default.http_logs"; + ConcurrencyLimitExceededException exception = + assertThrows( + ConcurrencyLimitExceededException.class, + () -> + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest(query, DATASOURCE, LangType.SQL, null))); + assertEquals("domain concurrent refresh job can not exceed 1", exception.getMessage()); + } + + @Test + public void concurrentRefreshJobLimitNotAppliedToDDL() { + String query = "CREATE INDEX covering ON mys3.default.http_logs(l_orderkey, l_quantity)"; + + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(new LocalEMRSClient()); + + setConcurrentRefreshJob(1); + + // Mock flint index + COVERING.createIndex(); + // Mock index state + MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(COVERING.latestId); + flintIndexJob.refreshing(); + + CreateAsyncQueryResponse asyncQueryResponse = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest(query, DATASOURCE, LangType.SQL, null)); + assertNotNull(asyncQueryResponse.getSessionId()); + } + + public class MockFlintSparkJob { + + private FlintIndexStateModel stateModel; + + public MockFlintSparkJob(String latestId) { + assertNotNull(latestId); + stateModel = + new FlintIndexStateModel( + FlintIndexState.EMPTY, + "mockAppId", + "mockJobId", + latestId, + DATASOURCE, + System.currentTimeMillis(), + "", + SequenceNumbers.UNASSIGNED_SEQ_NO, + SequenceNumbers.UNASSIGNED_PRIMARY_TERM); + stateModel = StateStore.createFlintIndexState(stateStore, DATASOURCE).apply(stateModel); + } + + public void refreshing() { + stateModel = + StateStore.updateFlintIndexState(stateStore, DATASOURCE) + .apply(stateModel, FlintIndexState.REFRESHING); + } + + public void cancelling() { + stateModel = + StateStore.updateFlintIndexState(stateStore, DATASOURCE) + .apply(stateModel, FlintIndexState.CANCELLING); + } + + public void active() { + stateModel = + StateStore.updateFlintIndexState(stateStore, DATASOURCE) + .apply(stateModel, FlintIndexState.ACTIVE); + } + + public void deleting() { + stateModel = + StateStore.updateFlintIndexState(stateStore, DATASOURCE) + .apply(stateModel, FlintIndexState.DELETING); + } + + public void deleted() { + stateModel = + StateStore.updateFlintIndexState(stateStore, DATASOURCE) + .apply(stateModel, FlintIndexState.DELETED); + } + + void assertState(FlintIndexState expected) { + Optional stateModelOpt = + StateStore.getFlintIndexState(stateStore, DATASOURCE).apply(stateModel.getId()); + assertTrue((stateModelOpt.isPresent())); + assertEquals(expected, stateModelOpt.get().getIndexState()); + } + } + + @RequiredArgsConstructor + public class FlintDatasetMock { + private final String query; + private final FlintIndexType indexType; + private final String indexName; + private boolean isLegacy = false; + private String latestId; + + FlintDatasetMock isLegacy(boolean isLegacy) { + this.isLegacy = isLegacy; + return this; + } + + FlintDatasetMock latestId(String latestId) { + this.latestId = latestId; + return this; + } + + public void createIndex() { + String pathPrefix = isLegacy ? "flint-index-mappings" : "flint-index-mappings/0.1.1"; + switch (indexType) { + case SKIPPING: + createIndexWithMappings( + indexName, loadMappings(pathPrefix + "/" + "flint_skipping_index.json")); + break; + case COVERING: + createIndexWithMappings( + indexName, loadMappings(pathPrefix + "/" + "flint_covering_index.json")); + break; + case MATERIALIZED_VIEW: + createIndexWithMappings(indexName, loadMappings(pathPrefix + "/" + "flint_mv.json")); + break; + } + } + + @SneakyThrows + public void deleteIndex() { + client().admin().indices().delete(new DeleteIndexRequest().indices(indexName)).get(); + } + } + + @SneakyThrows + public static String loadMappings(String path) { + URL url = Resources.getResource(path); + return Resources.toString(url, Charsets.UTF_8); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/OpensearchAsyncQueryAsyncQueryJobMetadataStorageServiceTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/OpensearchAsyncQueryAsyncQueryJobMetadataStorageServiceTest.java index de0caf5589..cf838db829 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/OpensearchAsyncQueryAsyncQueryJobMetadataStorageServiceTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/OpensearchAsyncQueryAsyncQueryJobMetadataStorageServiceTest.java @@ -46,7 +46,7 @@ public void testStoreJobMetadata() { assertTrue(actual.isPresent()); assertEquals(expected, actual.get()); - assertFalse(actual.get().isDropIndexQuery()); + assertEquals(expected, actual.get()); assertNull(actual.get().getSessionId()); } @@ -57,7 +57,6 @@ public void testStoreJobMetadataWithResultExtraData() { AsyncQueryId.newAsyncQueryId(DS_NAME), EMR_JOB_ID, EMRS_APPLICATION_ID, - true, MOCK_RESULT_INDEX, MOCK_SESSION_ID); @@ -67,7 +66,6 @@ public void testStoreJobMetadataWithResultExtraData() { assertTrue(actual.isPresent()); assertEquals(expected, actual.get()); - assertTrue(actual.get().isDropIndexQuery()); assertEquals("resultIndex", actual.get().getResultIndex()); assertEquals(MOCK_SESSION_ID, actual.get().getSessionId()); } diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/DropIndexResultTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/DropIndexResultTest.java deleted file mode 100644 index d1c26f52e0..0000000000 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/DropIndexResultTest.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.dispatcher; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.opensearch.sql.spark.data.constants.SparkConstants.DATA_FIELD; -import static org.opensearch.sql.spark.data.constants.SparkConstants.ERROR_FIELD; -import static org.opensearch.sql.spark.data.constants.SparkConstants.STATUS_FIELD; - -import com.amazonaws.services.emrserverless.model.JobRunState; -import org.json.JSONObject; -import org.junit.jupiter.api.Test; - -public class DropIndexResultTest { - // todo, remove this UT after response refactor. - @Test - public void successRespEncodeDecode() { - // encode jobId - String jobId = - new SparkQueryDispatcher.DropIndexResult(JobRunState.SUCCESS.toString()).toJobId(); - - // decode jobId - SparkQueryDispatcher.DropIndexResult dropIndexResult = - SparkQueryDispatcher.DropIndexResult.fromJobId(jobId); - - JSONObject result = dropIndexResult.result(); - assertEquals(JobRunState.SUCCESS.toString(), result.get(STATUS_FIELD)); - assertEquals( - "{\"result\":[],\"schema\":[],\"applicationId\":\"fakeDropIndexApplicationId\"}", - result.get(DATA_FIELD).toString()); - } - - // todo, remove this UT after response refactor. - @Test - public void failedRespEncodeDecode() { - // encode jobId - String jobId = - new SparkQueryDispatcher.DropIndexResult(JobRunState.FAILED.toString()).toJobId(); - - // decode jobId - SparkQueryDispatcher.DropIndexResult dropIndexResult = - SparkQueryDispatcher.DropIndexResult.fromJobId(jobId); - - JSONObject result = dropIndexResult.result(); - assertEquals(JobRunState.FAILED.toString(), result.get(STATUS_FIELD)); - assertEquals("failed to drop index", result.get(ERROR_FIELD)); - } -} diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandlerTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandlerTest.java new file mode 100644 index 0000000000..8419d50ae1 --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandlerTest.java @@ -0,0 +1,21 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.dispatcher; + +import static org.junit.jupiter.api.Assertions.*; + +import org.junit.jupiter.api.Test; + +class IndexDMLHandlerTest { + @Test + public void getResponseFromExecutor() { + assertThrows( + IllegalStateException.class, + () -> + new IndexDMLHandler(null, null, null, null, null, null, null) + .getResponseFromExecutor(null)); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index 7663ece350..2a76eabe6a 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -7,13 +7,11 @@ import static org.mockito.Answers.RETURNS_DEEP_STUBS; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; @@ -33,7 +31,10 @@ import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AUTH_USERNAME; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AWSREGION_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.STATUS_FIELD; -import static org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher.*; +import static org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher.CLUSTER_NAME_TAG_KEY; +import static org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher.DATASOURCE_TAG_KEY; +import static org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher.INDEX_TAG_KEY; +import static org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher.JOB_TYPE_TAG_KEY; import com.amazonaws.services.emrserverless.model.CancelJobRunResult; import com.amazonaws.services.emrserverless.model.GetJobRunResult; @@ -44,7 +45,6 @@ import java.util.HashSet; import java.util.Map; import java.util.Optional; -import java.util.concurrent.ExecutionException; import org.json.JSONObject; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -54,7 +54,6 @@ import org.mockito.Captor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.client.Client; import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.datasource.model.DataSourceMetadata; @@ -64,16 +63,17 @@ import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; import org.opensearch.sql.spark.client.EMRServerlessClient; import org.opensearch.sql.spark.client.StartJobRequest; -import org.opensearch.sql.spark.dispatcher.model.*; +import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; +import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse; +import org.opensearch.sql.spark.dispatcher.model.JobType; import org.opensearch.sql.spark.execution.session.Session; import org.opensearch.sql.spark.execution.session.SessionId; import org.opensearch.sql.spark.execution.session.SessionManager; import org.opensearch.sql.spark.execution.statement.Statement; import org.opensearch.sql.spark.execution.statement.StatementId; import org.opensearch.sql.spark.execution.statement.StatementState; -import org.opensearch.sql.spark.flint.FlintIndexMetadata; +import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexMetadataReader; -import org.opensearch.sql.spark.flint.FlintIndexType; import org.opensearch.sql.spark.leasemanager.LeaseManager; import org.opensearch.sql.spark.response.JobExecutionResponseReader; import org.opensearch.sql.spark.rest.model.LangType; @@ -90,8 +90,6 @@ public class SparkQueryDispatcherTest { @Mock(answer = RETURNS_DEEP_STUBS) private Client openSearchClient; - @Mock private FlintIndexMetadata flintIndexMetadata; - @Mock private SessionManager sessionManager; @Mock private LeaseManager leaseManager; @@ -102,6 +100,8 @@ public class SparkQueryDispatcherTest { @Mock(answer = RETURNS_DEEP_STUBS) private Statement statement; + @Mock private StateStore stateStore; + private SparkQueryDispatcher sparkQueryDispatcher; private final AsyncQueryId QUERY_ID = AsyncQueryId.newAsyncQueryId(DS_NAME); @@ -119,7 +119,8 @@ void setUp() { flintIndexMetadataReader, openSearchClient, sessionManager, - leaseManager); + leaseManager, + stateStore); } @Test @@ -173,7 +174,6 @@ void testDispatchSelectQuery() { null); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); - Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery()); verifyNoInteractions(flintIndexMetadataReader); } @@ -229,7 +229,6 @@ void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() { null); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); - Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery()); verifyNoInteractions(flintIndexMetadataReader); } @@ -283,7 +282,6 @@ void testDispatchSelectQueryWithNoAuthIndexStoreDatasource() { null); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); - Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery()); verifyNoInteractions(flintIndexMetadataReader); } @@ -403,7 +401,6 @@ void testDispatchIndexQuery() { null); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); - Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery()); verifyNoInteractions(flintIndexMetadataReader); } @@ -458,7 +455,6 @@ void testDispatchWithPPLQuery() { null); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); - Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery()); verifyNoInteractions(flintIndexMetadataReader); } @@ -513,7 +509,6 @@ void testDispatchQueryWithoutATableAndDataSourceName() { null); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); - Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery()); verifyNoInteractions(flintIndexMetadataReader); } @@ -572,7 +567,6 @@ void testDispatchIndexQueryWithoutADatasourceName() { null); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); - Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery()); verifyNoInteractions(flintIndexMetadataReader); } @@ -631,7 +625,6 @@ void testDispatchMaterializedViewQuery() { null); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); - Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery()); verifyNoInteractions(flintIndexMetadataReader); } @@ -686,7 +679,6 @@ void testDispatchShowMVQuery() { null); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); - Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery()); verifyNoInteractions(flintIndexMetadataReader); } @@ -741,7 +733,6 @@ void testRefreshIndexQuery() { null); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); - Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery()); verifyNoInteractions(flintIndexMetadataReader); } @@ -796,7 +787,6 @@ void testDispatchDescribeIndexQuery() { null); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); - Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery()); verifyNoInteractions(flintIndexMetadataReader); } @@ -1008,245 +998,6 @@ void testGetQueryResponseWithSuccess() { verifyNoInteractions(emrServerlessClient); } - // todo. refactor query process logic in plugin. - @Test - void testGetQueryResponseOfDropIndex() { - String jobId = - new SparkQueryDispatcher.DropIndexResult(JobRunState.SUCCESS.toString()).toJobId(); - - JSONObject result = - sparkQueryDispatcher.getQueryResponse( - new AsyncQueryJobMetadata( - AsyncQueryId.newAsyncQueryId(DS_NAME), - EMRS_APPLICATION_ID, - jobId, - true, - null, - null)); - verify(jobExecutionResponseReader, times(0)) - .getResultFromOpensearchIndex(anyString(), anyString()); - Assertions.assertEquals("SUCCESS", result.get(STATUS_FIELD)); - } - - @Test - void testDropIndexQuery() throws ExecutionException, InterruptedException { - String query = "DROP INDEX size_year ON my_glue.default.http_logs"; - IndexQueryDetails indexQueryDetails = - IndexQueryDetails.builder() - .indexName("size_year") - .fullyQualifiedTableName(new FullyQualifiedTableName("my_glue.default.http_logs")) - .autoRefresh(false) - .indexQueryActionType(IndexQueryActionType.DROP) - .indexType(FlintIndexType.COVERING) - .build(); - when(flintIndexMetadataReader.getFlintIndexMetadata(indexQueryDetails)) - .thenReturn(flintIndexMetadata); - when(flintIndexMetadata.getJobId()).thenReturn(EMR_JOB_ID); - // auto_refresh == true - when(flintIndexMetadata.isAutoRefresh()).thenReturn(true); - - when(emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID)) - .thenReturn( - new CancelJobRunResult() - .withJobRunId(EMR_JOB_ID) - .withApplicationId(EMRS_APPLICATION_ID)); - DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); - when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); - doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); - - AcknowledgedResponse acknowledgedResponse = mock(AcknowledgedResponse.class); - when(openSearchClient.admin().indices().delete(any()).get()).thenReturn(acknowledgedResponse); - when(acknowledgedResponse.isAcknowledged()).thenReturn(true); - DispatchQueryResponse dispatchQueryResponse = - sparkQueryDispatcher.dispatch( - new DispatchQueryRequest( - EMRS_APPLICATION_ID, - query, - "my_glue", - LangType.SQL, - EMRS_EXECUTION_ROLE, - TEST_CLUSTER_NAME)); - verify(emrServerlessClient, times(1)).cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID); - verify(dataSourceUserAuthorizationHelper, times(1)).authorizeDataSource(dataSourceMetadata); - verify(flintIndexMetadataReader, times(1)).getFlintIndexMetadata(indexQueryDetails); - SparkQueryDispatcher.DropIndexResult dropIndexResult = - SparkQueryDispatcher.DropIndexResult.fromJobId(dispatchQueryResponse.getJobId()); - Assertions.assertEquals(JobRunState.SUCCESS.toString(), dropIndexResult.getStatus()); - Assertions.assertTrue(dispatchQueryResponse.isDropIndexQuery()); - } - - @Test - void testDropSkippingIndexQuery() throws ExecutionException, InterruptedException { - String query = "DROP SKIPPING INDEX ON my_glue.default.http_logs"; - IndexQueryDetails indexQueryDetails = - IndexQueryDetails.builder() - .fullyQualifiedTableName(new FullyQualifiedTableName("my_glue.default.http_logs")) - .autoRefresh(false) - .indexQueryActionType(IndexQueryActionType.DROP) - .indexType(FlintIndexType.SKIPPING) - .build(); - when(flintIndexMetadataReader.getFlintIndexMetadata(indexQueryDetails)) - .thenReturn(flintIndexMetadata); - when(flintIndexMetadata.getJobId()).thenReturn(EMR_JOB_ID); - when(flintIndexMetadata.isAutoRefresh()).thenReturn(true); - - when(emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID)) - .thenReturn( - new CancelJobRunResult() - .withJobRunId(EMR_JOB_ID) - .withApplicationId(EMRS_APPLICATION_ID)); - DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); - when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); - doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); - AcknowledgedResponse acknowledgedResponse = mock(AcknowledgedResponse.class); - when(openSearchClient.admin().indices().delete(any()).get()).thenReturn(acknowledgedResponse); - - DispatchQueryResponse dispatchQueryResponse = - sparkQueryDispatcher.dispatch( - new DispatchQueryRequest( - EMRS_APPLICATION_ID, - query, - "my_glue", - LangType.SQL, - EMRS_EXECUTION_ROLE, - TEST_CLUSTER_NAME)); - verify(emrServerlessClient, times(1)).cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID); - verify(dataSourceUserAuthorizationHelper, times(1)).authorizeDataSource(dataSourceMetadata); - verify(flintIndexMetadataReader, times(1)).getFlintIndexMetadata(indexQueryDetails); - SparkQueryDispatcher.DropIndexResult dropIndexResult = - SparkQueryDispatcher.DropIndexResult.fromJobId(dispatchQueryResponse.getJobId()); - Assertions.assertEquals(JobRunState.SUCCESS.toString(), dropIndexResult.getStatus()); - Assertions.assertTrue(dispatchQueryResponse.isDropIndexQuery()); - } - - @Test - void testDropSkippingIndexQueryAutoRefreshFalse() - throws ExecutionException, InterruptedException { - String query = "DROP SKIPPING INDEX ON my_glue.default.http_logs"; - IndexQueryDetails indexQueryDetails = - IndexQueryDetails.builder() - .fullyQualifiedTableName(new FullyQualifiedTableName("my_glue.default.http_logs")) - .autoRefresh(false) - .indexQueryActionType(IndexQueryActionType.DROP) - .indexType(FlintIndexType.SKIPPING) - .build(); - when(flintIndexMetadataReader.getFlintIndexMetadata(indexQueryDetails)) - .thenReturn(flintIndexMetadata); - when(flintIndexMetadata.isAutoRefresh()).thenReturn(false); - - DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); - when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); - doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); - AcknowledgedResponse acknowledgedResponse = mock(AcknowledgedResponse.class); - when(openSearchClient.admin().indices().delete(any()).get()).thenReturn(acknowledgedResponse); - - DispatchQueryResponse dispatchQueryResponse = - sparkQueryDispatcher.dispatch( - new DispatchQueryRequest( - EMRS_APPLICATION_ID, - query, - "my_glue", - LangType.SQL, - EMRS_EXECUTION_ROLE, - TEST_CLUSTER_NAME)); - verify(emrServerlessClient, times(0)).cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID); - verify(dataSourceUserAuthorizationHelper, times(1)).authorizeDataSource(dataSourceMetadata); - verify(flintIndexMetadataReader, times(1)).getFlintIndexMetadata(indexQueryDetails); - SparkQueryDispatcher.DropIndexResult dropIndexResult = - SparkQueryDispatcher.DropIndexResult.fromJobId(dispatchQueryResponse.getJobId()); - Assertions.assertEquals(JobRunState.SUCCESS.toString(), dropIndexResult.getStatus()); - Assertions.assertTrue(dispatchQueryResponse.isDropIndexQuery()); - } - - @Test - void testDropSkippingIndexQueryDeleteIndexException() - throws ExecutionException, InterruptedException { - String query = "DROP SKIPPING INDEX ON my_glue.default.http_logs"; - IndexQueryDetails indexQueryDetails = - IndexQueryDetails.builder() - .fullyQualifiedTableName(new FullyQualifiedTableName("my_glue.default.http_logs")) - .autoRefresh(false) - .indexQueryActionType(IndexQueryActionType.DROP) - .indexType(FlintIndexType.SKIPPING) - .build(); - when(flintIndexMetadataReader.getFlintIndexMetadata(indexQueryDetails)) - .thenReturn(flintIndexMetadata); - when(flintIndexMetadata.isAutoRefresh()).thenReturn(false); - - DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); - when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); - doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); - - when(openSearchClient.admin().indices().delete(any()).get()) - .thenThrow(ExecutionException.class); - - DispatchQueryResponse dispatchQueryResponse = - sparkQueryDispatcher.dispatch( - new DispatchQueryRequest( - EMRS_APPLICATION_ID, - query, - "my_glue", - LangType.SQL, - EMRS_EXECUTION_ROLE, - TEST_CLUSTER_NAME)); - verify(emrServerlessClient, times(0)).cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID); - verify(dataSourceUserAuthorizationHelper, times(1)).authorizeDataSource(dataSourceMetadata); - verify(flintIndexMetadataReader, times(1)).getFlintIndexMetadata(indexQueryDetails); - SparkQueryDispatcher.DropIndexResult dropIndexResult = - SparkQueryDispatcher.DropIndexResult.fromJobId(dispatchQueryResponse.getJobId()); - Assertions.assertEquals(JobRunState.FAILED.toString(), dropIndexResult.getStatus()); - Assertions.assertEquals( - "{\"error\":\"failed to drop index\",\"status\":\"FAILED\"}", - dropIndexResult.result().toString()); - Assertions.assertTrue(dispatchQueryResponse.isDropIndexQuery()); - } - - @Test - void testDropMVQuery() throws ExecutionException, InterruptedException { - String query = "DROP MATERIALIZED VIEW mv_1"; - IndexQueryDetails indexQueryDetails = - IndexQueryDetails.builder() - .mvName("mv_1") - .indexQueryActionType(IndexQueryActionType.DROP) - .fullyQualifiedTableName(null) - .indexType(FlintIndexType.MATERIALIZED_VIEW) - .build(); - when(flintIndexMetadataReader.getFlintIndexMetadata(indexQueryDetails)) - .thenReturn(flintIndexMetadata); - when(flintIndexMetadata.getJobId()).thenReturn(EMR_JOB_ID); - // auto_refresh == true - when(flintIndexMetadata.isAutoRefresh()).thenReturn(true); - - when(emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID)) - .thenReturn( - new CancelJobRunResult() - .withJobRunId(EMR_JOB_ID) - .withApplicationId(EMRS_APPLICATION_ID)); - DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); - when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); - doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); - - AcknowledgedResponse acknowledgedResponse = mock(AcknowledgedResponse.class); - when(openSearchClient.admin().indices().delete(any()).get()).thenReturn(acknowledgedResponse); - when(acknowledgedResponse.isAcknowledged()).thenReturn(true); - DispatchQueryResponse dispatchQueryResponse = - sparkQueryDispatcher.dispatch( - new DispatchQueryRequest( - EMRS_APPLICATION_ID, - query, - "my_glue", - LangType.SQL, - EMRS_EXECUTION_ROLE, - TEST_CLUSTER_NAME)); - verify(emrServerlessClient, times(1)).cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID); - verify(dataSourceUserAuthorizationHelper, times(1)).authorizeDataSource(dataSourceMetadata); - verify(flintIndexMetadataReader, times(1)).getFlintIndexMetadata(indexQueryDetails); - SparkQueryDispatcher.DropIndexResult dropIndexResult = - SparkQueryDispatcher.DropIndexResult.fromJobId(dispatchQueryResponse.getJobId()); - Assertions.assertEquals(JobRunState.SUCCESS.toString(), dropIndexResult.getStatus()); - Assertions.assertTrue(dispatchQueryResponse.isDropIndexQuery()); - } - @Test void testDispatchQueryWithExtraSparkSubmitParameters() { DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); @@ -1434,6 +1185,6 @@ private AsyncQueryJobMetadata asyncQueryJobMetadata() { private AsyncQueryJobMetadata asyncQueryJobMetadataWithSessionId( String statementId, String sessionId) { return new AsyncQueryJobMetadata( - new AsyncQueryId(statementId), EMRS_APPLICATION_ID, EMR_JOB_ID, false, null, sessionId); + new AsyncQueryId(statementId), EMRS_APPLICATION_ID, EMR_JOB_ID, null, sessionId); } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java index 3546a874d9..5a0a53009f 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java @@ -26,13 +26,10 @@ public class SessionManagerTest { public void sessionEnable() { Assertions.assertTrue( new SessionManager(stateStore, emrClient, sessionSetting(true)).isEnabled()); - Assertions.assertFalse( - new SessionManager(stateStore, emrClient, sessionSetting(false)).isEnabled()); } public static Settings sessionSetting(boolean enabled) { Map settings = new HashMap<>(); - settings.put(Settings.Key.SPARK_EXECUTION_SESSION_ENABLED, enabled); settings.put(Settings.Key.SPARK_EXECUTION_SESSION_LIMIT, 100); return settings(settings); } diff --git a/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexStateTest.java b/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexStateTest.java new file mode 100644 index 0000000000..acd76fa11a --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexStateTest.java @@ -0,0 +1,18 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.flint; + +import static org.junit.jupiter.api.Assertions.*; +import static org.opensearch.sql.spark.flint.FlintIndexState.UNKNOWN; + +import org.junit.jupiter.api.Test; + +class FlintIndexStateTest { + @Test + public void unknownState() { + assertEquals(UNKNOWN, FlintIndexState.fromString("noSupported")); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpTest.java b/spark/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpTest.java new file mode 100644 index 0000000000..5b3c1d74db --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpTest.java @@ -0,0 +1,61 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.flint.operation; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; +import static org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorServiceSpec.DATASOURCE; + +import java.util.Optional; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.flint.FlintIndexMetadata; +import org.opensearch.sql.spark.flint.FlintIndexState; +import org.opensearch.sql.spark.flint.FlintIndexStateModel; + +@ExtendWith(MockitoExtension.class) +class FlintIndexOpTest { + @Mock private StateStore stateStore; + + @Mock private FlintIndexMetadata flintIndexMetadata; + + @Mock private FlintIndexStateModel model; + + @Test + public void beginFailed() { + when(stateStore.updateState(any(), any(), any(), any())).thenThrow(RuntimeException.class); + when(stateStore.get(any(), any(), any())).thenReturn(Optional.of(model)); + when(model.getIndexState()).thenReturn(FlintIndexState.ACTIVE); + when(flintIndexMetadata.getLatestId()).thenReturn(Optional.of("latestId")); + + FlintIndexOpDelete indexOp = new FlintIndexOpDelete(stateStore, DATASOURCE); + IllegalStateException exception = + assertThrows(IllegalStateException.class, () -> indexOp.apply(flintIndexMetadata)); + Assertions.assertEquals( + "begin failed. target transitioning state: [DELETING]", exception.getMessage()); + } + + @Test + public void commitFailed() { + when(stateStore.updateState(any(), any(), any(), any())) + .thenReturn(model) + .thenThrow(RuntimeException.class); + when(stateStore.get(any(), any(), any())).thenReturn(Optional.of(model)); + when(model.getIndexState()).thenReturn(FlintIndexState.EMPTY); + when(flintIndexMetadata.getLatestId()).thenReturn(Optional.of("latestId")); + + FlintIndexOpDelete indexOp = new FlintIndexOpDelete(stateStore, DATASOURCE); + IllegalStateException exception = + assertThrows(IllegalStateException.class, () -> indexOp.apply(flintIndexMetadata)); + Assertions.assertEquals( + "commit failed. target stable state: [DELETED]", exception.getMessage()); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/leasemanager/DefaultLeaseManagerTest.java b/spark/src/test/java/org/opensearch/sql/spark/leasemanager/DefaultLeaseManagerTest.java index 47111c3a38..558f7f7b3a 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/leasemanager/DefaultLeaseManagerTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/leasemanager/DefaultLeaseManagerTest.java @@ -5,6 +5,8 @@ package org.opensearch.sql.spark.leasemanager; +import static org.junit.jupiter.api.Assertions.assertTrue; + import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -22,6 +24,18 @@ class DefaultLeaseManagerTest { @Test public void concurrentSessionRuleOnlyApplyToInteractiveQuery() { - new DefaultLeaseManager(settings, stateStore).borrow(new LeaseRequest(JobType.BATCH, "mys3")); + assertTrue( + new DefaultLeaseManager.ConcurrentSessionRule(settings, stateStore) + .test(new LeaseRequest(JobType.BATCH, "mys3"))); + assertTrue( + new DefaultLeaseManager.ConcurrentSessionRule(settings, stateStore) + .test(new LeaseRequest(JobType.STREAMING, "mys3"))); + } + + @Test + public void concurrentRefreshRuleOnlyNotAppliedToInteractiveQuery() { + assertTrue( + new DefaultLeaseManager.ConcurrentRefreshJobRule(settings, stateStore) + .test(new LeaseRequest(JobType.INTERACTIVE, "mys3"))); } } diff --git a/spark/src/test/resources/flint-index-mappings/0.1.1/flint_covering_index.json b/spark/src/test/resources/flint-index-mappings/0.1.1/flint_covering_index.json new file mode 100644 index 0000000000..54ed5e05e1 --- /dev/null +++ b/spark/src/test/resources/flint-index-mappings/0.1.1/flint_covering_index.json @@ -0,0 +1,37 @@ +{ + "_meta": { + "kind": "covering", + "indexedColumns": [ + { + "columnType": "timestamp", + "columnName": "time" + }, + { + "columnType": "string", + "columnName": "client_ip" + }, + { + "columnType": "int", + "columnName": "client_port" + }, + { + "columnType": "string", + "columnName": "request_url" + } + ], + "name": "test", + "options": { + "auto_refresh": "true", + "index_settings": "{\"number_of_shards\":1,\"number_of_replicas\":1}" + }, + "source": "mys3.default.http_logs", + "version": "0.1.0", + "properties": { + "env": { + "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fd777k3k3ls20p", + "SERVERLESS_EMR_JOB_ID": "00fe3gu2tgad000q" + } + }, + "latestId": "coveringid" + } +} diff --git a/spark/src/test/resources/flint-index-mappings/0.1.1/flint_mv.json b/spark/src/test/resources/flint-index-mappings/0.1.1/flint_mv.json new file mode 100644 index 0000000000..1a9c74806a --- /dev/null +++ b/spark/src/test/resources/flint-index-mappings/0.1.1/flint_mv.json @@ -0,0 +1,30 @@ +{ + "_meta": { + "kind": "mv", + "indexedColumns": [ + { + "columnType": "timestamp", + "columnName": "start.time" + }, + { + "columnType": "long", + "columnName": "count" + } + ], + "name": "spark_catalog.default.http_logs_metrics_chen", + "options": { + "auto_refresh": "true", + "checkpoint_location": "s3://flint-data-dp-eu-west-1-beta/data/checkpoint/chen-job-1", + "watermark_delay": "30 Minutes" + }, + "source": "SELECT window.start AS `start.time`, COUNT(*) AS count FROM mys3.default.http_logs WHERE status != 200 GROUP BY TUMBLE(`@timestamp`, '6 Hours')", + "version": "0.1.0", + "properties": { + "env": { + "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fd777k3k3ls20p", + "SERVERLESS_EMR_JOB_ID": "00fe86mkk5q3u00q" + } + }, + "latestId": "mvid" + } +} diff --git a/spark/src/test/resources/flint-index-mappings/0.1.1/flint_skipping_index.json b/spark/src/test/resources/flint-index-mappings/0.1.1/flint_skipping_index.json new file mode 100644 index 0000000000..5e7c9175fd --- /dev/null +++ b/spark/src/test/resources/flint-index-mappings/0.1.1/flint_skipping_index.json @@ -0,0 +1,23 @@ +{ + "_meta": { + "kind": "skipping", + "indexedColumns": [ + { + "columnType": "int", + "kind": "VALUE_SET", + "columnName": "status" + } + ], + "name": "flint_mys3_default_http_logs_skipping_index", + "options": {}, + "source": "mys3.default.http_logs", + "version": "0.1.0", + "properties": { + "env": { + "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fd777k3k3ls20p", + "SERVERLESS_EMR_JOB_ID": "00fdmvv9hp8u0o0q" + } + }, + "latestId": "skippingindexid" + } +} diff --git a/spark/src/test/resources/flint-index-mappings/flint_covering_index.json b/spark/src/test/resources/flint-index-mappings/flint_covering_index.json new file mode 100644 index 0000000000..f68a1627ab --- /dev/null +++ b/spark/src/test/resources/flint-index-mappings/flint_covering_index.json @@ -0,0 +1,36 @@ +{ + "_meta": { + "kind": "covering", + "indexedColumns": [ + { + "columnType": "timestamp", + "columnName": "time" + }, + { + "columnType": "string", + "columnName": "client_ip" + }, + { + "columnType": "int", + "columnName": "client_port" + }, + { + "columnType": "string", + "columnName": "request_url" + } + ], + "name": "test", + "options": { + "auto_refresh": "true", + "index_settings": "{\"number_of_shards\":1,\"number_of_replicas\":1}" + }, + "source": "mys3.default.http_logs", + "version": "0.1.0", + "properties": { + "env": { + "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fd777k3k3ls20p", + "SERVERLESS_EMR_JOB_ID": "00fe3gu2tgad000q" + } + } + } +} diff --git a/spark/src/test/resources/flint-index-mappings/flint_mv.json b/spark/src/test/resources/flint-index-mappings/flint_mv.json new file mode 100644 index 0000000000..3d130832b8 --- /dev/null +++ b/spark/src/test/resources/flint-index-mappings/flint_mv.json @@ -0,0 +1,42 @@ +{ + "_meta": { + "kind": "mv", + "indexedColumns": [ + { + "columnType": "timestamp", + "columnName": "start.time" + }, + { + "columnType": "long", + "columnName": "count" + } + ], + "name": "spark_catalog.default.http_logs_metrics_chen", + "options": { + "auto_refresh": "true", + "checkpoint_location": "s3://flint-data-dp-eu-west-1-beta/data/checkpoint/chen-job-1", + "watermark_delay": "30 Minutes" + }, + "source": "SELECT window.start AS `start.time`, COUNT(*) AS count FROM mys3.default.http_logs WHERE status != 200 GROUP BY TUMBLE(`@timestamp`, '6 Hours')", + "version": "0.1.0", + "properties": { + "env": { + "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fd777k3k3ls20p", + "SERVERLESS_EMR_JOB_ID": "00fe86mkk5q3u00q" + } + } + }, + "properties": { + "count": { + "type": "long" + }, + "start": { + "properties": { + "time": { + "type": "date", + "format": "strict_date_optional_time_nanos" + } + } + } + } +} diff --git a/spark/src/test/resources/flint-index-mappings/flint_skipping_index.json b/spark/src/test/resources/flint-index-mappings/flint_skipping_index.json new file mode 100644 index 0000000000..e4bf849f20 --- /dev/null +++ b/spark/src/test/resources/flint-index-mappings/flint_skipping_index.json @@ -0,0 +1,22 @@ +{ + "_meta": { + "kind": "skipping", + "indexedColumns": [ + { + "columnType": "int", + "kind": "VALUE_SET", + "columnName": "status" + } + ], + "name": "flint_mys3_default_http_logs_skipping_index", + "options": {}, + "source": "mys3.default.http_logs", + "version": "0.1.0", + "properties": { + "env": { + "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fd777k3k3ls20p", + "SERVERLESS_EMR_JOB_ID": "00fdmvv9hp8u0o0q" + } + } + } +} diff --git a/spark/src/test/resources/query_execution_result_mapping.json b/spark/src/test/resources/query_execution_result_mapping.json new file mode 100644 index 0000000000..a76ef77383 --- /dev/null +++ b/spark/src/test/resources/query_execution_result_mapping.json @@ -0,0 +1,44 @@ +{ + "dynamic": "false", + "properties": { + "applicationId": { + "type": "keyword" + }, + "dataSourceName": { + "type": "keyword" + }, + "error": { + "type": "text" + }, + "jobRunId": { + "type": "keyword" + }, + "queryId": { + "type": "keyword" + }, + "queryRunTime": { + "type": "long" + }, + "queryText": { + "type": "text" + }, + "result": { + "type": "object", + "enabled": false + }, + "schema": { + "type": "object", + "enabled": false + }, + "sessionId": { + "type": "keyword" + }, + "status": { + "type": "keyword" + }, + "updateTime": { + "type": "date", + "format": "strict_date_time||epoch_millis" + } + } +}