From 2223fb9b6569f7ab96d30a4ec9fcc15196c1a822 Mon Sep 17 00:00:00 2001 From: Frank Dattalo Date: Wed, 12 Jun 2024 11:17:02 -0700 Subject: [PATCH] Added Setting to Toggle Data Source Management Code Paths Signed-off-by: Frank Dattalo --- .../rest/RestAsyncQueryManagementAction.java | 27 +++ .../AsyncQueryExecutorServiceSpec.java | 5 +- .../RestAsyncQueryManagementActionTest.java | 83 +++++++++ .../sql/common/setting/Settings.java | 1 + .../rest/RestDataSourceQueryAction.java | 29 +++ .../OpenSearchDataSourceMetadataStorage.java | 28 ++- .../rest/RestDataSourceQueryActionTest.java | 83 +++++++++ ...enSearchDataSourceMetadataStorageTest.java | 71 ++++++++ docs/user/admin/settings.rst | 81 +++++++++ .../sql/asyncquery/AsyncQueryIT.java | 26 +++ .../sql/datasource/DataSourceAPIsIT.java | 134 ++++++++++++++ .../sql/datasource/DataSourceEnabledIT.java | 172 ++++++++++++++++++ .../sql/legacy/SQLIntegTestCase.java | 10 + .../setting/OpenSearchSettings.java | 14 ++ .../sql/opensearch/util/RestRequestUtil.java | 25 +++ .../org/opensearch/sql/plugin/SQLPlugin.java | 9 +- 16 files changed, 793 insertions(+), 5 deletions(-) create mode 100644 async-query/src/test/java/org/opensearch/sql/spark/rest/RestAsyncQueryManagementActionTest.java create mode 100644 datasources/src/test/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryActionTest.java create mode 100644 integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceEnabledIT.java create mode 100644 opensearch/src/main/java/org/opensearch/sql/opensearch/util/RestRequestUtil.java diff --git a/async-query/src/main/java/org/opensearch/sql/spark/rest/RestAsyncQueryManagementAction.java b/async-query/src/main/java/org/opensearch/sql/spark/rest/RestAsyncQueryManagementAction.java index ced5609083..90d0943eed 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/rest/RestAsyncQueryManagementAction.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/rest/RestAsyncQueryManagementAction.java @@ -16,6 +16,7 @@ import java.io.IOException; import java.util.List; import java.util.Locale; +import lombok.RequiredArgsConstructor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.OpenSearchException; @@ -26,11 +27,14 @@ import org.opensearch.rest.BytesRestResponse; import org.opensearch.rest.RestChannel; import org.opensearch.rest.RestRequest; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.datasources.exceptions.DataSourceClientException; import org.opensearch.sql.datasources.exceptions.ErrorMessage; import org.opensearch.sql.datasources.utils.Scheduler; import org.opensearch.sql.legacy.metrics.MetricName; import org.opensearch.sql.legacy.utils.MetricUtils; +import org.opensearch.sql.opensearch.setting.OpenSearchSettings; +import org.opensearch.sql.opensearch.util.RestRequestUtil; import org.opensearch.sql.spark.asyncquery.exceptions.AsyncQueryNotFoundException; import org.opensearch.sql.spark.leasemanager.ConcurrencyLimitExceededException; import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest; @@ -44,6 +48,7 @@ import org.opensearch.sql.spark.transport.model.GetAsyncQueryResultActionRequest; import org.opensearch.sql.spark.transport.model.GetAsyncQueryResultActionResponse; +@RequiredArgsConstructor public class RestAsyncQueryManagementAction extends BaseRestHandler { public static final String ASYNC_QUERY_ACTIONS = "async_query_actions"; @@ -51,6 +56,8 @@ public class RestAsyncQueryManagementAction extends BaseRestHandler { private static final Logger LOG = LogManager.getLogger(RestAsyncQueryManagementAction.class); + private final OpenSearchSettings settings; + @Override public String getName() { return ASYNC_QUERY_ACTIONS; @@ -99,6 +106,9 @@ public List routes() { @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient nodeClient) throws IOException { + if (!dataSourcesEnabled()) { + return dataSourcesDisabledError(restRequest); + } switch (restRequest.method()) { case POST: return executePostRequest(restRequest, nodeClient); @@ -271,4 +281,21 @@ private void addCustomerErrorMetric(RestRequest.Method requestMethod) { break; } } + + private boolean dataSourcesEnabled() { + return settings.getSettingValue(Settings.Key.DATASOURCES_ENABLED); + } + + private RestChannelConsumer dataSourcesDisabledError(RestRequest request) { + + RestRequestUtil.consumeAllRequestParameters(request); + + return channel -> { + reportError( + channel, + new IllegalAccessException( + String.format("%s setting is false", Settings.Key.DATASOURCES_ENABLED.getKeyValue())), + BAD_REQUEST); + }; + } } diff --git a/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java b/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java index 9a94accd7d..8aa586da09 100644 --- a/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java +++ b/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java @@ -222,7 +222,10 @@ private DataSourceServiceImpl createDataSourceService() { String masterKey = "a57d991d9b573f75b9bba1df"; DataSourceMetadataStorage dataSourceMetadataStorage = new OpenSearchDataSourceMetadataStorage( - client, clusterService, new EncryptorImpl(masterKey)); + client, + clusterService, + new EncryptorImpl(masterKey), + (OpenSearchSettings) pluginSettings); return new DataSourceServiceImpl( new ImmutableSet.Builder() .add(new GlueDataSourceFactory(pluginSettings)) diff --git a/async-query/src/test/java/org/opensearch/sql/spark/rest/RestAsyncQueryManagementActionTest.java b/async-query/src/test/java/org/opensearch/sql/spark/rest/RestAsyncQueryManagementActionTest.java new file mode 100644 index 0000000000..ccee3eb642 --- /dev/null +++ b/async-query/src/test/java/org/opensearch/sql/spark/rest/RestAsyncQueryManagementActionTest.java @@ -0,0 +1,83 @@ +package org.opensearch.sql.spark.rest; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import lombok.SneakyThrows; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; +import org.opensearch.client.node.NodeClient; +import org.opensearch.rest.RestChannel; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestResponse; +import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.opensearch.setting.OpenSearchSettings; +import org.opensearch.threadpool.ThreadPool; + +public class RestAsyncQueryManagementActionTest { + + private OpenSearchSettings settings; + private RestRequest request; + private RestChannel channel; + private NodeClient nodeClient; + private ThreadPool threadPool; + private RestAsyncQueryManagementAction unit; + + @BeforeEach + public void setup() { + settings = Mockito.mock(OpenSearchSettings.class); + request = Mockito.mock(RestRequest.class); + channel = Mockito.mock(RestChannel.class); + nodeClient = Mockito.mock(NodeClient.class); + threadPool = Mockito.mock(ThreadPool.class); + + Mockito.when(nodeClient.threadPool()).thenReturn(threadPool); + + unit = new RestAsyncQueryManagementAction(settings); + } + + @Test + @SneakyThrows + public void testWhenDataSourcesAreDisabled() { + setDataSourcesEnabled(false); + unit.handleRequest(request, channel, nodeClient); + Mockito.verifyNoInteractions(nodeClient); + ArgumentCaptor response = ArgumentCaptor.forClass(RestResponse.class); + Mockito.verify(channel, Mockito.times(1)).sendResponse(response.capture()); + Assertions.assertEquals(400, response.getValue().status().getStatus()); + JsonObject actualResponseJson = + new Gson().fromJson(response.getValue().content().utf8ToString(), JsonObject.class); + JsonObject expectedResponseJson = new JsonObject(); + expectedResponseJson.addProperty("status", 400); + expectedResponseJson.add("error", new JsonObject()); + expectedResponseJson.getAsJsonObject("error").addProperty("type", "IllegalAccessException"); + expectedResponseJson.getAsJsonObject("error").addProperty("reason", "Invalid Request"); + expectedResponseJson + .getAsJsonObject("error") + .addProperty("details", "plugins.query.datasources.enabled setting is false"); + Assertions.assertEquals(expectedResponseJson, actualResponseJson); + } + + @Test + @SneakyThrows + public void testWhenDataSourcesAreEnabled() { + setDataSourcesEnabled(true); + Mockito.when(request.method()).thenReturn(RestRequest.Method.GET); + unit.handleRequest(request, channel, nodeClient); + Mockito.verify(threadPool, Mockito.times(1)) + .schedule(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any()); + Mockito.verifyNoInteractions(channel); + } + + @Test + public void testGetName() { + Assertions.assertEquals("async_query_actions", unit.getName()); + } + + private void setDataSourcesEnabled(boolean value) { + Mockito.when(settings.getSettingValue(Settings.Key.DATASOURCES_ENABLED)).thenReturn(value); + } +} diff --git a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java index e2b7ab2904..7346ee6722 100644 --- a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java +++ b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java @@ -33,6 +33,7 @@ public enum Key { ENCYRPTION_MASTER_KEY("plugins.query.datasources.encryption.masterkey"), DATASOURCES_URI_HOSTS_DENY_LIST("plugins.query.datasources.uri.hosts.denylist"), DATASOURCES_LIMIT("plugins.query.datasources.limit"), + DATASOURCES_ENABLED("plugins.query.datasources.enabled"), METRICS_ROLLING_WINDOW("plugins.query.metrics.rolling_window"), METRICS_ROLLING_INTERVAL("plugins.query.metrics.rolling_interval"), diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java b/datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java index 43249e8a28..558a7fe4b2 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java @@ -17,10 +17,12 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import lombok.RequiredArgsConstructor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.OpenSearchException; import org.opensearch.OpenSearchSecurityException; +import org.opensearch.OpenSearchStatusException; import org.opensearch.client.node.NodeClient; import org.opensearch.core.action.ActionListener; import org.opensearch.core.rest.RestStatus; @@ -28,6 +30,7 @@ import org.opensearch.rest.BytesRestResponse; import org.opensearch.rest.RestChannel; import org.opensearch.rest.RestRequest; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasources.exceptions.DataSourceNotFoundException; import org.opensearch.sql.datasources.exceptions.ErrorMessage; @@ -37,7 +40,10 @@ import org.opensearch.sql.datasources.utils.XContentParserUtils; import org.opensearch.sql.legacy.metrics.MetricName; import org.opensearch.sql.legacy.utils.MetricUtils; +import org.opensearch.sql.opensearch.setting.OpenSearchSettings; +import org.opensearch.sql.opensearch.util.RestRequestUtil; +@RequiredArgsConstructor public class RestDataSourceQueryAction extends BaseRestHandler { public static final String DATASOURCE_ACTIONS = "datasource_actions"; @@ -45,6 +51,8 @@ public class RestDataSourceQueryAction extends BaseRestHandler { private static final Logger LOG = LogManager.getLogger(RestDataSourceQueryAction.class); + private final OpenSearchSettings settings; + @Override public String getName() { return DATASOURCE_ACTIONS; @@ -115,6 +123,9 @@ public List routes() { @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient nodeClient) throws IOException { + if (!enabled()) { + return disabledError(restRequest); + } switch (restRequest.method()) { case POST: return executePostRequest(restRequest, nodeClient); @@ -314,4 +325,22 @@ private static boolean isClientError(Exception e) { || e instanceof IllegalArgumentException || e instanceof IllegalStateException; } + + private boolean enabled() { + return settings.getSettingValue(Settings.Key.DATASOURCES_ENABLED); + } + + private RestChannelConsumer disabledError(RestRequest request) { + + RestRequestUtil.consumeAllRequestParameters(request); + + return channel -> { + reportError( + channel, + new OpenSearchStatusException( + String.format("%s setting is false", Settings.Key.DATASOURCES_ENABLED.getKeyValue()), + BAD_REQUEST), + BAD_REQUEST); + }; + } } diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorage.java b/datasources/src/main/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorage.java index eeb0302ed0..682d79c972 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorage.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorage.java @@ -42,11 +42,13 @@ import org.opensearch.index.query.QueryBuilders; import org.opensearch.search.SearchHit; import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasources.encryptor.Encryptor; import org.opensearch.sql.datasources.exceptions.DataSourceNotFoundException; import org.opensearch.sql.datasources.service.DataSourceMetadataStorage; import org.opensearch.sql.datasources.utils.XContentParserUtils; +import org.opensearch.sql.opensearch.setting.OpenSearchSettings; public class OpenSearchDataSourceMetadataStorage implements DataSourceMetadataStorage { @@ -61,6 +63,7 @@ public class OpenSearchDataSourceMetadataStorage implements DataSourceMetadataSt private final ClusterService clusterService; private final Encryptor encryptor; + private final OpenSearchSettings settings; /** * This class implements DataSourceMetadataStorage interface using OpenSearch as underlying @@ -71,14 +74,21 @@ public class OpenSearchDataSourceMetadataStorage implements DataSourceMetadataSt * @param encryptor Encryptor. */ public OpenSearchDataSourceMetadataStorage( - Client client, ClusterService clusterService, Encryptor encryptor) { + Client client, + ClusterService clusterService, + Encryptor encryptor, + OpenSearchSettings settings) { this.client = client; this.clusterService = clusterService; this.encryptor = encryptor; + this.settings = settings; } @Override public List getDataSourceMetadata() { + if (!isEnabled()) { + return Collections.emptyList(); + } if (!this.clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) { createDataSourcesIndex(); return Collections.emptyList(); @@ -88,6 +98,9 @@ public List getDataSourceMetadata() { @Override public Optional getDataSourceMetadata(String datasourceName) { + if (!isEnabled()) { + return Optional.empty(); + } if (!this.clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) { createDataSourcesIndex(); return Optional.empty(); @@ -101,6 +114,9 @@ public Optional getDataSourceMetadata(String datasourceName) @Override public void createDataSourceMetadata(DataSourceMetadata dataSourceMetadata) { + if (!isEnabled()) { + throw new IllegalStateException("Data source management is disabled"); + } encryptDecryptAuthenticationData(dataSourceMetadata, true); if (!this.clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) { createDataSourcesIndex(); @@ -134,6 +150,9 @@ public void createDataSourceMetadata(DataSourceMetadata dataSourceMetadata) { @Override public void updateDataSourceMetadata(DataSourceMetadata dataSourceMetadata) { + if (!isEnabled()) { + throw new IllegalStateException("Data source management is disabled"); + } encryptDecryptAuthenticationData(dataSourceMetadata, true); UpdateRequest updateRequest = new UpdateRequest(DATASOURCE_INDEX_NAME, dataSourceMetadata.getName()); @@ -163,6 +182,9 @@ public void updateDataSourceMetadata(DataSourceMetadata dataSourceMetadata) { @Override public void deleteDataSourceMetadata(String datasourceName) { + if (!isEnabled()) { + throw new IllegalStateException("Data source management is disabled"); + } DeleteRequest deleteRequest = new DeleteRequest(DATASOURCE_INDEX_NAME); deleteRequest.id(datasourceName); deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); @@ -302,4 +324,8 @@ private void handleSigV4PropertiesEncryptionDecryption( .ifPresent(list::add); encryptOrDecrypt(propertiesMap, isEncryption, list); } + + private boolean isEnabled() { + return settings.getSettingValue(Settings.Key.DATASOURCES_ENABLED); + } } diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryActionTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryActionTest.java new file mode 100644 index 0000000000..fbe1b3bee5 --- /dev/null +++ b/datasources/src/test/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryActionTest.java @@ -0,0 +1,83 @@ +package org.opensearch.sql.datasources.rest; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import lombok.SneakyThrows; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; +import org.opensearch.client.node.NodeClient; +import org.opensearch.rest.RestChannel; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestResponse; +import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.opensearch.setting.OpenSearchSettings; +import org.opensearch.threadpool.ThreadPool; + +public class RestDataSourceQueryActionTest { + + private OpenSearchSettings settings; + private RestRequest request; + private RestChannel channel; + private NodeClient nodeClient; + private ThreadPool threadPool; + private RestDataSourceQueryAction unit; + + @BeforeEach + public void setup() { + settings = Mockito.mock(OpenSearchSettings.class); + request = Mockito.mock(RestRequest.class); + channel = Mockito.mock(RestChannel.class); + nodeClient = Mockito.mock(NodeClient.class); + threadPool = Mockito.mock(ThreadPool.class); + + Mockito.when(nodeClient.threadPool()).thenReturn(threadPool); + + unit = new RestDataSourceQueryAction(settings); + } + + @Test + @SneakyThrows + public void testWhenDataSourcesAreDisabled() { + setDataSourcesEnabled(false); + unit.handleRequest(request, channel, nodeClient); + Mockito.verifyNoInteractions(nodeClient); + ArgumentCaptor response = ArgumentCaptor.forClass(RestResponse.class); + Mockito.verify(channel, Mockito.times(1)).sendResponse(response.capture()); + Assertions.assertEquals(400, response.getValue().status().getStatus()); + JsonObject actualResponseJson = + new Gson().fromJson(response.getValue().content().utf8ToString(), JsonObject.class); + JsonObject expectedResponseJson = new JsonObject(); + expectedResponseJson.addProperty("status", 400); + expectedResponseJson.add("error", new JsonObject()); + expectedResponseJson.getAsJsonObject("error").addProperty("type", "OpenSearchStatusException"); + expectedResponseJson.getAsJsonObject("error").addProperty("reason", "Invalid Request"); + expectedResponseJson + .getAsJsonObject("error") + .addProperty("details", "plugins.query.datasources.enabled setting is false"); + Assertions.assertEquals(expectedResponseJson, actualResponseJson); + } + + @Test + @SneakyThrows + public void testWhenDataSourcesAreEnabled() { + setDataSourcesEnabled(true); + Mockito.when(request.method()).thenReturn(RestRequest.Method.GET); + unit.handleRequest(request, channel, nodeClient); + Mockito.verify(threadPool, Mockito.times(1)) + .schedule(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any()); + Mockito.verifyNoInteractions(channel); + } + + @Test + public void testGetName() { + Assertions.assertEquals("datasource_actions", unit.getName()); + } + + private void setDataSourcesEnabled(boolean value) { + Mockito.when(settings.getSettingValue(Settings.Key.DATASOURCES_ENABLED)).thenReturn(value); + } +} diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorageTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorageTest.java index 55b7528f60..03abe73763 100644 --- a/datasources/src/test/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorageTest.java +++ b/datasources/src/test/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorageTest.java @@ -46,10 +46,12 @@ import org.opensearch.index.engine.VersionConflictEngineException; import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceType; import org.opensearch.sql.datasources.encryptor.Encryptor; import org.opensearch.sql.datasources.exceptions.DataSourceNotFoundException; +import org.opensearch.sql.opensearch.setting.OpenSearchSettings; @ExtendWith(MockitoExtension.class) public class OpenSearchDataSourceMetadataStorageTest { @@ -64,6 +66,8 @@ public class OpenSearchDataSourceMetadataStorageTest { @Mock private Encryptor encryptor; + @Mock private OpenSearchSettings openSearchSettings; + @Mock(answer = Answers.RETURNS_DEEP_STUBS) private SearchResponse searchResponse; @@ -81,6 +85,7 @@ public class OpenSearchDataSourceMetadataStorageTest { @SneakyThrows @Test public void testGetDataSourceMetadata() { + setDataSourcesEnabled(true); Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) .thenReturn(true); Mockito.when(client.search(ArgumentMatchers.any())).thenReturn(searchResponseActionFuture); @@ -112,6 +117,7 @@ public void testGetDataSourceMetadata() { @SneakyThrows @Test public void testGetOldDataSourceMetadata() { + setDataSourcesEnabled(true); Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) .thenReturn(true); Mockito.when(client.search(ArgumentMatchers.any())).thenReturn(searchResponseActionFuture); @@ -145,6 +151,7 @@ public void testGetOldDataSourceMetadata() { @SneakyThrows @Test public void testGetDataSourceMetadataWith404SearchResponse() { + setDataSourcesEnabled(true); Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) .thenReturn(true); Mockito.when(client.search(ArgumentMatchers.any())).thenReturn(searchResponseActionFuture); @@ -165,6 +172,7 @@ public void testGetDataSourceMetadataWith404SearchResponse() { @SneakyThrows @Test public void testGetDataSourceMetadataWithParsingFailed() { + setDataSourcesEnabled(true); Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) .thenReturn(true); Mockito.when(client.search(ArgumentMatchers.any())).thenReturn(searchResponseActionFuture); @@ -185,6 +193,7 @@ public void testGetDataSourceMetadataWithParsingFailed() { @SneakyThrows @Test public void testGetDataSourceMetadataWithAWSSigV4() { + setDataSourcesEnabled(true); Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) .thenReturn(true); Mockito.when(client.search(ArgumentMatchers.any())).thenReturn(searchResponseActionFuture); @@ -216,6 +225,7 @@ public void testGetDataSourceMetadataWithAWSSigV4() { @SneakyThrows @Test public void testGetDataSourceMetadataWithBasicAuth() { + setDataSourcesEnabled(true); Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) .thenReturn(true); Mockito.when(client.search(ArgumentMatchers.any())).thenReturn(searchResponseActionFuture); @@ -248,6 +258,7 @@ public void testGetDataSourceMetadataWithBasicAuth() { @SneakyThrows @Test public void testGetDataSourceMetadataList() { + setDataSourcesEnabled(true); Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) .thenReturn(true); Mockito.when(client.search(ArgumentMatchers.any())).thenReturn(searchResponseActionFuture); @@ -272,6 +283,7 @@ public void testGetDataSourceMetadataList() { @SneakyThrows @Test public void testGetDataSourceMetadataListWithNoIndex() { + setDataSourcesEnabled(true); Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) .thenReturn(Boolean.FALSE); Mockito.when(client.admin().indices().create(ArgumentMatchers.any())) @@ -289,6 +301,7 @@ public void testGetDataSourceMetadataListWithNoIndex() { @SneakyThrows @Test public void testGetDataSourceMetadataWithNoIndex() { + setDataSourcesEnabled(true); Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) .thenReturn(Boolean.FALSE); Mockito.when(client.admin().indices().create(ArgumentMatchers.any())) @@ -305,6 +318,7 @@ public void testGetDataSourceMetadataWithNoIndex() { @Test public void testCreateDataSourceMetadata() { + setDataSourcesEnabled(true); Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) .thenReturn(Boolean.FALSE); @@ -330,6 +344,7 @@ public void testCreateDataSourceMetadata() { @Test public void testCreateDataSourceMetadataWithOutCreatingIndex() { + setDataSourcesEnabled(true); Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) .thenReturn(Boolean.TRUE); Mockito.when(encryptor.encrypt("secret_key")).thenReturn("secret_key"); @@ -350,6 +365,7 @@ public void testCreateDataSourceMetadataWithOutCreatingIndex() { @Test public void testCreateDataSourceMetadataFailedWithNotFoundResponse() { + setDataSourcesEnabled(true); Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) .thenReturn(Boolean.FALSE); @@ -383,6 +399,7 @@ public void testCreateDataSourceMetadataFailedWithNotFoundResponse() { @Test public void testCreateDataSourceMetadataWithVersionConflict() { + setDataSourcesEnabled(true); Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) .thenReturn(Boolean.FALSE); @@ -413,6 +430,7 @@ public void testCreateDataSourceMetadataWithVersionConflict() { @Test public void testCreateDataSourceMetadataWithException() { + setDataSourcesEnabled(true); Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) .thenReturn(Boolean.FALSE); @@ -444,6 +462,7 @@ public void testCreateDataSourceMetadataWithException() { @Test public void testCreateDataSourceMetadataWithIndexCreationFailed() { + setDataSourcesEnabled(true); Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) .thenReturn(Boolean.FALSE); @@ -474,6 +493,7 @@ public void testCreateDataSourceMetadataWithIndexCreationFailed() { @Test public void testUpdateDataSourceMetadata() { + setDataSourcesEnabled(true); Mockito.when(encryptor.encrypt("secret_key")).thenReturn("secret_key"); Mockito.when(encryptor.encrypt("access_key")).thenReturn("access_key"); Mockito.when(client.update(ArgumentMatchers.any())).thenReturn(updateResponseActionFuture); @@ -492,6 +512,7 @@ public void testUpdateDataSourceMetadata() { @Test public void testUpdateDataSourceMetadataWithNOOP() { + setDataSourcesEnabled(true); Mockito.when(encryptor.encrypt("secret_key")).thenReturn("secret_key"); Mockito.when(encryptor.encrypt("access_key")).thenReturn("access_key"); Mockito.when(client.update(ArgumentMatchers.any())).thenReturn(updateResponseActionFuture); @@ -510,6 +531,7 @@ public void testUpdateDataSourceMetadataWithNOOP() { @Test public void testUpdateDataSourceMetadataWithNotFoundResult() { + setDataSourcesEnabled(true); Mockito.when(encryptor.encrypt("secret_key")).thenReturn("secret_key"); Mockito.when(encryptor.encrypt("access_key")).thenReturn("access_key"); Mockito.when(client.update(ArgumentMatchers.any())).thenReturn(updateResponseActionFuture); @@ -536,6 +558,7 @@ public void testUpdateDataSourceMetadataWithNotFoundResult() { @Test public void testUpdateDataSourceMetadataWithDocumentMissingException() { + setDataSourcesEnabled(true); Mockito.when(encryptor.encrypt("secret_key")).thenReturn("secret_key"); Mockito.when(encryptor.encrypt("access_key")).thenReturn("access_key"); Mockito.when(client.update(ArgumentMatchers.any())) @@ -561,6 +584,7 @@ public void testUpdateDataSourceMetadataWithDocumentMissingException() { @Test public void testUpdateDataSourceMetadataWithRuntimeException() { + setDataSourcesEnabled(true); Mockito.when(encryptor.encrypt("secret_key")).thenReturn("secret_key"); Mockito.when(encryptor.encrypt("access_key")).thenReturn("access_key"); Mockito.when(client.update(ArgumentMatchers.any())) @@ -586,6 +610,7 @@ public void testUpdateDataSourceMetadataWithRuntimeException() { @Test public void testDeleteDataSourceMetadata() { + setDataSourcesEnabled(true); Mockito.when(client.delete(ArgumentMatchers.any())).thenReturn(deleteResponseActionFuture); Mockito.when(deleteResponseActionFuture.actionGet()).thenReturn(deleteResponse); Mockito.when(deleteResponse.getResult()).thenReturn(DocWriteResponse.Result.DELETED); @@ -600,6 +625,7 @@ public void testDeleteDataSourceMetadata() { @Test public void testDeleteDataSourceMetadataWhichisAlreadyDeleted() { + setDataSourcesEnabled(true); Mockito.when(client.delete(ArgumentMatchers.any())).thenReturn(deleteResponseActionFuture); Mockito.when(deleteResponseActionFuture.actionGet()).thenReturn(deleteResponse); Mockito.when(deleteResponse.getResult()).thenReturn(DocWriteResponse.Result.NOT_FOUND); @@ -619,6 +645,7 @@ public void testDeleteDataSourceMetadataWhichisAlreadyDeleted() { @Test public void testDeleteDataSourceMetadataWithUnexpectedResult() { + setDataSourcesEnabled(true); Mockito.when(client.delete(ArgumentMatchers.any())).thenReturn(deleteResponseActionFuture); Mockito.when(deleteResponseActionFuture.actionGet()).thenReturn(deleteResponse); Mockito.when(deleteResponse.getResult()).thenReturn(DocWriteResponse.Result.NOOP); @@ -637,6 +664,43 @@ public void testDeleteDataSourceMetadataWithUnexpectedResult() { Mockito.verify(client.threadPool().getThreadContext(), Mockito.times(1)).stashContext(); } + @Test + public void testWhenDataSourcesAreDisabled() { + setDataSourcesEnabled(false); + + Assertions.assertEquals( + Optional.empty(), this.openSearchDataSourceMetadataStorage.getDataSourceMetadata("dummy")); + + Assertions.assertEquals( + Collections.emptyList(), this.openSearchDataSourceMetadataStorage.getDataSourceMetadata()); + + Assertions.assertThrows( + IllegalStateException.class, + () -> { + this.openSearchDataSourceMetadataStorage.createDataSourceMetadata( + getDataSourceMetadata()); + }, + "Data source management is disabled"); + + Assertions.assertThrows( + IllegalStateException.class, + () -> { + this.openSearchDataSourceMetadataStorage.updateDataSourceMetadata( + getDataSourceMetadata()); + }, + "Data source management is disabled"); + + Assertions.assertThrows( + IllegalStateException.class, + () -> { + this.openSearchDataSourceMetadataStorage.deleteDataSourceMetadata("dummy"); + }, + "Data source management is disabled"); + + Mockito.verify(clusterService.state().routingTable(), Mockito.times(0)) + .hasIndex(DATASOURCE_INDEX_NAME); + } + private String getBasicDataSourceMetadataString() throws JsonProcessingException { Map properties = new HashMap<>(); properties.put("prometheus.auth.type", "basicauth"); @@ -744,4 +808,11 @@ public void serialize( } }; } + + private void setDataSourcesEnabled(boolean enabled) { + Mockito.when( + openSearchSettings.getSettingValue( + ArgumentMatchers.eq(Settings.Key.DATASOURCES_ENABLED))) + .thenReturn(enabled); + } } diff --git a/docs/user/admin/settings.rst b/docs/user/admin/settings.rst index 6531e84aa1..b62b0de3f0 100644 --- a/docs/user/admin/settings.rst +++ b/docs/user/admin/settings.rst @@ -630,3 +630,84 @@ Request :: } } } + +plugins.query.datasources.enabled +================================= + +Description +----------- + +This setting controls whether datasources are enabled. + +1. The default value is true +2. This setting is node scope +3. This setting can be updated dynamically + +Update Settings Request:: + + sh$ curl -sS -H 'Content-Type: application/json' -X PUT 'localhost:9200/_cluster/settings?pretty' \ + ... -d '{"transient":{"plugins.query.datasources.enabled":"false"}}' + { + "acknowledged": true, + "persistent": {}, + "transient": { + "plugins": { + "query": { + "datasources": { + "enabled": "false" + } + } + } + } + } + +When Attempting to Call Data Source APIs:: + + sh$ curl -sS -H 'Content-Type: application/json' -X GET 'localhost:9200/_plugins/_query/_datasources' + { + "status": 400, + "error": { + "type": "OpenSearchStatusException", + "reason": "Invalid Request", + "details": "plugins.query.datasources.enabled is disabled" + } + } + +When Attempting to List Data Source:: + + sh$ curl -sS -H 'Content-Type: application/json' -X POST 'localhost:9200/_plugins/_ppl' \ + ... -d '{"query":"show datasources"}' + { + "schema": [ + { + "name": "DATASOURCE_NAME", + "type": "string" + }, + { + "name": "CONNECTOR_TYPE", + "type": "string" + } + ], + "datarows": [], + "total": 0, + "size": 0 + } + +To Re-enable Data Sources::: + + sh$ curl -sS -H 'Content-Type: application/json' -X PUT 'localhost:9200/_cluster/settings?pretty' \ + ... -d '{"transient":{"plugins.query.datasources.enabled":"true"}}' + { + "acknowledged": true, + "persistent": {}, + "transient": { + "plugins": { + "query": { + "datasources": { + "enabled": "true" + } + } + } + } + } + diff --git a/integ-test/src/test/java/org/opensearch/sql/asyncquery/AsyncQueryIT.java b/integ-test/src/test/java/org/opensearch/sql/asyncquery/AsyncQueryIT.java index 9b5cc96b0e..c41a52b6fd 100644 --- a/integ-test/src/test/java/org/opensearch/sql/asyncquery/AsyncQueryIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/asyncquery/AsyncQueryIT.java @@ -51,6 +51,32 @@ public void asyncQueryEnabledSettingsTest() throws IOException { updateClusterSettings(new ClusterSetting(PERSISTENT, setting, null)); } + @Test + public void dataSourceDisabledSettingsTest() throws IOException { + String setting = "plugins.query.datasources.enabled"; + // disable + updateClusterSettings(new ClusterSetting(PERSISTENT, setting, "false")); + + String query = "select 1"; + Response response = null; + try { + executeAsyncQueryToString(query); + } catch (ResponseException ex) { + response = ex.getResponse(); + } + + JSONObject result = new JSONObject(TestUtils.getResponseBody(response)); + assertThat(result.getInt("status"), equalTo(400)); + JSONObject error = result.getJSONObject("error"); + assertThat(error.getString("reason"), equalTo("Invalid Request")); + assertThat( + error.getString("details"), equalTo("plugins.query.datasources.enabled setting is false")); + assertThat(error.getString("type"), equalTo("IllegalAccessException")); + + // reset the setting + updateClusterSettings(new ClusterSetting(PERSISTENT, setting, null)); + } + protected String executeAsyncQueryToString(String query) throws IOException { Response response = client().performRequest(buildAsyncRequest(query, ASYNC_QUERY_ACTION_URL)); Assert.assertEquals(200, response.getStatusLine().getStatusCode()); diff --git a/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceAPIsIT.java b/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceAPIsIT.java index 5d693d6652..31fd781c51 100644 --- a/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceAPIsIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceAPIsIT.java @@ -19,10 +19,13 @@ import java.io.IOException; import java.lang.reflect.Type; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import lombok.SneakyThrows; +import lombok.Value; +import org.json.JSONObject; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -30,6 +33,7 @@ import org.opensearch.client.Request; import org.opensearch.client.Response; import org.opensearch.client.ResponseException; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceType; import org.opensearch.sql.ppl.PPLIntegTestCase; @@ -387,6 +391,136 @@ public void patchDataSourceAPITest() { Assert.assertEquals("test", dataSourceMetadata.getDescription()); } + @Test + public void testDataSourcesEnabledSettingIsTrueByDefault() { + Assert.assertTrue(getDataSourceEnabledSetting("defaults")); + } + + @Test + public void testDataSourcesEnabledSettingCanBeSetToTransientFalse() { + setDataSourcesEnabled("transient", false); + Assert.assertFalse(getDataSourceEnabledSetting("transient")); + } + + @Test + public void testDataSourcesEnabledSettingCanBeSetToTransientTrue() { + setDataSourcesEnabled("transient", true); + Assert.assertTrue(getDataSourceEnabledSetting("transient")); + } + + @Test + public void testDataSourcesEnabledSettingCanBeSetToPersistentFalse() { + setDataSourcesEnabled("persistent", false); + Assert.assertFalse(getDataSourceEnabledSetting("persistent")); + } + + @Test + public void testDataSourcesEnabledSettingCanBeSetToPersistentTrue() { + setDataSourcesEnabled("persistent", true); + Assert.assertTrue(getDataSourceEnabledSetting("persistent")); + } + + @Test + public void testDataSourcesEnabledSetToFalseRejectsApiOperations() { + setDataSourcesEnabled("transient", false); + validateAllDataSourceApisWithEnabledSetting(false); + } + + @Test + public void testDataSourcesEnabledSetToTrueAllowsApiOperations() { + setDataSourcesEnabled("transient", true); + validateAllDataSourceApisWithEnabledSetting(true); + } + + @SneakyThrows + private void validateAllDataSourceApisWithEnabledSetting(boolean dataSourcesEnabled) { + + @Value + class TestCase { + Request request; + int expectedResponseCodeOnSuccess; + String expectResponseToContainOnSuccess; + } + + TestCase[] testCases = + new TestCase[] { + // create + new TestCase( + getCreateDataSourceRequest(mockDataSourceMetadata("dummy")), + 201, + "Created DataSource"), + // read + new TestCase(getFetchDataSourceRequest("dummy"), 200, "dummy"), + // update + new TestCase( + getUpdateDataSourceRequest(mockDataSourceMetadata("dummy")), + 200, + "Updated DataSource"), + // list + new TestCase(getFetchDataSourceRequest(null), 200, "dummy"), + // delete + new TestCase(getDeleteDataSourceRequest("dummy"), 204, null) + }; + + for (TestCase testCase : testCases) { + + // data source APIs are eventually consistent. sleep delay is added for consistency + // see createDataSourceAPITest above. + Thread.sleep(2_000); + + final int expectedResponseCode = + dataSourcesEnabled ? testCase.getExpectedResponseCodeOnSuccess() : 400; + + final String expectedResponseBodyToContain = + dataSourcesEnabled + ? testCase.getExpectResponseToContainOnSuccess() + : "plugins.query.datasources.enabled setting is false"; + + Response response; + + try { + response = client().performRequest(testCase.getRequest()); + } catch (ResponseException e) { + response = e.getResponse(); + } + + Assert.assertEquals( + String.format( + "Test for " + testCase + " failed. Expected response code of %s, but got %s", + expectedResponseCode, + response.getStatusLine().getStatusCode()), + expectedResponseCode, + response.getStatusLine().getStatusCode()); + + if (expectedResponseBodyToContain != null) { + + String responseBody = getResponseBody(response); + + Assert.assertTrue( + String.format( + "Test for " + testCase + " failed. '%s' failed to contain '%s'", + responseBody, + expectedResponseBodyToContain), + responseBody.contains(expectedResponseBodyToContain)); + } + } + } + + @SneakyThrows + private boolean getDataSourceEnabledSetting(String... clusterSettingsTypeKeys) { + + final String settingKey = Settings.Key.DATASOURCES_ENABLED.getKeyValue(); + + JSONObject settings = getAllClusterSettings(); + + return Arrays.stream(clusterSettingsTypeKeys) + .map(settings::getJSONObject) + .filter(obj -> obj.has(settingKey)) + .map(obj -> obj.getBoolean(settingKey)) + .findFirst() + .orElseThrow(); + } + public DataSourceMetadata mockDataSourceMetadata(String name) { return new DataSourceMetadata.Builder() .setName(name) diff --git a/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceEnabledIT.java b/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceEnabledIT.java new file mode 100644 index 0000000000..1a1fdd60be --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceEnabledIT.java @@ -0,0 +1,172 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.datasource; + +import static org.opensearch.sql.legacy.TestUtils.getResponseBody; +import static org.opensearch.sql.legacy.TestsConstants.DATASOURCES; + +import lombok.SneakyThrows; +import org.json.JSONObject; +import org.junit.Assert; +import org.junit.Test; +import org.opensearch.client.Request; +import org.opensearch.client.Response; +import org.opensearch.client.ResponseException; +import org.opensearch.sql.ppl.PPLIntegTestCase; + +public class DataSourceEnabledIT extends PPLIntegTestCase { + + @Override + protected boolean preserveClusterUponCompletion() { + return false; + } + + @Test + public void testDataSourceIndexIsCreatedByDefault() { + assertDataSourceCount(0); + assertSelectFromDataSourceReturnsDoesNotExist(); + assertDataSourceIndexCreated(true); + assertAsyncQueryApiDisabled(false); + } + + @Test + public void testDataSourceIndexIsCreatedIfSettingIsEnabled() { + setDataSourcesEnabled("transient", true); + assertDataSourceCount(0); + assertSelectFromDataSourceReturnsDoesNotExist(); + assertDataSourceIndexCreated(true); + assertAsyncQueryApiDisabled(false); + } + + @Test + public void testDataSourceIndexIsNotCreatedIfSettingIsDisabled() { + setDataSourcesEnabled("transient", false); + assertDataSourceCount(0); + assertSelectFromDataSourceReturnsDoesNotExist(); + assertDataSourceIndexCreated(false); + assertAsyncQueryApiDisabled(true); + } + + @Test + public void testAfterPreviousEnable() { + createOpenSearchDataSource(); + createIndex(); + assertDataSourceCount(1); + assertSelectFromDataSourceReturnsSuccess(); + assertSelectFromDummyIndexInValidDataSourceDataSourceReturnsDoesNotExist(); + assertAsyncQueryApiDisabled(false); + setDataSourcesEnabled("transient", false); + assertDataSourceCount(0); + assertSelectFromDataSourceReturnsDoesNotExist(); + assertAsyncQueryApiDisabled(true); + } + + @SneakyThrows + private void assertSelectFromDataSourceReturnsDoesNotExist() { + Request request = new Request("POST", "/_plugins/_sql"); + request.setJsonEntity(new JSONObject().put("query", "select * from self.myindex").toString()); + Response response = performRequest(request); + Assert.assertEquals(404, response.getStatusLine().getStatusCode()); + String result = getResponseBody(response); + Assert.assertTrue(result.contains("IndexNotFoundException[no such index [self.myindex]]")); + } + + @SneakyThrows + private void assertSelectFromDummyIndexInValidDataSourceDataSourceReturnsDoesNotExist() { + Request request = new Request("POST", "/_plugins/_sql"); + request.setJsonEntity(new JSONObject().put("query", "select * from self.dummy").toString()); + Response response = performRequest(request); + Assert.assertEquals(404, response.getStatusLine().getStatusCode()); + String result = getResponseBody(response); + // subtle difference in error messaging shows that it resolved self to a data source + Assert.assertTrue(result.contains("IndexNotFoundException[no such index [dummy]]")); + } + + @SneakyThrows + private void assertSelectFromDataSourceReturnsSuccess() { + Request request = new Request("POST", "/_plugins/_sql"); + request.setJsonEntity(new JSONObject().put("query", "select * from self.myindex").toString()); + Response response = performRequest(request); + Assert.assertEquals(200, response.getStatusLine().getStatusCode()); + JSONObject result = new JSONObject(getResponseBody(response)); + Assert.assertTrue(result.has("datarows")); + Assert.assertTrue(result.has("schema")); + Assert.assertTrue(result.has("total")); + Assert.assertTrue(result.has("size")); + Assert.assertEquals(200, result.getNumber("status")); + } + + private void createIndex() { + Request request = new Request("PUT", "/myindex"); + Response response = performRequest(request); + Assert.assertEquals(200, response.getStatusLine().getStatusCode()); + } + + private void createOpenSearchDataSource() { + Request request = new Request("POST", "/_plugins/_query/_datasources"); + request.setJsonEntity( + new JSONObject().put("connector", "OPENSEARCH").put("name", "self").toString()); + Response response = performRequest(request); + Assert.assertEquals(201, response.getStatusLine().getStatusCode()); + } + + @SneakyThrows + private void assertAsyncQueryApiDisabled(boolean expectDisabled) { + + Request request = new Request("POST", "/_plugins/_async_query"); + + request.setJsonEntity( + new JSONObject() + .put("query", "select * from self.myindex") + .put("datasource", "self") + .put("lang", "sql") + .toString()); + + Response response = performRequest(request); + Assert.assertEquals(400, response.getStatusLine().getStatusCode()); + + String expectBodyToContain = + expectDisabled + ? "plugins.query.datasources.enabled setting is false" + : "Please configure plugins.query.executionengine.spark.config in cluster settings to" + + " enable"; + + Assert.assertTrue(getResponseBody(response).contains(expectBodyToContain)); + } + + @SneakyThrows + private void assertDataSourceCount(int expected) { + Request request = new Request("POST", "/_plugins/_ppl"); + request.setJsonEntity(new JSONObject().put("query", "show datasources").toString()); + Response response = performRequest(request); + Assert.assertEquals(200, response.getStatusLine().getStatusCode()); + JSONObject jsonBody = new JSONObject(getResponseBody(response)); + Assert.assertEquals(expected, jsonBody.getNumber("size")); + Assert.assertEquals(expected, jsonBody.getNumber("total")); + Assert.assertEquals(expected, jsonBody.getJSONArray("datarows").length()); + } + + @SneakyThrows + private void assertDataSourceIndexCreated(boolean expected) { + Request request = new Request("GET", "/" + DATASOURCES); + Response response = performRequest(request); + String responseBody = getResponseBody(response); + boolean indexDoesExist = + response.getStatusLine().getStatusCode() == 200 + && responseBody.contains(DATASOURCES) + && responseBody.contains("mappings"); + Assert.assertEquals(expected, indexDoesExist); + } + + @SneakyThrows + private Response performRequest(Request request) { + try { + return client().performRequest(request); + } catch (ResponseException e) { + return e.getResponse(); + } + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java index 8a0ad563a6..f476403243 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java @@ -54,6 +54,7 @@ import javax.management.remote.JMXConnector; import javax.management.remote.JMXConnectorFactory; import javax.management.remote.JMXServiceURL; +import lombok.SneakyThrows; import org.apache.commons.lang3.StringUtils; import org.json.JSONArray; import org.json.JSONObject; @@ -172,6 +173,15 @@ protected void resetQuerySizeLimit() throws IOException { DEFAULT_QUERY_SIZE_LIMIT.toString())); } + @SneakyThrows + protected void setDataSourcesEnabled(String clusterSettingType, boolean value) { + updateClusterSettings( + new ClusterSetting( + clusterSettingType, + Settings.Key.DATASOURCES_ENABLED.getKeyValue(), + Boolean.toString(value))); + } + protected static void wipeAllClusterSettings() throws IOException { updateClusterSettings(new ClusterSetting("persistent", "*", null)); updateClusterSettings(new ClusterSetting("transient", "*", null)); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java index c493aa46e5..b4ce82a828 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java @@ -132,6 +132,13 @@ public class OpenSearchSettings extends Settings { Setting.Property.NodeScope, Setting.Property.Dynamic); + public static final Setting DATASOURCE_ENABLED_SETTING = + Setting.boolSetting( + Key.DATASOURCES_ENABLED.getKeyValue(), + true, + Setting.Property.NodeScope, + Setting.Property.Dynamic); + public static final Setting ASYNC_QUERY_ENABLED_SETTING = Setting.boolSetting( Key.ASYNC_QUERY_ENABLED.getKeyValue(), @@ -265,6 +272,12 @@ public OpenSearchSettings(ClusterSettings clusterSettings) { Key.DATASOURCES_URI_HOSTS_DENY_LIST, DATASOURCE_URI_HOSTS_DENY_LIST, new Updater(Key.DATASOURCES_URI_HOSTS_DENY_LIST)); + register( + settingBuilder, + clusterSettings, + Key.DATASOURCES_ENABLED, + DATASOURCE_ENABLED_SETTING, + new Updater(Key.DATASOURCES_ENABLED)); register( settingBuilder, clusterSettings, @@ -389,6 +402,7 @@ public static List> pluginSettings() { .add(METRICS_ROLLING_WINDOW_SETTING) .add(METRICS_ROLLING_INTERVAL_SETTING) .add(DATASOURCE_URI_HOSTS_DENY_LIST) + .add(DATASOURCE_ENABLED_SETTING) .add(ASYNC_QUERY_ENABLED_SETTING) .add(SPARK_EXECUTION_ENGINE_CONFIG) .add(SPARK_EXECUTION_SESSION_LIMIT_SETTING) diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/util/RestRequestUtil.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/util/RestRequestUtil.java new file mode 100644 index 0000000000..e02bcf5af9 --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/util/RestRequestUtil.java @@ -0,0 +1,25 @@ +package org.opensearch.sql.opensearch.util; + +import lombok.NonNull; +import org.opensearch.client.node.NodeClient; +import org.opensearch.rest.RestChannel; +import org.opensearch.rest.RestRequest; + +/** RestRequestUtil is a utility class for common operations on OpenSearch RestRequest's. */ +public class RestRequestUtil { + + private RestRequestUtil() { + // utility class + } + + /** + * Utility method for consuming all the request parameters. Doing this will ensure that the + * BaseRestHandler doesn't fail the request with an unconsumed parameter exception. + * + * @see org.opensearch.rest.BaseRestHandler#handleRequest(RestRequest, RestChannel, NodeClient) + * @param request - The request to consume all parameters on + */ + public static void consumeAllRequestParameters(@NonNull RestRequest request) { + request.params().keySet().forEach(request::param); + } +} diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index a9eb38a2c2..cdb2d4fff8 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -138,8 +138,8 @@ public List getRestHandlers( new RestSqlStatsAction(settings, restController), new RestPPLStatsAction(settings, restController), new RestQuerySettingsAction(settings, restController), - new RestDataSourceQueryAction(), - new RestAsyncQueryManagementAction()); + new RestDataSourceQueryAction((OpenSearchSettings) pluginSettings), + new RestAsyncQueryManagementAction((OpenSearchSettings) pluginSettings)); } /** Register action and handler so that transportClient can find proxy for action. */ @@ -274,7 +274,10 @@ private DataSourceServiceImpl createDataSourceService() { } DataSourceMetadataStorage dataSourceMetadataStorage = new OpenSearchDataSourceMetadataStorage( - client, clusterService, new EncryptorImpl(masterKey)); + client, + clusterService, + new EncryptorImpl(masterKey), + (OpenSearchSettings) pluginSettings); DataSourceUserAuthorizationHelper dataSourceUserAuthorizationHelper = new DataSourceUserAuthorizationHelperImpl(client); return new DataSourceServiceImpl(