Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add setting plugins.query.executionengine.async_query.enabled #2510

Merged
merged 2 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ public enum Key {
AUTO_INDEX_MANAGEMENT_ENABLED(
"plugins.query.executionengine.spark.auto_index_management.enabled"),
SESSION_INACTIVITY_TIMEOUT_MILLIS(
"plugins.query.executionengine.spark.session_inactivity_timeout_millis");
"plugins.query.executionengine.spark.session_inactivity_timeout_millis"),

/** Async query Settings * */
ASYNC_QUERY_ENABLED("plugins.query.executionengine.async_query.enabled");

@Getter private final String keyValue;

Expand Down
34 changes: 33 additions & 1 deletion docs/user/admin/settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -562,4 +562,36 @@ SQL query::
}
}
}
}
}

plugins.query.executionengine.async_query.enabled
===============================

Description
-----------
You can disable submit async query to reject all coming requests.

1. The default value is true.
2. This setting is node scope.
3. This setting can be updated dynamically.

Request::

sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_cluster/settings \
... -d '{"transient":{"plugins.query.executionengine.async_query.enabled":"false"}}'
{
"acknowledged": true,
"persistent": {},
"transient": {
"plugins": {
"query": {
"executionengine": {
"async_query": {
"enabled": "false"
}
}
}
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.asyncquery;

import static org.hamcrest.Matchers.equalTo;
import static org.opensearch.sql.legacy.TestUtils.getResponseBody;

import java.io.IOException;
import java.util.Locale;
import org.json.JSONObject;
import org.junit.Assert;
import org.junit.Test;
import org.opensearch.client.Request;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.Response;
import org.opensearch.client.ResponseException;
import org.opensearch.sql.ppl.PPLIntegTestCase;
import org.opensearch.sql.util.TestUtils;

public class AsyncQueryIT extends PPLIntegTestCase {

public static final String ASYNC_QUERY_ACTION_URL = "/_plugins/_async_query";

@Test
public void asyncQueryEnabledSettingsTest() throws IOException {
String setting = "plugins.query.executionengine.async_query.enabled";
// disable
updateClusterSettings(new ClusterSetting(PERSISTENT, setting, "false"));

String query = "select 1";
Response response = null;
try {
executeAsyncQueryToString(query);
} catch (ResponseException ex) {
response = ex.getResponse();
}

JSONObject result = new JSONObject(TestUtils.getResponseBody(response));
assertThat(result.getInt("status"), equalTo(400));
JSONObject error = result.getJSONObject("error");
assertThat(error.getString("reason"), equalTo("Invalid Request"));
assertThat(
error.getString("details"),
equalTo("plugins.query.executionengine.async_query.enabled setting is false"));
assertThat(error.getString("type"), equalTo("IllegalAccessException"));

// reset the setting
updateClusterSettings(new ClusterSetting(PERSISTENT, setting, null));
}

protected String executeAsyncQueryToString(String query) throws IOException {
Response response = client().performRequest(buildAsyncRequest(query, ASYNC_QUERY_ACTION_URL));
Assert.assertEquals(200, response.getStatusLine().getStatusCode());
return getResponseBody(response, true);
}

protected Request buildAsyncRequest(String query, String endpoint) {
Request request = new Request("POST", endpoint);
request.setJsonEntity(String.format(Locale.ROOT, "{\n" + " \"query\": \"%s\"\n" + "}", query));
request.setJsonEntity(
String.format(
Locale.ROOT,
"{\n"
+ " \"datasource\": \"mys3\",\n"
+ " \"lang\": \"sql\",\n"
+ " \"query\": \"%s\"\n"
+ "}",
query));

RequestOptions.Builder restOptionsBuilder = RequestOptions.DEFAULT.toBuilder();
restOptionsBuilder.addHeader("Content-Type", "application/json");
request.setOptions(restOptionsBuilder);
return request;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,13 @@ public class OpenSearchSettings extends Settings {
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<Boolean> ASYNC_QUERY_ENABLED_SETTING =
Setting.boolSetting(
Key.ASYNC_QUERY_ENABLED.getKeyValue(),
true,
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<String> SPARK_EXECUTION_ENGINE_CONFIG =
Setting.simpleString(
Key.SPARK_EXECUTION_ENGINE_CONFIG.getKeyValue(),
Expand Down Expand Up @@ -250,6 +257,12 @@ public OpenSearchSettings(ClusterSettings clusterSettings) {
Key.DATASOURCES_URI_HOSTS_DENY_LIST,
DATASOURCE_URI_HOSTS_DENY_LIST,
new Updater(Key.DATASOURCES_URI_HOSTS_DENY_LIST));
register(
settingBuilder,
clusterSettings,
Key.ASYNC_QUERY_ENABLED,
ASYNC_QUERY_ENABLED_SETTING,
new Updater(Key.ASYNC_QUERY_ENABLED));
register(
settingBuilder,
clusterSettings,
Expand Down Expand Up @@ -362,6 +375,7 @@ public static List<Setting<?>> pluginSettings() {
.add(METRICS_ROLLING_WINDOW_SETTING)
.add(METRICS_ROLLING_INTERVAL_SETTING)
.add(DATASOURCE_URI_HOSTS_DENY_LIST)
.add(ASYNC_QUERY_ENABLED_SETTING)
.add(SPARK_EXECUTION_ENGINE_CONFIG)
.add(SPARK_EXECUTION_SESSION_LIMIT_SETTING)
.add(SPARK_EXECUTION_REFRESH_JOB_LIMIT_SETTING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,8 @@ private static boolean isClientError(Exception e) {
return e instanceof IllegalArgumentException
|| e instanceof IllegalStateException
|| e instanceof DataSourceNotFoundException
|| e instanceof AsyncQueryNotFoundException;
|| e instanceof AsyncQueryNotFoundException
|| e instanceof IllegalAccessException;
}

private void addSystemErrorMetric(RestRequest.Method requestMethod) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@

package org.opensearch.sql.spark.transport;

import java.util.Locale;
import org.opensearch.action.ActionType;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.opensearch.setting.OpenSearchSettings;
import org.opensearch.sql.protocol.response.format.JsonResponseFormatter;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorServiceImpl;
Expand All @@ -26,6 +29,7 @@ public class TransportCreateAsyncQueryRequestAction
extends HandledTransportAction<CreateAsyncQueryActionRequest, CreateAsyncQueryActionResponse> {

private final AsyncQueryExecutorService asyncQueryExecutorService;
private final OpenSearchSettings pluginSettings;

public static final String NAME = "cluster:admin/opensearch/ql/async_query/create";
public static final ActionType<CreateAsyncQueryActionResponse> ACTION_TYPE =
Expand All @@ -35,9 +39,11 @@ public class TransportCreateAsyncQueryRequestAction
public TransportCreateAsyncQueryRequestAction(
TransportService transportService,
ActionFilters actionFilters,
AsyncQueryExecutorServiceImpl jobManagementService) {
AsyncQueryExecutorServiceImpl jobManagementService,
OpenSearchSettings pluginSettings) {
super(NAME, transportService, actionFilters, CreateAsyncQueryActionRequest::new);
this.asyncQueryExecutorService = jobManagementService;
this.pluginSettings = pluginSettings;
}

@Override
Expand All @@ -46,6 +52,16 @@ protected void doExecute(
CreateAsyncQueryActionRequest request,
ActionListener<CreateAsyncQueryActionResponse> listener) {
try {
if (!(Boolean) pluginSettings.getSettingValue(Settings.Key.ASYNC_QUERY_ENABLED)) {
listener.onFailure(
new IllegalAccessException(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IllegalAccessException seems a Java reflection related exception?

String.format(
Locale.ROOT,
"%s setting is " + "false",
Settings.Key.ASYNC_QUERY_ENABLED.getKeyValue())));
return;
}

CreateAsyncQueryRequest createAsyncQueryRequest = request.getCreateAsyncQueryRequest();
CreateAsyncQueryResponse createAsyncQueryResponse =
asyncQueryExecutorService.createAsyncQuery(createAsyncQueryRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.opensearch.sql.spark.transport;

import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -25,6 +26,8 @@
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.core.action.ActionListener;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.opensearch.setting.OpenSearchSettings;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorServiceImpl;
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest;
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryResponse;
Expand All @@ -42,6 +45,7 @@ public class TransportCreateAsyncQueryRequestActionTest {
@Mock private AsyncQueryExecutorServiceImpl jobExecutorService;
@Mock private Task task;
@Mock private ActionListener<CreateAsyncQueryActionResponse> actionListener;
@Mock private OpenSearchSettings pluginSettings;

@Captor
private ArgumentCaptor<CreateAsyncQueryActionResponse> createJobActionResponseArgumentCaptor;
Expand All @@ -52,7 +56,10 @@ public class TransportCreateAsyncQueryRequestActionTest {
public void setUp() {
action =
new TransportCreateAsyncQueryRequestAction(
transportService, new ActionFilters(new HashSet<>()), jobExecutorService);
transportService,
new ActionFilters(new HashSet<>()),
jobExecutorService,
pluginSettings);
}

@Test
Expand All @@ -61,6 +68,7 @@ public void testDoExecute() {
new CreateAsyncQueryRequest("source = my_glue.default.alb_logs", "my_glue", LangType.SQL);
CreateAsyncQueryActionRequest request =
new CreateAsyncQueryActionRequest(createAsyncQueryRequest);
when(pluginSettings.getSettingValue(Settings.Key.ASYNC_QUERY_ENABLED)).thenReturn(true);
when(jobExecutorService.createAsyncQuery(createAsyncQueryRequest))
.thenReturn(new CreateAsyncQueryResponse("123", null));
action.doExecute(task, request, actionListener);
Expand All @@ -78,6 +86,7 @@ public void testDoExecuteWithSessionId() {
"source = my_glue.default.alb_logs", "my_glue", LangType.SQL, MOCK_SESSION_ID);
CreateAsyncQueryActionRequest request =
new CreateAsyncQueryActionRequest(createAsyncQueryRequest);
when(pluginSettings.getSettingValue(Settings.Key.ASYNC_QUERY_ENABLED)).thenReturn(true);
when(jobExecutorService.createAsyncQuery(createAsyncQueryRequest))
.thenReturn(new CreateAsyncQueryResponse("123", MOCK_SESSION_ID));
action.doExecute(task, request, actionListener);
Expand All @@ -95,6 +104,7 @@ public void testDoExecuteWithException() {
new CreateAsyncQueryRequest("source = my_glue.default.alb_logs", "my_glue", LangType.SQL);
CreateAsyncQueryActionRequest request =
new CreateAsyncQueryActionRequest(createAsyncQueryRequest);
when(pluginSettings.getSettingValue(Settings.Key.ASYNC_QUERY_ENABLED)).thenReturn(true);
doThrow(new RuntimeException("Error"))
.when(jobExecutorService)
.createAsyncQuery(createAsyncQueryRequest);
Expand All @@ -105,4 +115,21 @@ public void testDoExecuteWithException() {
Assertions.assertTrue(exception instanceof RuntimeException);
Assertions.assertEquals("Error", exception.getMessage());
}

@Test
public void asyncQueryDisabled() {
CreateAsyncQueryRequest createAsyncQueryRequest =
new CreateAsyncQueryRequest("source = my_glue.default.alb_logs", "my_glue", LangType.SQL);
CreateAsyncQueryActionRequest request =
new CreateAsyncQueryActionRequest(createAsyncQueryRequest);
when(pluginSettings.getSettingValue(Settings.Key.ASYNC_QUERY_ENABLED)).thenReturn(false);
action.doExecute(task, request, actionListener);
verify(jobExecutorService, never()).createAsyncQuery(createAsyncQueryRequest);
Mockito.verify(actionListener).onFailure(exceptionArgumentCaptor.capture());
Exception exception = exceptionArgumentCaptor.getValue();
Assertions.assertTrue(exception instanceof IllegalAccessException);
Assertions.assertEquals(
"plugins.query.executionengine.async_query.enabled " + "setting is false",
exception.getMessage());
}
}
Loading