Skip to content

Commit

Permalink
Add setting plugins.query.executionengine.async_query.enabled (#2510)
Browse files Browse the repository at this point in the history
* Add setting plugins.query.executionengine.async_query.enabled

Signed-off-by: Peng Huo <[email protected]>

* fix format

Signed-off-by: Peng Huo <[email protected]>

---------

Signed-off-by: Peng Huo <[email protected]>
  • Loading branch information
penghuo authored Feb 6, 2024
1 parent 2a3ebea commit cddffc6
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ public enum Key {
AUTO_INDEX_MANAGEMENT_ENABLED(
"plugins.query.executionengine.spark.auto_index_management.enabled"),
SESSION_INACTIVITY_TIMEOUT_MILLIS(
"plugins.query.executionengine.spark.session_inactivity_timeout_millis");
"plugins.query.executionengine.spark.session_inactivity_timeout_millis"),

/** Async query Settings * */
ASYNC_QUERY_ENABLED("plugins.query.executionengine.async_query.enabled");

@Getter private final String keyValue;

Expand Down
34 changes: 33 additions & 1 deletion docs/user/admin/settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -562,4 +562,36 @@ SQL query::
}
}
}
}
}

plugins.query.executionengine.async_query.enabled
===============================

Description
-----------
You can disable submit async query to reject all coming requests.

1. The default value is true.
2. This setting is node scope.
3. This setting can be updated dynamically.

Request::

sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_cluster/settings \
... -d '{"transient":{"plugins.query.executionengine.async_query.enabled":"false"}}'
{
"acknowledged": true,
"persistent": {},
"transient": {
"plugins": {
"query": {
"executionengine": {
"async_query": {
"enabled": "false"
}
}
}
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.asyncquery;

import static org.hamcrest.Matchers.equalTo;
import static org.opensearch.sql.legacy.TestUtils.getResponseBody;

import java.io.IOException;
import java.util.Locale;
import org.json.JSONObject;
import org.junit.Assert;
import org.junit.Test;
import org.opensearch.client.Request;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.Response;
import org.opensearch.client.ResponseException;
import org.opensearch.sql.ppl.PPLIntegTestCase;
import org.opensearch.sql.util.TestUtils;

public class AsyncQueryIT extends PPLIntegTestCase {

public static final String ASYNC_QUERY_ACTION_URL = "/_plugins/_async_query";

@Test
public void asyncQueryEnabledSettingsTest() throws IOException {
String setting = "plugins.query.executionengine.async_query.enabled";
// disable
updateClusterSettings(new ClusterSetting(PERSISTENT, setting, "false"));

String query = "select 1";
Response response = null;
try {
executeAsyncQueryToString(query);
} catch (ResponseException ex) {
response = ex.getResponse();
}

JSONObject result = new JSONObject(TestUtils.getResponseBody(response));
assertThat(result.getInt("status"), equalTo(400));
JSONObject error = result.getJSONObject("error");
assertThat(error.getString("reason"), equalTo("Invalid Request"));
assertThat(
error.getString("details"),
equalTo("plugins.query.executionengine.async_query.enabled setting is false"));
assertThat(error.getString("type"), equalTo("IllegalAccessException"));

// reset the setting
updateClusterSettings(new ClusterSetting(PERSISTENT, setting, null));
}

protected String executeAsyncQueryToString(String query) throws IOException {
Response response = client().performRequest(buildAsyncRequest(query, ASYNC_QUERY_ACTION_URL));
Assert.assertEquals(200, response.getStatusLine().getStatusCode());
return getResponseBody(response, true);
}

protected Request buildAsyncRequest(String query, String endpoint) {
Request request = new Request("POST", endpoint);
request.setJsonEntity(String.format(Locale.ROOT, "{\n" + " \"query\": \"%s\"\n" + "}", query));
request.setJsonEntity(
String.format(
Locale.ROOT,
"{\n"
+ " \"datasource\": \"mys3\",\n"
+ " \"lang\": \"sql\",\n"
+ " \"query\": \"%s\"\n"
+ "}",
query));

RequestOptions.Builder restOptionsBuilder = RequestOptions.DEFAULT.toBuilder();
restOptionsBuilder.addHeader("Content-Type", "application/json");
request.setOptions(restOptionsBuilder);
return request;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,13 @@ public class OpenSearchSettings extends Settings {
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<Boolean> ASYNC_QUERY_ENABLED_SETTING =
Setting.boolSetting(
Key.ASYNC_QUERY_ENABLED.getKeyValue(),
true,
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<String> SPARK_EXECUTION_ENGINE_CONFIG =
Setting.simpleString(
Key.SPARK_EXECUTION_ENGINE_CONFIG.getKeyValue(),
Expand Down Expand Up @@ -250,6 +257,12 @@ public OpenSearchSettings(ClusterSettings clusterSettings) {
Key.DATASOURCES_URI_HOSTS_DENY_LIST,
DATASOURCE_URI_HOSTS_DENY_LIST,
new Updater(Key.DATASOURCES_URI_HOSTS_DENY_LIST));
register(
settingBuilder,
clusterSettings,
Key.ASYNC_QUERY_ENABLED,
ASYNC_QUERY_ENABLED_SETTING,
new Updater(Key.ASYNC_QUERY_ENABLED));
register(
settingBuilder,
clusterSettings,
Expand Down Expand Up @@ -362,6 +375,7 @@ public static List<Setting<?>> pluginSettings() {
.add(METRICS_ROLLING_WINDOW_SETTING)
.add(METRICS_ROLLING_INTERVAL_SETTING)
.add(DATASOURCE_URI_HOSTS_DENY_LIST)
.add(ASYNC_QUERY_ENABLED_SETTING)
.add(SPARK_EXECUTION_ENGINE_CONFIG)
.add(SPARK_EXECUTION_SESSION_LIMIT_SETTING)
.add(SPARK_EXECUTION_REFRESH_JOB_LIMIT_SETTING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,8 @@ private static boolean isClientError(Exception e) {
return e instanceof IllegalArgumentException
|| e instanceof IllegalStateException
|| e instanceof DataSourceNotFoundException
|| e instanceof AsyncQueryNotFoundException;
|| e instanceof AsyncQueryNotFoundException
|| e instanceof IllegalAccessException;
}

private void addSystemErrorMetric(RestRequest.Method requestMethod) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@

package org.opensearch.sql.spark.transport;

import java.util.Locale;
import org.opensearch.action.ActionType;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.opensearch.setting.OpenSearchSettings;
import org.opensearch.sql.protocol.response.format.JsonResponseFormatter;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorServiceImpl;
Expand All @@ -26,6 +29,7 @@ public class TransportCreateAsyncQueryRequestAction
extends HandledTransportAction<CreateAsyncQueryActionRequest, CreateAsyncQueryActionResponse> {

private final AsyncQueryExecutorService asyncQueryExecutorService;
private final OpenSearchSettings pluginSettings;

public static final String NAME = "cluster:admin/opensearch/ql/async_query/create";
public static final ActionType<CreateAsyncQueryActionResponse> ACTION_TYPE =
Expand All @@ -35,9 +39,11 @@ public class TransportCreateAsyncQueryRequestAction
public TransportCreateAsyncQueryRequestAction(
TransportService transportService,
ActionFilters actionFilters,
AsyncQueryExecutorServiceImpl jobManagementService) {
AsyncQueryExecutorServiceImpl jobManagementService,
OpenSearchSettings pluginSettings) {
super(NAME, transportService, actionFilters, CreateAsyncQueryActionRequest::new);
this.asyncQueryExecutorService = jobManagementService;
this.pluginSettings = pluginSettings;
}

@Override
Expand All @@ -46,6 +52,16 @@ protected void doExecute(
CreateAsyncQueryActionRequest request,
ActionListener<CreateAsyncQueryActionResponse> listener) {
try {
if (!(Boolean) pluginSettings.getSettingValue(Settings.Key.ASYNC_QUERY_ENABLED)) {
listener.onFailure(
new IllegalAccessException(
String.format(
Locale.ROOT,
"%s setting is " + "false",
Settings.Key.ASYNC_QUERY_ENABLED.getKeyValue())));
return;
}

CreateAsyncQueryRequest createAsyncQueryRequest = request.getCreateAsyncQueryRequest();
CreateAsyncQueryResponse createAsyncQueryResponse =
asyncQueryExecutorService.createAsyncQuery(createAsyncQueryRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.opensearch.sql.spark.transport;

import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -25,6 +26,8 @@
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.core.action.ActionListener;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.opensearch.setting.OpenSearchSettings;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorServiceImpl;
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest;
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryResponse;
Expand All @@ -42,6 +45,7 @@ public class TransportCreateAsyncQueryRequestActionTest {
@Mock private AsyncQueryExecutorServiceImpl jobExecutorService;
@Mock private Task task;
@Mock private ActionListener<CreateAsyncQueryActionResponse> actionListener;
@Mock private OpenSearchSettings pluginSettings;

@Captor
private ArgumentCaptor<CreateAsyncQueryActionResponse> createJobActionResponseArgumentCaptor;
Expand All @@ -52,7 +56,10 @@ public class TransportCreateAsyncQueryRequestActionTest {
public void setUp() {
action =
new TransportCreateAsyncQueryRequestAction(
transportService, new ActionFilters(new HashSet<>()), jobExecutorService);
transportService,
new ActionFilters(new HashSet<>()),
jobExecutorService,
pluginSettings);
}

@Test
Expand All @@ -61,6 +68,7 @@ public void testDoExecute() {
new CreateAsyncQueryRequest("source = my_glue.default.alb_logs", "my_glue", LangType.SQL);
CreateAsyncQueryActionRequest request =
new CreateAsyncQueryActionRequest(createAsyncQueryRequest);
when(pluginSettings.getSettingValue(Settings.Key.ASYNC_QUERY_ENABLED)).thenReturn(true);
when(jobExecutorService.createAsyncQuery(createAsyncQueryRequest))
.thenReturn(new CreateAsyncQueryResponse("123", null));
action.doExecute(task, request, actionListener);
Expand All @@ -78,6 +86,7 @@ public void testDoExecuteWithSessionId() {
"source = my_glue.default.alb_logs", "my_glue", LangType.SQL, MOCK_SESSION_ID);
CreateAsyncQueryActionRequest request =
new CreateAsyncQueryActionRequest(createAsyncQueryRequest);
when(pluginSettings.getSettingValue(Settings.Key.ASYNC_QUERY_ENABLED)).thenReturn(true);
when(jobExecutorService.createAsyncQuery(createAsyncQueryRequest))
.thenReturn(new CreateAsyncQueryResponse("123", MOCK_SESSION_ID));
action.doExecute(task, request, actionListener);
Expand All @@ -95,6 +104,7 @@ public void testDoExecuteWithException() {
new CreateAsyncQueryRequest("source = my_glue.default.alb_logs", "my_glue", LangType.SQL);
CreateAsyncQueryActionRequest request =
new CreateAsyncQueryActionRequest(createAsyncQueryRequest);
when(pluginSettings.getSettingValue(Settings.Key.ASYNC_QUERY_ENABLED)).thenReturn(true);
doThrow(new RuntimeException("Error"))
.when(jobExecutorService)
.createAsyncQuery(createAsyncQueryRequest);
Expand All @@ -105,4 +115,21 @@ public void testDoExecuteWithException() {
Assertions.assertTrue(exception instanceof RuntimeException);
Assertions.assertEquals("Error", exception.getMessage());
}

@Test
public void asyncQueryDisabled() {
CreateAsyncQueryRequest createAsyncQueryRequest =
new CreateAsyncQueryRequest("source = my_glue.default.alb_logs", "my_glue", LangType.SQL);
CreateAsyncQueryActionRequest request =
new CreateAsyncQueryActionRequest(createAsyncQueryRequest);
when(pluginSettings.getSettingValue(Settings.Key.ASYNC_QUERY_ENABLED)).thenReturn(false);
action.doExecute(task, request, actionListener);
verify(jobExecutorService, never()).createAsyncQuery(createAsyncQueryRequest);
Mockito.verify(actionListener).onFailure(exceptionArgumentCaptor.capture());
Exception exception = exceptionArgumentCaptor.getValue();
Assertions.assertTrue(exception instanceof IllegalAccessException);
Assertions.assertEquals(
"plugins.query.executionengine.async_query.enabled " + "setting is false",
exception.getMessage());
}
}

0 comments on commit cddffc6

Please sign in to comment.