diff --git a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java index 28c0bb7f4e..2a9231fc25 100644 --- a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java +++ b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java @@ -45,7 +45,10 @@ public enum Key { AUTO_INDEX_MANAGEMENT_ENABLED( "plugins.query.executionengine.spark.auto_index_management.enabled"), SESSION_INACTIVITY_TIMEOUT_MILLIS( - "plugins.query.executionengine.spark.session_inactivity_timeout_millis"); + "plugins.query.executionengine.spark.session_inactivity_timeout_millis"), + + /** Async query Settings * */ + ASYNC_QUERY_ENABLED("plugins.query.executionengine.async_query.enabled"); @Getter private final String keyValue; diff --git a/docs/user/admin/settings.rst b/docs/user/admin/settings.rst index 04b10935de..c1a7a4eb8b 100644 --- a/docs/user/admin/settings.rst +++ b/docs/user/admin/settings.rst @@ -562,4 +562,36 @@ SQL query:: } } } - } \ No newline at end of file + } + +plugins.query.executionengine.async_query.enabled +=============================== + +Description +----------- +You can disable submit async query to reject all coming requests. + +1. The default value is true. +2. This setting is node scope. +3. This setting can be updated dynamically. + +Request:: + + sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_cluster/settings \ + ... -d '{"transient":{"plugins.query.executionengine.async_query.enabled":"false"}}' + { + "acknowledged": true, + "persistent": {}, + "transient": { + "plugins": { + "query": { + "executionengine": { + "async_query": { + "enabled": "false" + } + } + } + } + } + } + diff --git a/integ-test/src/test/java/org/opensearch/sql/asyncquery/AsyncQueryIT.java b/integ-test/src/test/java/org/opensearch/sql/asyncquery/AsyncQueryIT.java new file mode 100644 index 0000000000..9b5cc96b0e --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/asyncquery/AsyncQueryIT.java @@ -0,0 +1,78 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.asyncquery; + +import static org.hamcrest.Matchers.equalTo; +import static org.opensearch.sql.legacy.TestUtils.getResponseBody; + +import java.io.IOException; +import java.util.Locale; +import org.json.JSONObject; +import org.junit.Assert; +import org.junit.Test; +import org.opensearch.client.Request; +import org.opensearch.client.RequestOptions; +import org.opensearch.client.Response; +import org.opensearch.client.ResponseException; +import org.opensearch.sql.ppl.PPLIntegTestCase; +import org.opensearch.sql.util.TestUtils; + +public class AsyncQueryIT extends PPLIntegTestCase { + + public static final String ASYNC_QUERY_ACTION_URL = "/_plugins/_async_query"; + + @Test + public void asyncQueryEnabledSettingsTest() throws IOException { + String setting = "plugins.query.executionengine.async_query.enabled"; + // disable + updateClusterSettings(new ClusterSetting(PERSISTENT, setting, "false")); + + String query = "select 1"; + Response response = null; + try { + executeAsyncQueryToString(query); + } catch (ResponseException ex) { + response = ex.getResponse(); + } + + JSONObject result = new JSONObject(TestUtils.getResponseBody(response)); + assertThat(result.getInt("status"), equalTo(400)); + JSONObject error = result.getJSONObject("error"); + assertThat(error.getString("reason"), equalTo("Invalid Request")); + assertThat( + error.getString("details"), + equalTo("plugins.query.executionengine.async_query.enabled setting is false")); + assertThat(error.getString("type"), equalTo("IllegalAccessException")); + + // reset the setting + updateClusterSettings(new ClusterSetting(PERSISTENT, setting, null)); + } + + protected String executeAsyncQueryToString(String query) throws IOException { + Response response = client().performRequest(buildAsyncRequest(query, ASYNC_QUERY_ACTION_URL)); + Assert.assertEquals(200, response.getStatusLine().getStatusCode()); + return getResponseBody(response, true); + } + + protected Request buildAsyncRequest(String query, String endpoint) { + Request request = new Request("POST", endpoint); + request.setJsonEntity(String.format(Locale.ROOT, "{\n" + " \"query\": \"%s\"\n" + "}", query)); + request.setJsonEntity( + String.format( + Locale.ROOT, + "{\n" + + " \"datasource\": \"mys3\",\n" + + " \"lang\": \"sql\",\n" + + " \"query\": \"%s\"\n" + + "}", + query)); + + RequestOptions.Builder restOptionsBuilder = RequestOptions.DEFAULT.toBuilder(); + restOptionsBuilder.addHeader("Content-Type", "application/json"); + request.setOptions(restOptionsBuilder); + return request; + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java index cbb0d232a7..159b37309e 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java @@ -131,6 +131,13 @@ public class OpenSearchSettings extends Settings { Setting.Property.NodeScope, Setting.Property.Dynamic); + public static final Setting ASYNC_QUERY_ENABLED_SETTING = + Setting.boolSetting( + Key.ASYNC_QUERY_ENABLED.getKeyValue(), + true, + Setting.Property.NodeScope, + Setting.Property.Dynamic); + public static final Setting SPARK_EXECUTION_ENGINE_CONFIG = Setting.simpleString( Key.SPARK_EXECUTION_ENGINE_CONFIG.getKeyValue(), @@ -250,6 +257,12 @@ public OpenSearchSettings(ClusterSettings clusterSettings) { Key.DATASOURCES_URI_HOSTS_DENY_LIST, DATASOURCE_URI_HOSTS_DENY_LIST, new Updater(Key.DATASOURCES_URI_HOSTS_DENY_LIST)); + register( + settingBuilder, + clusterSettings, + Key.ASYNC_QUERY_ENABLED, + ASYNC_QUERY_ENABLED_SETTING, + new Updater(Key.ASYNC_QUERY_ENABLED)); register( settingBuilder, clusterSettings, @@ -362,6 +375,7 @@ public static List> pluginSettings() { .add(METRICS_ROLLING_WINDOW_SETTING) .add(METRICS_ROLLING_INTERVAL_SETTING) .add(DATASOURCE_URI_HOSTS_DENY_LIST) + .add(ASYNC_QUERY_ENABLED_SETTING) .add(SPARK_EXECUTION_ENGINE_CONFIG) .add(SPARK_EXECUTION_SESSION_LIMIT_SETTING) .add(SPARK_EXECUTION_REFRESH_JOB_LIMIT_SETTING) diff --git a/spark/src/main/java/org/opensearch/sql/spark/rest/RestAsyncQueryManagementAction.java b/spark/src/main/java/org/opensearch/sql/spark/rest/RestAsyncQueryManagementAction.java index 90d5d73696..00a455d943 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/rest/RestAsyncQueryManagementAction.java +++ b/spark/src/main/java/org/opensearch/sql/spark/rest/RestAsyncQueryManagementAction.java @@ -236,7 +236,8 @@ private static boolean isClientError(Exception e) { return e instanceof IllegalArgumentException || e instanceof IllegalStateException || e instanceof DataSourceNotFoundException - || e instanceof AsyncQueryNotFoundException; + || e instanceof AsyncQueryNotFoundException + || e instanceof IllegalAccessException; } private void addSystemErrorMetric(RestRequest.Method requestMethod) { diff --git a/spark/src/main/java/org/opensearch/sql/spark/transport/TransportCreateAsyncQueryRequestAction.java b/spark/src/main/java/org/opensearch/sql/spark/transport/TransportCreateAsyncQueryRequestAction.java index 991eafdad9..4e2102deed 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/transport/TransportCreateAsyncQueryRequestAction.java +++ b/spark/src/main/java/org/opensearch/sql/spark/transport/TransportCreateAsyncQueryRequestAction.java @@ -7,11 +7,14 @@ package org.opensearch.sql.spark.transport; +import java.util.Locale; import org.opensearch.action.ActionType; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; import org.opensearch.common.inject.Inject; import org.opensearch.core.action.ActionListener; +import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.opensearch.setting.OpenSearchSettings; import org.opensearch.sql.protocol.response.format.JsonResponseFormatter; import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService; import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorServiceImpl; @@ -26,6 +29,7 @@ public class TransportCreateAsyncQueryRequestAction extends HandledTransportAction { private final AsyncQueryExecutorService asyncQueryExecutorService; + private final OpenSearchSettings pluginSettings; public static final String NAME = "cluster:admin/opensearch/ql/async_query/create"; public static final ActionType ACTION_TYPE = @@ -35,9 +39,11 @@ public class TransportCreateAsyncQueryRequestAction public TransportCreateAsyncQueryRequestAction( TransportService transportService, ActionFilters actionFilters, - AsyncQueryExecutorServiceImpl jobManagementService) { + AsyncQueryExecutorServiceImpl jobManagementService, + OpenSearchSettings pluginSettings) { super(NAME, transportService, actionFilters, CreateAsyncQueryActionRequest::new); this.asyncQueryExecutorService = jobManagementService; + this.pluginSettings = pluginSettings; } @Override @@ -46,6 +52,16 @@ protected void doExecute( CreateAsyncQueryActionRequest request, ActionListener listener) { try { + if (!(Boolean) pluginSettings.getSettingValue(Settings.Key.ASYNC_QUERY_ENABLED)) { + listener.onFailure( + new IllegalAccessException( + String.format( + Locale.ROOT, + "%s setting is " + "false", + Settings.Key.ASYNC_QUERY_ENABLED.getKeyValue()))); + return; + } + CreateAsyncQueryRequest createAsyncQueryRequest = request.getCreateAsyncQueryRequest(); CreateAsyncQueryResponse createAsyncQueryResponse = asyncQueryExecutorService.createAsyncQuery(createAsyncQueryRequest); diff --git a/spark/src/test/java/org/opensearch/sql/spark/transport/TransportCreateAsyncQueryRequestActionTest.java b/spark/src/test/java/org/opensearch/sql/spark/transport/TransportCreateAsyncQueryRequestActionTest.java index 36060d3850..190f62135b 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/transport/TransportCreateAsyncQueryRequestActionTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/transport/TransportCreateAsyncQueryRequestActionTest.java @@ -8,6 +8,7 @@ package org.opensearch.sql.spark.transport; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -25,6 +26,8 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.action.support.ActionFilters; import org.opensearch.core.action.ActionListener; +import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.opensearch.setting.OpenSearchSettings; import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorServiceImpl; import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest; import org.opensearch.sql.spark.rest.model.CreateAsyncQueryResponse; @@ -42,6 +45,7 @@ public class TransportCreateAsyncQueryRequestActionTest { @Mock private AsyncQueryExecutorServiceImpl jobExecutorService; @Mock private Task task; @Mock private ActionListener actionListener; + @Mock private OpenSearchSettings pluginSettings; @Captor private ArgumentCaptor createJobActionResponseArgumentCaptor; @@ -52,7 +56,10 @@ public class TransportCreateAsyncQueryRequestActionTest { public void setUp() { action = new TransportCreateAsyncQueryRequestAction( - transportService, new ActionFilters(new HashSet<>()), jobExecutorService); + transportService, + new ActionFilters(new HashSet<>()), + jobExecutorService, + pluginSettings); } @Test @@ -61,6 +68,7 @@ public void testDoExecute() { new CreateAsyncQueryRequest("source = my_glue.default.alb_logs", "my_glue", LangType.SQL); CreateAsyncQueryActionRequest request = new CreateAsyncQueryActionRequest(createAsyncQueryRequest); + when(pluginSettings.getSettingValue(Settings.Key.ASYNC_QUERY_ENABLED)).thenReturn(true); when(jobExecutorService.createAsyncQuery(createAsyncQueryRequest)) .thenReturn(new CreateAsyncQueryResponse("123", null)); action.doExecute(task, request, actionListener); @@ -78,6 +86,7 @@ public void testDoExecuteWithSessionId() { "source = my_glue.default.alb_logs", "my_glue", LangType.SQL, MOCK_SESSION_ID); CreateAsyncQueryActionRequest request = new CreateAsyncQueryActionRequest(createAsyncQueryRequest); + when(pluginSettings.getSettingValue(Settings.Key.ASYNC_QUERY_ENABLED)).thenReturn(true); when(jobExecutorService.createAsyncQuery(createAsyncQueryRequest)) .thenReturn(new CreateAsyncQueryResponse("123", MOCK_SESSION_ID)); action.doExecute(task, request, actionListener); @@ -95,6 +104,7 @@ public void testDoExecuteWithException() { new CreateAsyncQueryRequest("source = my_glue.default.alb_logs", "my_glue", LangType.SQL); CreateAsyncQueryActionRequest request = new CreateAsyncQueryActionRequest(createAsyncQueryRequest); + when(pluginSettings.getSettingValue(Settings.Key.ASYNC_QUERY_ENABLED)).thenReturn(true); doThrow(new RuntimeException("Error")) .when(jobExecutorService) .createAsyncQuery(createAsyncQueryRequest); @@ -105,4 +115,21 @@ public void testDoExecuteWithException() { Assertions.assertTrue(exception instanceof RuntimeException); Assertions.assertEquals("Error", exception.getMessage()); } + + @Test + public void asyncQueryDisabled() { + CreateAsyncQueryRequest createAsyncQueryRequest = + new CreateAsyncQueryRequest("source = my_glue.default.alb_logs", "my_glue", LangType.SQL); + CreateAsyncQueryActionRequest request = + new CreateAsyncQueryActionRequest(createAsyncQueryRequest); + when(pluginSettings.getSettingValue(Settings.Key.ASYNC_QUERY_ENABLED)).thenReturn(false); + action.doExecute(task, request, actionListener); + verify(jobExecutorService, never()).createAsyncQuery(createAsyncQueryRequest); + Mockito.verify(actionListener).onFailure(exceptionArgumentCaptor.capture()); + Exception exception = exceptionArgumentCaptor.getValue(); + Assertions.assertTrue(exception instanceof IllegalAccessException); + Assertions.assertEquals( + "plugins.query.executionengine.async_query.enabled " + "setting is false", + exception.getMessage()); + } }