From 28d0fea109a33a71e0f4d1729d0f91ee4f318f69 Mon Sep 17 00:00:00 2001 From: Frank Dattalo Date: Tue, 28 May 2024 09:57:14 -0700 Subject: [PATCH] Added Setting to Toggle Data Source Management Code Paths Signed-off-by: Frank Dattalo --- .../sql/common/setting/Settings.java | 1 + .../rest/RestDataSourceQueryAction.java | 30 ++++ .../OpenSearchDataSourceMetadataStorage.java | 28 +++- .../rest/RestDataSourceQueryActionTest.java | 83 +++++++++++ ...enSearchDataSourceMetadataStorageTest.java | 71 ++++++++++ docs/user/admin/settings.rst | 61 ++++++++ .../sql/datasource/DataSourceAPIsIT.java | 134 ++++++++++++++++++ .../sql/datasource/DataSourceEnabledIT.java | 68 +++++++++ .../sql/legacy/SQLIntegTestCase.java | 10 ++ .../setting/OpenSearchSettings.java | 14 ++ .../org/opensearch/sql/plugin/SQLPlugin.java | 7 +- .../AsyncQueryExecutorServiceSpec.java | 5 +- 12 files changed, 508 insertions(+), 4 deletions(-) create mode 100644 datasources/src/test/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryActionTest.java create mode 100644 integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceEnabledIT.java diff --git a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java index e2b7ab2904..7346ee6722 100644 --- a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java +++ b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java @@ -33,6 +33,7 @@ public enum Key { ENCYRPTION_MASTER_KEY("plugins.query.datasources.encryption.masterkey"), DATASOURCES_URI_HOSTS_DENY_LIST("plugins.query.datasources.uri.hosts.denylist"), DATASOURCES_LIMIT("plugins.query.datasources.limit"), + DATASOURCES_ENABLED("plugins.query.datasources.enabled"), METRICS_ROLLING_WINDOW("plugins.query.metrics.rolling_window"), METRICS_ROLLING_INTERVAL("plugins.query.metrics.rolling_interval"), diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java b/datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java index 43249e8a28..aa6a8f19c4 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java @@ -17,10 +17,12 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import lombok.RequiredArgsConstructor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.OpenSearchException; import org.opensearch.OpenSearchSecurityException; +import org.opensearch.OpenSearchStatusException; import org.opensearch.client.node.NodeClient; import org.opensearch.core.action.ActionListener; import org.opensearch.core.rest.RestStatus; @@ -28,6 +30,7 @@ import org.opensearch.rest.BytesRestResponse; import org.opensearch.rest.RestChannel; import org.opensearch.rest.RestRequest; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasources.exceptions.DataSourceNotFoundException; import org.opensearch.sql.datasources.exceptions.ErrorMessage; @@ -37,7 +40,9 @@ import org.opensearch.sql.datasources.utils.XContentParserUtils; import org.opensearch.sql.legacy.metrics.MetricName; import org.opensearch.sql.legacy.utils.MetricUtils; +import org.opensearch.sql.opensearch.setting.OpenSearchSettings; +@RequiredArgsConstructor public class RestDataSourceQueryAction extends BaseRestHandler { public static final String DATASOURCE_ACTIONS = "datasource_actions"; @@ -45,6 +50,8 @@ public class RestDataSourceQueryAction extends BaseRestHandler { private static final Logger LOG = LogManager.getLogger(RestDataSourceQueryAction.class); + private final OpenSearchSettings settings; + @Override public String getName() { return DATASOURCE_ACTIONS; @@ -115,6 +122,9 @@ public List routes() { @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient nodeClient) throws IOException { + if (!enabled()) { + return disabledError(restRequest); + } switch (restRequest.method()) { case POST: return executePostRequest(restRequest, nodeClient); @@ -314,4 +324,24 @@ private static boolean isClientError(Exception e) { || e instanceof IllegalArgumentException || e instanceof IllegalStateException; } + + private boolean enabled() { + return settings.getSettingValue(Settings.Key.DATASOURCES_ENABLED); + } + + private RestChannelConsumer disabledError(RestRequest request) { + + // consume all the params of the request to ensure that the BaseRestHandler + // doesn't fail the request with an unconsumed parameter exception + request.params().keySet().forEach(request::param); + + return channel -> { + reportError( + channel, + new OpenSearchStatusException( + String.format("%s is disabled", Settings.Key.DATASOURCES_ENABLED.getKeyValue()), + BAD_REQUEST), + BAD_REQUEST); + }; + } } diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorage.java b/datasources/src/main/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorage.java index eeb0302ed0..682d79c972 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorage.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorage.java @@ -42,11 +42,13 @@ import org.opensearch.index.query.QueryBuilders; import org.opensearch.search.SearchHit; import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasources.encryptor.Encryptor; import org.opensearch.sql.datasources.exceptions.DataSourceNotFoundException; import org.opensearch.sql.datasources.service.DataSourceMetadataStorage; import org.opensearch.sql.datasources.utils.XContentParserUtils; +import org.opensearch.sql.opensearch.setting.OpenSearchSettings; public class OpenSearchDataSourceMetadataStorage implements DataSourceMetadataStorage { @@ -61,6 +63,7 @@ public class OpenSearchDataSourceMetadataStorage implements DataSourceMetadataSt private final ClusterService clusterService; private final Encryptor encryptor; + private final OpenSearchSettings settings; /** * This class implements DataSourceMetadataStorage interface using OpenSearch as underlying @@ -71,14 +74,21 @@ public class OpenSearchDataSourceMetadataStorage implements DataSourceMetadataSt * @param encryptor Encryptor. */ public OpenSearchDataSourceMetadataStorage( - Client client, ClusterService clusterService, Encryptor encryptor) { + Client client, + ClusterService clusterService, + Encryptor encryptor, + OpenSearchSettings settings) { this.client = client; this.clusterService = clusterService; this.encryptor = encryptor; + this.settings = settings; } @Override public List getDataSourceMetadata() { + if (!isEnabled()) { + return Collections.emptyList(); + } if (!this.clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) { createDataSourcesIndex(); return Collections.emptyList(); @@ -88,6 +98,9 @@ public List getDataSourceMetadata() { @Override public Optional getDataSourceMetadata(String datasourceName) { + if (!isEnabled()) { + return Optional.empty(); + } if (!this.clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) { createDataSourcesIndex(); return Optional.empty(); @@ -101,6 +114,9 @@ public Optional getDataSourceMetadata(String datasourceName) @Override public void createDataSourceMetadata(DataSourceMetadata dataSourceMetadata) { + if (!isEnabled()) { + throw new IllegalStateException("Data source management is disabled"); + } encryptDecryptAuthenticationData(dataSourceMetadata, true); if (!this.clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) { createDataSourcesIndex(); @@ -134,6 +150,9 @@ public void createDataSourceMetadata(DataSourceMetadata dataSourceMetadata) { @Override public void updateDataSourceMetadata(DataSourceMetadata dataSourceMetadata) { + if (!isEnabled()) { + throw new IllegalStateException("Data source management is disabled"); + } encryptDecryptAuthenticationData(dataSourceMetadata, true); UpdateRequest updateRequest = new UpdateRequest(DATASOURCE_INDEX_NAME, dataSourceMetadata.getName()); @@ -163,6 +182,9 @@ public void updateDataSourceMetadata(DataSourceMetadata dataSourceMetadata) { @Override public void deleteDataSourceMetadata(String datasourceName) { + if (!isEnabled()) { + throw new IllegalStateException("Data source management is disabled"); + } DeleteRequest deleteRequest = new DeleteRequest(DATASOURCE_INDEX_NAME); deleteRequest.id(datasourceName); deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); @@ -302,4 +324,8 @@ private void handleSigV4PropertiesEncryptionDecryption( .ifPresent(list::add); encryptOrDecrypt(propertiesMap, isEncryption, list); } + + private boolean isEnabled() { + return settings.getSettingValue(Settings.Key.DATASOURCES_ENABLED); + } } diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryActionTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryActionTest.java new file mode 100644 index 0000000000..83f368c74a --- /dev/null +++ b/datasources/src/test/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryActionTest.java @@ -0,0 +1,83 @@ +package org.opensearch.sql.datasources.rest; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import lombok.SneakyThrows; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; +import org.opensearch.client.node.NodeClient; +import org.opensearch.rest.RestChannel; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestResponse; +import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.opensearch.setting.OpenSearchSettings; +import org.opensearch.threadpool.ThreadPool; + +public class RestDataSourceQueryActionTest { + + private OpenSearchSettings settings; + private RestRequest request; + private RestChannel channel; + private NodeClient nodeClient; + private ThreadPool threadPool; + private RestDataSourceQueryAction unit; + + @BeforeEach + public void setup() { + settings = Mockito.mock(OpenSearchSettings.class); + request = Mockito.mock(RestRequest.class); + channel = Mockito.mock(RestChannel.class); + nodeClient = Mockito.mock(NodeClient.class); + threadPool = Mockito.mock(ThreadPool.class); + + Mockito.when(nodeClient.threadPool()).thenReturn(threadPool); + + unit = new RestDataSourceQueryAction(settings); + } + + @Test + @SneakyThrows + public void testWhenDataSourcesAreDisabled() { + setDataSourcesEnabled(false); + unit.handleRequest(request, channel, nodeClient); + Mockito.verifyNoInteractions(nodeClient); + ArgumentCaptor response = ArgumentCaptor.forClass(RestResponse.class); + Mockito.verify(channel, Mockito.times(1)).sendResponse(response.capture()); + Assertions.assertEquals(400, response.getValue().status().getStatus()); + JsonObject actualResponseJson = + new Gson().fromJson(response.getValue().content().utf8ToString(), JsonObject.class); + JsonObject expectedResponseJson = new JsonObject(); + expectedResponseJson.addProperty("status", 400); + expectedResponseJson.add("error", new JsonObject()); + expectedResponseJson.getAsJsonObject("error").addProperty("type", "OpenSearchStatusException"); + expectedResponseJson.getAsJsonObject("error").addProperty("reason", "Invalid Request"); + expectedResponseJson + .getAsJsonObject("error") + .addProperty("details", "plugins.query.datasources.enabled is disabled"); + Assertions.assertEquals(expectedResponseJson, actualResponseJson); + } + + @Test + @SneakyThrows + public void testWhenDataSourcesAreEnabled() { + setDataSourcesEnabled(true); + Mockito.when(request.method()).thenReturn(RestRequest.Method.GET); + unit.handleRequest(request, channel, nodeClient); + Mockito.verify(threadPool, Mockito.times(1)) + .schedule(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any()); + Mockito.verifyNoInteractions(channel); + } + + @Test + public void testGetName() { + Assertions.assertEquals("datasource_actions", unit.getName()); + } + + private void setDataSourcesEnabled(boolean value) { + Mockito.when(settings.getSettingValue(Settings.Key.DATASOURCES_ENABLED)).thenReturn(value); + } +} diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorageTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorageTest.java index 886e84298d..5ae9e58702 100644 --- a/datasources/src/test/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorageTest.java +++ b/datasources/src/test/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorageTest.java @@ -41,10 +41,12 @@ import org.opensearch.index.engine.VersionConflictEngineException; import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceType; import org.opensearch.sql.datasources.encryptor.Encryptor; import org.opensearch.sql.datasources.exceptions.DataSourceNotFoundException; +import org.opensearch.sql.opensearch.setting.OpenSearchSettings; @ExtendWith(MockitoExtension.class) public class OpenSearchDataSourceMetadataStorageTest { @@ -59,6 +61,8 @@ public class OpenSearchDataSourceMetadataStorageTest { @Mock private Encryptor encryptor; + @Mock private OpenSearchSettings openSearchSettings; + @Mock(answer = Answers.RETURNS_DEEP_STUBS) private SearchResponse searchResponse; @@ -76,6 +80,7 @@ public class OpenSearchDataSourceMetadataStorageTest { @SneakyThrows @Test public void testGetDataSourceMetadata() { + setDataSourcesEnabled(true); Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) .thenReturn(true); Mockito.when(client.search(ArgumentMatchers.any())).thenReturn(searchResponseActionFuture); @@ -107,6 +112,7 @@ public void testGetDataSourceMetadata() { @SneakyThrows @Test public void testGetOldDataSourceMetadata() { + setDataSourcesEnabled(true); Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) .thenReturn(true); Mockito.when(client.search(ArgumentMatchers.any())).thenReturn(searchResponseActionFuture); @@ -140,6 +146,7 @@ public void testGetOldDataSourceMetadata() { @SneakyThrows @Test public void testGetDataSourceMetadataWith404SearchResponse() { + setDataSourcesEnabled(true); Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) .thenReturn(true); Mockito.when(client.search(ArgumentMatchers.any())).thenReturn(searchResponseActionFuture); @@ -160,6 +167,7 @@ public void testGetDataSourceMetadataWith404SearchResponse() { @SneakyThrows @Test public void testGetDataSourceMetadataWithParsingFailed() { + setDataSourcesEnabled(true); Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) .thenReturn(true); Mockito.when(client.search(ArgumentMatchers.any())).thenReturn(searchResponseActionFuture); @@ -180,6 +188,7 @@ public void testGetDataSourceMetadataWithParsingFailed() { @SneakyThrows @Test public void testGetDataSourceMetadataWithAWSSigV4() { + setDataSourcesEnabled(true); Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) .thenReturn(true); Mockito.when(client.search(ArgumentMatchers.any())).thenReturn(searchResponseActionFuture); @@ -211,6 +220,7 @@ public void testGetDataSourceMetadataWithAWSSigV4() { @SneakyThrows @Test public void testGetDataSourceMetadataWithBasicAuth() { + setDataSourcesEnabled(true); Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) .thenReturn(true); Mockito.when(client.search(ArgumentMatchers.any())).thenReturn(searchResponseActionFuture); @@ -243,6 +253,7 @@ public void testGetDataSourceMetadataWithBasicAuth() { @SneakyThrows @Test public void testGetDataSourceMetadataList() { + setDataSourcesEnabled(true); Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) .thenReturn(true); Mockito.when(client.search(ArgumentMatchers.any())).thenReturn(searchResponseActionFuture); @@ -267,6 +278,7 @@ public void testGetDataSourceMetadataList() { @SneakyThrows @Test public void testGetDataSourceMetadataListWithNoIndex() { + setDataSourcesEnabled(true); Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) .thenReturn(Boolean.FALSE); Mockito.when(client.admin().indices().create(ArgumentMatchers.any())) @@ -284,6 +296,7 @@ public void testGetDataSourceMetadataListWithNoIndex() { @SneakyThrows @Test public void testGetDataSourceMetadataWithNoIndex() { + setDataSourcesEnabled(true); Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) .thenReturn(Boolean.FALSE); Mockito.when(client.admin().indices().create(ArgumentMatchers.any())) @@ -300,6 +313,7 @@ public void testGetDataSourceMetadataWithNoIndex() { @Test public void testCreateDataSourceMetadata() { + setDataSourcesEnabled(true); Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) .thenReturn(Boolean.FALSE); @@ -325,6 +339,7 @@ public void testCreateDataSourceMetadata() { @Test public void testCreateDataSourceMetadataWithOutCreatingIndex() { + setDataSourcesEnabled(true); Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) .thenReturn(Boolean.TRUE); Mockito.when(encryptor.encrypt("secret_key")).thenReturn("secret_key"); @@ -345,6 +360,7 @@ public void testCreateDataSourceMetadataWithOutCreatingIndex() { @Test public void testCreateDataSourceMetadataFailedWithNotFoundResponse() { + setDataSourcesEnabled(true); Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) .thenReturn(Boolean.FALSE); @@ -378,6 +394,7 @@ public void testCreateDataSourceMetadataFailedWithNotFoundResponse() { @Test public void testCreateDataSourceMetadataWithVersionConflict() { + setDataSourcesEnabled(true); Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) .thenReturn(Boolean.FALSE); @@ -408,6 +425,7 @@ public void testCreateDataSourceMetadataWithVersionConflict() { @Test public void testCreateDataSourceMetadataWithException() { + setDataSourcesEnabled(true); Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) .thenReturn(Boolean.FALSE); @@ -439,6 +457,7 @@ public void testCreateDataSourceMetadataWithException() { @Test public void testCreateDataSourceMetadataWithIndexCreationFailed() { + setDataSourcesEnabled(true); Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) .thenReturn(Boolean.FALSE); @@ -469,6 +488,7 @@ public void testCreateDataSourceMetadataWithIndexCreationFailed() { @Test public void testUpdateDataSourceMetadata() { + setDataSourcesEnabled(true); Mockito.when(encryptor.encrypt("secret_key")).thenReturn("secret_key"); Mockito.when(encryptor.encrypt("access_key")).thenReturn("access_key"); Mockito.when(client.update(ArgumentMatchers.any())).thenReturn(updateResponseActionFuture); @@ -487,6 +507,7 @@ public void testUpdateDataSourceMetadata() { @Test public void testUpdateDataSourceMetadataWithNOOP() { + setDataSourcesEnabled(true); Mockito.when(encryptor.encrypt("secret_key")).thenReturn("secret_key"); Mockito.when(encryptor.encrypt("access_key")).thenReturn("access_key"); Mockito.when(client.update(ArgumentMatchers.any())).thenReturn(updateResponseActionFuture); @@ -505,6 +526,7 @@ public void testUpdateDataSourceMetadataWithNOOP() { @Test public void testUpdateDataSourceMetadataWithNotFoundResult() { + setDataSourcesEnabled(true); Mockito.when(encryptor.encrypt("secret_key")).thenReturn("secret_key"); Mockito.when(encryptor.encrypt("access_key")).thenReturn("access_key"); Mockito.when(client.update(ArgumentMatchers.any())).thenReturn(updateResponseActionFuture); @@ -531,6 +553,7 @@ public void testUpdateDataSourceMetadataWithNotFoundResult() { @Test public void testUpdateDataSourceMetadataWithDocumentMissingException() { + setDataSourcesEnabled(true); Mockito.when(encryptor.encrypt("secret_key")).thenReturn("secret_key"); Mockito.when(encryptor.encrypt("access_key")).thenReturn("access_key"); Mockito.when(client.update(ArgumentMatchers.any())) @@ -556,6 +579,7 @@ public void testUpdateDataSourceMetadataWithDocumentMissingException() { @Test public void testUpdateDataSourceMetadataWithRuntimeException() { + setDataSourcesEnabled(true); Mockito.when(encryptor.encrypt("secret_key")).thenReturn("secret_key"); Mockito.when(encryptor.encrypt("access_key")).thenReturn("access_key"); Mockito.when(client.update(ArgumentMatchers.any())) @@ -581,6 +605,7 @@ public void testUpdateDataSourceMetadataWithRuntimeException() { @Test public void testDeleteDataSourceMetadata() { + setDataSourcesEnabled(true); Mockito.when(client.delete(ArgumentMatchers.any())).thenReturn(deleteResponseActionFuture); Mockito.when(deleteResponseActionFuture.actionGet()).thenReturn(deleteResponse); Mockito.when(deleteResponse.getResult()).thenReturn(DocWriteResponse.Result.DELETED); @@ -595,6 +620,7 @@ public void testDeleteDataSourceMetadata() { @Test public void testDeleteDataSourceMetadataWhichisAlreadyDeleted() { + setDataSourcesEnabled(true); Mockito.when(client.delete(ArgumentMatchers.any())).thenReturn(deleteResponseActionFuture); Mockito.when(deleteResponseActionFuture.actionGet()).thenReturn(deleteResponse); Mockito.when(deleteResponse.getResult()).thenReturn(DocWriteResponse.Result.NOT_FOUND); @@ -614,6 +640,7 @@ public void testDeleteDataSourceMetadataWhichisAlreadyDeleted() { @Test public void testDeleteDataSourceMetadataWithUnexpectedResult() { + setDataSourcesEnabled(true); Mockito.when(client.delete(ArgumentMatchers.any())).thenReturn(deleteResponseActionFuture); Mockito.when(deleteResponseActionFuture.actionGet()).thenReturn(deleteResponse); Mockito.when(deleteResponse.getResult()).thenReturn(DocWriteResponse.Result.NOOP); @@ -632,6 +659,43 @@ public void testDeleteDataSourceMetadataWithUnexpectedResult() { Mockito.verify(client.threadPool().getThreadContext(), Mockito.times(1)).stashContext(); } + @Test + public void testWhenDataSourcesAreDisabled() { + setDataSourcesEnabled(false); + + Assertions.assertEquals( + Optional.empty(), this.openSearchDataSourceMetadataStorage.getDataSourceMetadata("dummy")); + + Assertions.assertEquals( + Collections.emptyList(), this.openSearchDataSourceMetadataStorage.getDataSourceMetadata()); + + Assertions.assertThrows( + IllegalStateException.class, + () -> { + this.openSearchDataSourceMetadataStorage.createDataSourceMetadata( + getDataSourceMetadata()); + }, + "Data source management is disabled"); + + Assertions.assertThrows( + IllegalStateException.class, + () -> { + this.openSearchDataSourceMetadataStorage.updateDataSourceMetadata( + getDataSourceMetadata()); + }, + "Data source management is disabled"); + + Assertions.assertThrows( + IllegalStateException.class, + () -> { + this.openSearchDataSourceMetadataStorage.deleteDataSourceMetadata("dummy"); + }, + "Data source management is disabled"); + + Mockito.verify(clusterService.state().routingTable(), Mockito.times(0)) + .hasIndex(DATASOURCE_INDEX_NAME); + } + private String getBasicDataSourceMetadataString() throws JsonProcessingException { Map properties = new HashMap<>(); properties.put("prometheus.auth.type", "basicauth"); @@ -715,4 +779,11 @@ private DataSourceMetadata getDataSourceMetadata() { .setAllowedRoles(Collections.singletonList("prometheus_access")) .build(); } + + private void setDataSourcesEnabled(boolean enabled) { + Mockito.when( + openSearchSettings.getSettingValue( + ArgumentMatchers.eq(Settings.Key.DATASOURCES_ENABLED))) + .thenReturn(enabled); + } } diff --git a/docs/user/admin/settings.rst b/docs/user/admin/settings.rst index 6531e84aa1..833931129b 100644 --- a/docs/user/admin/settings.rst +++ b/docs/user/admin/settings.rst @@ -630,3 +630,64 @@ Request :: } } } + +plugins.query.datasources.enabled +================================= + +Description +----------- + +This setting controls whether datasources are enabled. + +1. The default value is true +2. This setting is node scope +3. This setting can be updated dynamically + +Update Settings Request:: + + sh$ curl -sS -H 'Content-Type: application/json' -X PUT 'localhost:9200/_cluster/settings?pretty' \ + ... -d '{"transient":{"plugins.query.datasources.enabled":"false"}}' + { + "acknowledged" : true, + "persistent" : {}, + "transient" : { + "plugins" : { + "query" : { + "datasources" : { + "enabled" : "false" + } + } + } + } + } + +When Attempting to Call Data Source APIs:: + + sh$ curl -sS -H 'Content-Type: application/json' -X GET 'localhost:9200/_plugins/_query/_datasources' + { + "status": 400, + "error": { + "type": "OpenSearchStatusException", + "reason": "Invalid Request", + "details": "plugins.query.datasources.enabled is disabled" + } + } + +When Attempting to List Data Source::: + sh$ curl -sS -H 'Content-Type: application/json' -X POST 'localhost:9200/_plugins/_ppl' \ + ... -d '{"query":"show datasources"}' + { + "schema": [ + { + "name": "DATASOURCE_NAME", + "type": "string" + }, + { + "name": "CONNECTOR_TYPE", + "type": "string" + } + ], + "datarows": [], + "total": 0, + "size": 0 + } \ No newline at end of file diff --git a/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceAPIsIT.java b/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceAPIsIT.java index 05e19f8285..7aa28e00f9 100644 --- a/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceAPIsIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceAPIsIT.java @@ -20,10 +20,13 @@ import java.io.IOException; import java.lang.reflect.Type; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import lombok.SneakyThrows; +import lombok.Value; +import org.json.JSONObject; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -31,6 +34,7 @@ import org.opensearch.client.Request; import org.opensearch.client.Response; import org.opensearch.client.ResponseException; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceType; import org.opensearch.sql.ppl.PPLIntegTestCase; @@ -385,6 +389,136 @@ public void patchDataSourceAPITest() { Assert.assertEquals("test", dataSourceMetadata.getDescription()); } + @Test + public void testDataSourcesEnabledSettingIsTrueByDefault() { + Assert.assertTrue(getDataSourceEnabledSetting("defaults")); + } + + @Test + public void testDataSourcesEnabledSettingCanBeSetToTransientFalse() { + setDataSourcesEnabled("transient", false); + Assert.assertFalse(getDataSourceEnabledSetting("transient")); + } + + @Test + public void testDataSourcesEnabledSettingCanBeSetToTransientTrue() { + setDataSourcesEnabled("transient", true); + Assert.assertTrue(getDataSourceEnabledSetting("transient")); + } + + @Test + public void testDataSourcesEnabledSettingCanBeSetToPersistentFalse() { + setDataSourcesEnabled("persistent", false); + Assert.assertFalse(getDataSourceEnabledSetting("persistent")); + } + + @Test + public void testDataSourcesEnabledSettingCanBeSetToPersistentTrue() { + setDataSourcesEnabled("persistent", true); + Assert.assertTrue(getDataSourceEnabledSetting("persistent")); + } + + @Test + public void testDataSourcesEnabledSetToFalseRejectsApiOperations() { + setDataSourcesEnabled("transient", false); + validateAllDataSourceApisWithEnabledSetting(false); + } + + @Test + public void testDataSourcesEnabledSetToTrueAllowsApiOperations() { + setDataSourcesEnabled("transient", true); + validateAllDataSourceApisWithEnabledSetting(true); + } + + @SneakyThrows + private void validateAllDataSourceApisWithEnabledSetting(boolean dataSourcesEnabled) { + + @Value + class TestCase { + Request request; + int expectedResponseCodeOnSuccess; + String expectResponseToContainOnSuccess; + } + + TestCase[] testCases = + new TestCase[] { + // create + new TestCase( + getCreateDataSourceRequest(mockDataSourceMetadata("dummy")), + 201, + "Created DataSource"), + // read + new TestCase(getFetchDataSourceRequest("dummy"), 200, "dummy"), + // update + new TestCase( + getUpdateDataSourceRequest(mockDataSourceMetadata("dummy")), + 200, + "Updated DataSource"), + // list + new TestCase(getFetchDataSourceRequest(null), 200, "dummy"), + // delete + new TestCase(getDeleteDataSourceRequest("dummy"), 204, null) + }; + + for (TestCase testCase : testCases) { + + // data source APIs are eventually consistent. sleep delay is added for consistency + // see createDataSourceAPITest above. + Thread.sleep(2_000); + + final int expectedResponseCode = + dataSourcesEnabled ? testCase.getExpectedResponseCodeOnSuccess() : 400; + + final String expectedResponseBodyToContain = + dataSourcesEnabled + ? testCase.getExpectResponseToContainOnSuccess() + : "plugins.query.datasources.enabled is disabled"; + + Response response; + + try { + response = client().performRequest(testCase.getRequest()); + } catch (ResponseException e) { + response = e.getResponse(); + } + + Assert.assertEquals( + String.format( + "Test for " + testCase + " failed. Expected response code of %s, but got %s", + expectedResponseCode, + response.getStatusLine().getStatusCode()), + expectedResponseCode, + response.getStatusLine().getStatusCode()); + + if (expectedResponseBodyToContain != null) { + + String responseBody = getResponseBody(response); + + Assert.assertTrue( + String.format( + "Test for " + testCase + " failed. '%s' failed to contain '%s'", + responseBody, + expectedResponseBodyToContain), + responseBody.contains(expectedResponseBodyToContain)); + } + } + } + + @SneakyThrows + private boolean getDataSourceEnabledSetting(String... clusterSettingsTypeKeys) { + + final String settingKey = Settings.Key.DATASOURCES_ENABLED.getKeyValue(); + + JSONObject settings = getAllClusterSettings(); + + return Arrays.stream(clusterSettingsTypeKeys) + .map(settings::getJSONObject) + .filter(obj -> obj.has(settingKey)) + .map(obj -> obj.getBoolean(settingKey)) + .findFirst() + .orElseThrow(); + } + public DataSourceMetadata mockDataSourceMetadata(String name) { return new DataSourceMetadata.Builder() .setName(name) diff --git a/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceEnabledIT.java b/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceEnabledIT.java new file mode 100644 index 0000000000..efa121e48b --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceEnabledIT.java @@ -0,0 +1,68 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.datasource; + +import static org.opensearch.sql.legacy.TestUtils.getResponseBody; +import static org.opensearch.sql.legacy.TestsConstants.DATASOURCES; + +import lombok.SneakyThrows; +import org.json.JSONObject; +import org.junit.Assert; +import org.junit.Test; +import org.opensearch.client.Request; +import org.opensearch.client.Response; +import org.opensearch.client.ResponseException; +import org.opensearch.sql.ppl.PPLIntegTestCase; + +public class DataSourceEnabledIT extends PPLIntegTestCase { + + @Override + protected boolean preserveClusterUponCompletion() { + return false; + } + + @Test + public void testDataSourceIndexIsCreatedByDefault() { + listDataSources(); + Assert.assertTrue(isDataSourceIndexCreated()); + } + + @Test + public void testDataSourceIndexIsCreatedIfSettingIsEnabled() { + setDataSourcesEnabled("transient", true); + listDataSources(); + Assert.assertTrue(isDataSourceIndexCreated()); + } + + @Test + public void testDataSourceIndexIsNotCreatedIfSettingIsDisabled() { + setDataSourcesEnabled("transient", false); + listDataSources(); + Assert.assertFalse(isDataSourceIndexCreated()); + } + + @SneakyThrows + private void listDataSources() { + Request request = new Request("POST", "/_plugins/_ppl"); + request.setJsonEntity(new JSONObject().put("query", "show datasources").toString()); + client().performRequest(request); + } + + @SneakyThrows + boolean isDataSourceIndexCreated() { + Request request = new Request("GET", "/" + DATASOURCES); + Response response; + try { + response = client().performRequest(request); + } catch (ResponseException e) { + response = e.getResponse(); + } + String responseBody = getResponseBody(response); + return response.getStatusLine().getStatusCode() == 200 + && responseBody.contains(DATASOURCES) + && responseBody.contains("mappings"); + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java index 058182f123..238265786b 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java @@ -55,6 +55,7 @@ import javax.management.remote.JMXConnector; import javax.management.remote.JMXConnectorFactory; import javax.management.remote.JMXServiceURL; +import lombok.SneakyThrows; import org.apache.commons.lang3.StringUtils; import org.json.JSONArray; import org.json.JSONObject; @@ -172,6 +173,15 @@ protected void resetQuerySizeLimit() throws IOException { DEFAULT_QUERY_SIZE_LIMIT.toString())); } + @SneakyThrows + protected void setDataSourcesEnabled(String clusterSettingType, boolean value) { + updateClusterSettings( + new ClusterSetting( + clusterSettingType, + Settings.Key.DATASOURCES_ENABLED.getKeyValue(), + Boolean.toString(value))); + } + protected static void wipeAllClusterSettings() throws IOException { updateClusterSettings(new ClusterSetting("persistent", "*", null)); updateClusterSettings(new ClusterSetting("transient", "*", null)); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java index c493aa46e5..b4ce82a828 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java @@ -132,6 +132,13 @@ public class OpenSearchSettings extends Settings { Setting.Property.NodeScope, Setting.Property.Dynamic); + public static final Setting DATASOURCE_ENABLED_SETTING = + Setting.boolSetting( + Key.DATASOURCES_ENABLED.getKeyValue(), + true, + Setting.Property.NodeScope, + Setting.Property.Dynamic); + public static final Setting ASYNC_QUERY_ENABLED_SETTING = Setting.boolSetting( Key.ASYNC_QUERY_ENABLED.getKeyValue(), @@ -265,6 +272,12 @@ public OpenSearchSettings(ClusterSettings clusterSettings) { Key.DATASOURCES_URI_HOSTS_DENY_LIST, DATASOURCE_URI_HOSTS_DENY_LIST, new Updater(Key.DATASOURCES_URI_HOSTS_DENY_LIST)); + register( + settingBuilder, + clusterSettings, + Key.DATASOURCES_ENABLED, + DATASOURCE_ENABLED_SETTING, + new Updater(Key.DATASOURCES_ENABLED)); register( settingBuilder, clusterSettings, @@ -389,6 +402,7 @@ public static List> pluginSettings() { .add(METRICS_ROLLING_WINDOW_SETTING) .add(METRICS_ROLLING_INTERVAL_SETTING) .add(DATASOURCE_URI_HOSTS_DENY_LIST) + .add(DATASOURCE_ENABLED_SETTING) .add(ASYNC_QUERY_ENABLED_SETTING) .add(SPARK_EXECUTION_ENGINE_CONFIG) .add(SPARK_EXECUTION_SESSION_LIMIT_SETTING) diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index a9eb38a2c2..404855bb6b 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -138,7 +138,7 @@ public List getRestHandlers( new RestSqlStatsAction(settings, restController), new RestPPLStatsAction(settings, restController), new RestQuerySettingsAction(settings, restController), - new RestDataSourceQueryAction(), + new RestDataSourceQueryAction((OpenSearchSettings) pluginSettings), new RestAsyncQueryManagementAction()); } @@ -274,7 +274,10 @@ private DataSourceServiceImpl createDataSourceService() { } DataSourceMetadataStorage dataSourceMetadataStorage = new OpenSearchDataSourceMetadataStorage( - client, clusterService, new EncryptorImpl(masterKey)); + client, + clusterService, + new EncryptorImpl(masterKey), + (OpenSearchSettings) pluginSettings); DataSourceUserAuthorizationHelper dataSourceUserAuthorizationHelper = new DataSourceUserAuthorizationHelperImpl(client); return new DataSourceServiceImpl( diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java index 9c378b9274..b35bd5450a 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java @@ -219,7 +219,10 @@ private DataSourceServiceImpl createDataSourceService() { String masterKey = "a57d991d9b573f75b9bba1df"; DataSourceMetadataStorage dataSourceMetadataStorage = new OpenSearchDataSourceMetadataStorage( - client, clusterService, new EncryptorImpl(masterKey)); + client, + clusterService, + new EncryptorImpl(masterKey), + (OpenSearchSettings) pluginSettings); return new DataSourceServiceImpl( new ImmutableSet.Builder() .add(new GlueDataSourceFactory(pluginSettings))