From 353b0d7443354eb46478f2a81ae7b0938083f6ca Mon Sep 17 00:00:00 2001 From: Vamsi Manohar Date: Tue, 12 Mar 2024 09:27:33 -0700 Subject: [PATCH] Datasource disable feature (#2539) Signed-off-by: Vamsi Manohar --- core/build.gradle | 4 +- .../sql/datasource/DataSourceService.java | 20 +- .../datasource/model/DataSourceMetadata.java | 242 ++++++++++++------ .../datasource/model/DataSourceStatus.java | 37 +++ .../sql/analysis/AnalyzerTestBase.java | 22 +- .../model/DataSourceMetadataTest.java | 158 ++++++++++++ .../datasource/DataSourceTableScanTest.java | 14 +- .../DataSourceNotFoundException.java | 2 +- .../DatasourceDisabledException.java | 13 + .../service/DataSourceServiceImpl.java | 82 +++--- .../utils/XContentParserUtils.java | 26 +- .../resources/datasources-index-mapping.yml | 2 + ...SourceUserAuthorizationHelperImplTest.java | 12 +- .../glue/GlueDataSourceFactoryTest.java | 64 +++-- .../DataSourceLoaderCacheImplTest.java | 24 +- .../service/DataSourceServiceImplTest.java | 157 +++++------- ...enSearchDataSourceMetadataStorageTest.java | 68 ++--- .../TransportCreateDataSourceActionTest.java | 24 +- .../TransportGetDataSourceActionTest.java | 17 +- .../TransportUpdateDataSourceActionTest.java | 17 +- .../utils/XContentParserUtilsTest.java | 72 +++--- docs/user/ppl/admin/datasources.rst | 18 +- .../sql/datasource/DataSourceAPIsIT.java | 238 +++++++++++------ .../sql/ppl/InformationSchemaCommandIT.java | 14 +- .../ppl/PrometheusDataSourceCommandsIT.java | 49 +++- .../sql/ppl/ShowDataSourcesCommandIT.java | 14 +- .../src/test/resources/datasources.json | 4 +- .../storage/PrometheusStorageFactoryTest.java | 30 ++- .../dispatcher/SparkQueryDispatcher.java | 4 +- .../rest/RestAsyncQueryManagementAction.java | 4 +- ...AsyncQueryExecutorServiceImplSpecTest.java | 69 +++-- .../AsyncQueryExecutorServiceSpec.java | 60 ++--- .../dispatcher/SparkQueryDispatcherTest.java | 150 ++++++----- .../storage/SparkStorageFactoryTest.java | 20 +- 34 files changed, 1105 insertions(+), 646 deletions(-) create mode 100644 core/src/main/java/org/opensearch/sql/datasource/model/DataSourceStatus.java create mode 100644 core/src/test/java/org/opensearch/sql/datasource/model/DataSourceMetadataTest.java create mode 100644 datasources/src/main/java/org/opensearch/sql/datasources/exceptions/DatasourceDisabledException.java diff --git a/core/build.gradle b/core/build.gradle index 9fa1808b2e..caf0c5e430 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -108,7 +108,9 @@ jacocoTestCoverageVerification { excludes = [ 'org.opensearch.sql.utils.MLCommonsConstants', 'org.opensearch.sql.utils.Constants', - 'org.opensearch.sql.datasource.model.*' + 'org.opensearch.sql.datasource.model.DataSource', + 'org.opensearch.sql.datasource.model.DataSourceStatus', + 'org.opensearch.sql.datasource.model.DataSourceType' ] limit { counter = 'LINE' diff --git a/core/src/main/java/org/opensearch/sql/datasource/DataSourceService.java b/core/src/main/java/org/opensearch/sql/datasource/DataSourceService.java index 162fe9e8f8..6af5d19e5c 100644 --- a/core/src/main/java/org/opensearch/sql/datasource/DataSourceService.java +++ b/core/src/main/java/org/opensearch/sql/datasource/DataSourceService.java @@ -14,7 +14,8 @@ public interface DataSourceService { /** - * Returns {@link DataSource} corresponding to the DataSource name. + * Returns {@link DataSource} corresponding to the DataSource name only if the datasource is + * active and authorized. * * @param dataSourceName Name of the {@link DataSource}. * @return {@link DataSource}. @@ -40,15 +41,6 @@ public interface DataSourceService { */ DataSourceMetadata getDataSourceMetadata(String name); - /** - * Returns dataSourceMetadata object with specific name. The returned objects contain all the - * metadata information without any filtering. - * - * @param name name of the {@link DataSource}. - * @return set of {@link DataSourceMetadata}. - */ - DataSourceMetadata getRawDataSourceMetadata(String name); - /** * Register {@link DataSource} defined by {@link DataSourceMetadata}. * @@ -84,4 +76,12 @@ public interface DataSourceService { * @param dataSourceName name of the {@link DataSource}. */ Boolean dataSourceExists(String dataSourceName); + + /** + * Performs authorization and datasource status check and then returns RawDataSourceMetadata. + * Specifically for addressing use cases in SparkQueryDispatcher. + * + * @param dataSourceName of the {@link DataSource} + */ + DataSourceMetadata verifyDataSourceAccessAndGetRawMetadata(String dataSourceName); } diff --git a/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceMetadata.java b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceMetadata.java index 9e47f9b37e..e3dd0e8ff7 100644 --- a/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceMetadata.java +++ b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceMetadata.java @@ -10,6 +10,7 @@ import com.fasterxml.jackson.annotation.JsonFormat; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.util.ArrayList; import java.util.Collections; @@ -19,27 +20,26 @@ import java.util.function.Function; import lombok.EqualsAndHashCode; import lombok.Getter; -import lombok.Setter; import org.apache.commons.lang3.StringUtils; import org.opensearch.sql.datasource.DataSourceService; @Getter -@Setter @EqualsAndHashCode @JsonIgnoreProperties(ignoreUnknown = true) public class DataSourceMetadata { public static final String DEFAULT_RESULT_INDEX = "query_execution_result"; public static final int MAX_RESULT_INDEX_NAME_SIZE = 255; + private static String DATASOURCE_NAME_REGEX = "[@*A-Za-z]+?[*a-zA-Z_\\-0-9]*"; // OS doesn’t allow uppercase: https://tinyurl.com/yse2xdbx public static final String RESULT_INDEX_NAME_PATTERN = "[a-z0-9_-]+"; public static String INVALID_RESULT_INDEX_NAME_SIZE = "Result index name size must contains less than " + MAX_RESULT_INDEX_NAME_SIZE - + " characters"; + + " characters."; public static String INVALID_CHAR_IN_RESULT_INDEX_NAME = "Result index name has invalid character. Valid characters are a-z, 0-9, -(hyphen) and" - + " _(underscore)"; + + " _(underscore)."; public static String INVALID_RESULT_INDEX_PREFIX = "Result index must start with " + DEFAULT_RESULT_INDEX; @@ -57,96 +57,188 @@ public class DataSourceMetadata { @JsonProperty private String resultIndex; + @JsonProperty private DataSourceStatus status; + public static Function DATASOURCE_TO_RESULT_INDEX = datasourceName -> String.format("%s_%s", DEFAULT_RESULT_INDEX, datasourceName); - public DataSourceMetadata( - String name, - String description, - DataSourceType connector, - List allowedRoles, - Map properties, - String resultIndex) { - this.name = name; - String errorMessage = validateCustomResultIndex(resultIndex); - if (errorMessage != null) { - throw new IllegalArgumentException(errorMessage); + private DataSourceMetadata(Builder builder) { + this.name = builder.name; + this.description = builder.description; + this.connector = builder.connector; + this.allowedRoles = builder.allowedRoles; + this.properties = builder.properties; + this.resultIndex = builder.resultIndex; + this.status = builder.status; + } + + public static class Builder { + private String name; + private String description; + private DataSourceType connector; + private List allowedRoles; + private Map properties; + private String resultIndex; // Optional + private DataSourceStatus status; + + public Builder() {} + + public Builder(DataSourceMetadata dataSourceMetadata) { + this.name = dataSourceMetadata.getName(); + this.description = dataSourceMetadata.getDescription(); + this.connector = dataSourceMetadata.getConnector(); + this.resultIndex = dataSourceMetadata.getResultIndex(); + this.status = dataSourceMetadata.getStatus(); + this.allowedRoles = new ArrayList<>(dataSourceMetadata.getAllowedRoles()); + this.properties = new HashMap<>(dataSourceMetadata.getProperties()); } - if (resultIndex == null) { - this.resultIndex = fromNameToCustomResultIndex(); - } else { - this.resultIndex = resultIndex; + + public Builder setName(String name) { + this.name = name; + return this; } - this.connector = connector; - this.description = description; - this.properties = properties; - this.allowedRoles = allowedRoles; - } + public Builder setDescription(String description) { + this.description = description; + return this; + } - public DataSourceMetadata() { - this.description = StringUtils.EMPTY; - this.allowedRoles = new ArrayList<>(); - this.properties = new HashMap<>(); - } + public Builder setConnector(DataSourceType connector) { + this.connector = connector; + return this; + } - /** - * Default OpenSearch {@link DataSourceMetadata}. Which is used to register default OpenSearch - * {@link DataSource} to {@link DataSourceService}. - */ - public static DataSourceMetadata defaultOpenSearchDataSourceMetadata() { - return new DataSourceMetadata( - DEFAULT_DATASOURCE_NAME, - StringUtils.EMPTY, - DataSourceType.OPENSEARCH, - Collections.emptyList(), - ImmutableMap.of(), - null); - } + public Builder setAllowedRoles(List allowedRoles) { + this.allowedRoles = allowedRoles; + return this; + } - public String validateCustomResultIndex(String resultIndex) { - if (resultIndex == null) { - return null; + public Builder setProperties(Map properties) { + this.properties = properties; + return this; } - if (resultIndex.length() > MAX_RESULT_INDEX_NAME_SIZE) { - return INVALID_RESULT_INDEX_NAME_SIZE; + + public Builder setResultIndex(String resultIndex) { + this.resultIndex = resultIndex; + return this; } - if (!resultIndex.matches(RESULT_INDEX_NAME_PATTERN)) { - return INVALID_CHAR_IN_RESULT_INDEX_NAME; + + public Builder setDataSourceStatus(DataSourceStatus status) { + this.status = status; + return this; } - if (resultIndex != null && !resultIndex.startsWith(DEFAULT_RESULT_INDEX)) { - return INVALID_RESULT_INDEX_PREFIX; + + public DataSourceMetadata build() { + validateMissingAttributes(); + validateName(); + validateCustomResultIndex(); + fillNullAttributes(); + return new DataSourceMetadata(this); } - return null; - } - /** - * Since we are using datasource name to create result index, we need to make sure that the final - * name is valid - * - * @param resultIndex result index name - * @return valid result index name - */ - private String convertToValidResultIndex(String resultIndex) { - // Limit Length - if (resultIndex.length() > MAX_RESULT_INDEX_NAME_SIZE) { - resultIndex = resultIndex.substring(0, MAX_RESULT_INDEX_NAME_SIZE); + private void fillNullAttributes() { + if (resultIndex == null) { + this.resultIndex = fromNameToCustomResultIndex(); + } + if (status == null) { + this.status = DataSourceStatus.ACTIVE; + } + if (description == null) { + this.description = StringUtils.EMPTY; + } + if (properties == null) { + this.properties = ImmutableMap.of(); + } + if (allowedRoles == null) { + this.allowedRoles = ImmutableList.of(); + } } - // Pattern Matching: Remove characters that don't match the pattern - StringBuilder validChars = new StringBuilder(); - for (char c : resultIndex.toCharArray()) { - if (String.valueOf(c).matches(RESULT_INDEX_NAME_PATTERN)) { - validChars.append(c); + private void validateMissingAttributes() { + List missingAttributes = new ArrayList<>(); + if (name == null) { + missingAttributes.add("name"); + } + if (connector == null) { + missingAttributes.add("connector"); + } + if (!missingAttributes.isEmpty()) { + String errorMessage = + "Datasource configuration error: " + + String.join(", ", missingAttributes) + + " cannot be null or empty."; + throw new IllegalArgumentException(errorMessage); } } - return validChars.toString(); - } - public String fromNameToCustomResultIndex() { - if (name == null) { - throw new IllegalArgumentException("Datasource name cannot be null"); + private void validateName() { + if (!name.matches(DATASOURCE_NAME_REGEX)) { + throw new IllegalArgumentException( + String.format( + "DataSource Name: %s contains illegal characters. Allowed characters:" + + " a-zA-Z0-9_-*@.", + name)); + } + } + + private void validateCustomResultIndex() { + if (resultIndex == null) { + return; + } + StringBuilder errorMessage = new StringBuilder(); + if (resultIndex.length() > MAX_RESULT_INDEX_NAME_SIZE) { + errorMessage.append(INVALID_RESULT_INDEX_NAME_SIZE); + } + if (!resultIndex.matches(RESULT_INDEX_NAME_PATTERN)) { + errorMessage.append(INVALID_CHAR_IN_RESULT_INDEX_NAME); + } + if (!resultIndex.startsWith(DEFAULT_RESULT_INDEX)) { + errorMessage.append(INVALID_RESULT_INDEX_PREFIX); + } + if (errorMessage.length() > 0) { + throw new IllegalArgumentException(errorMessage.toString()); + } + } + + /** + * Since we are using datasource name to create result index, we need to make sure that the + * final name is valid + * + * @param resultIndex result index name + * @return valid result index name + */ + private String convertToValidResultIndex(String resultIndex) { + // Limit Length + if (resultIndex.length() > MAX_RESULT_INDEX_NAME_SIZE) { + resultIndex = resultIndex.substring(0, MAX_RESULT_INDEX_NAME_SIZE); + } + + // Pattern Matching: Remove characters that don't match the pattern + StringBuilder validChars = new StringBuilder(); + for (char c : resultIndex.toCharArray()) { + if (String.valueOf(c).matches(RESULT_INDEX_NAME_PATTERN)) { + validChars.append(c); + } + } + return validChars.toString(); } - return convertToValidResultIndex(DATASOURCE_TO_RESULT_INDEX.apply(name.toLowerCase())); + + private String fromNameToCustomResultIndex() { + return convertToValidResultIndex(DATASOURCE_TO_RESULT_INDEX.apply(name.toLowerCase())); + } + } + + /** + * Default OpenSearch {@link DataSourceMetadata}. Which is used to register default OpenSearch + * {@link DataSource} to {@link DataSourceService}. + */ + public static DataSourceMetadata defaultOpenSearchDataSourceMetadata() { + return new DataSourceMetadata.Builder() + .setName(DEFAULT_DATASOURCE_NAME) + .setDescription(StringUtils.EMPTY) + .setConnector(DataSourceType.OPENSEARCH) + .setAllowedRoles(Collections.emptyList()) + .setProperties(ImmutableMap.of()) + .build(); } } diff --git a/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceStatus.java b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceStatus.java new file mode 100644 index 0000000000..bca47217c1 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceStatus.java @@ -0,0 +1,37 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.datasource.model; + +/** Enum for capturing the current datasource status. */ +public enum DataSourceStatus { + ACTIVE("active"), + DISABLED("disabled"); + + private String text; + + DataSourceStatus(String text) { + this.text = text; + } + + public String getText() { + return this.text; + } + + /** + * Get DataSourceStatus from text. + * + * @param text text. + * @return DataSourceStatus {@link DataSourceStatus}. + */ + public static DataSourceStatus fromString(String text) { + for (DataSourceStatus dataSourceStatus : DataSourceStatus.values()) { + if (dataSourceStatus.text.equalsIgnoreCase(text)) { + return dataSourceStatus; + } + } + throw new IllegalArgumentException("No DataSourceStatus with text " + text + " found"); + } +} diff --git a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java index bfd68ee53a..b35cfbb5e1 100644 --- a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java +++ b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java @@ -19,7 +19,6 @@ import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; -import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.opensearch.sql.DataSourceSchemaName; import org.opensearch.sql.analysis.symbol.Namespace; @@ -196,13 +195,10 @@ public Set getDataSourceMetadata(boolean isDefaultDataSource return Stream.of(opensearchDataSource, prometheusDataSource) .map( ds -> - new DataSourceMetadata( - ds.getName(), - StringUtils.EMPTY, - ds.getConnectorType(), - Collections.emptyList(), - ImmutableMap.of(), - null)) + new DataSourceMetadata.Builder() + .setName(ds.getName()) + .setConnector(ds.getConnectorType()) + .build()) .collect(Collectors.toSet()); } @@ -211,11 +207,6 @@ public DataSourceMetadata getDataSourceMetadata(String name) { return null; } - @Override - public DataSourceMetadata getRawDataSourceMetadata(String name) { - return null; - } - @Override public void createDataSource(DataSourceMetadata metadata) { throw new UnsupportedOperationException("unsupported operation"); @@ -243,6 +234,11 @@ public void deleteDataSource(String dataSourceName) {} public Boolean dataSourceExists(String dataSourceName) { return dataSourceName.equals(DEFAULT_DATASOURCE_NAME) || dataSourceName.equals("prometheus"); } + + @Override + public DataSourceMetadata verifyDataSourceAccessAndGetRawMetadata(String dataSourceName) { + return null; + } } private class TestTableFunctionImplementation implements TableFunctionImplementation { diff --git a/core/src/test/java/org/opensearch/sql/datasource/model/DataSourceMetadataTest.java b/core/src/test/java/org/opensearch/sql/datasource/model/DataSourceMetadataTest.java new file mode 100644 index 0000000000..24f830f18e --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/datasource/model/DataSourceMetadataTest.java @@ -0,0 +1,158 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.datasource.model; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import static org.opensearch.sql.datasource.model.DataSourceStatus.ACTIVE; +import static org.opensearch.sql.datasource.model.DataSourceType.PROMETHEUS; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.jupiter.api.Test; + +public class DataSourceMetadataTest { + + @Test + public void testBuilderAndGetterMethods() { + List allowedRoles = Arrays.asList("role1", "role2"); + Map properties = new HashMap<>(); + properties.put("key", "value"); + + DataSourceMetadata metadata = + new DataSourceMetadata.Builder() + .setName("test") + .setDescription("test description") + .setConnector(DataSourceType.OPENSEARCH) + .setAllowedRoles(allowedRoles) + .setProperties(properties) + .setResultIndex("query_execution_result_test123") + .setDataSourceStatus(ACTIVE) + .build(); + + assertEquals("test", metadata.getName()); + assertEquals("test description", metadata.getDescription()); + assertEquals(DataSourceType.OPENSEARCH, metadata.getConnector()); + assertEquals(allowedRoles, metadata.getAllowedRoles()); + assertEquals(properties, metadata.getProperties()); + assertEquals("query_execution_result_test123", metadata.getResultIndex()); + assertEquals(ACTIVE, metadata.getStatus()); + } + + @Test + public void testDefaultDataSourceMetadata() { + DataSourceMetadata defaultMetadata = DataSourceMetadata.defaultOpenSearchDataSourceMetadata(); + assertNotNull(defaultMetadata); + assertEquals(DataSourceType.OPENSEARCH, defaultMetadata.getConnector()); + assertTrue(defaultMetadata.getAllowedRoles().isEmpty()); + assertTrue(defaultMetadata.getProperties().isEmpty()); + } + + @Test + public void testNameValidation() { + try { + new DataSourceMetadata.Builder().setName("Invalid$$$Name").setConnector(PROMETHEUS).build(); + fail("Should have thrown an IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertEquals( + "DataSource Name: Invalid$$$Name contains illegal characters. Allowed characters:" + + " a-zA-Z0-9_-*@.", + e.getMessage()); + } + } + + @Test + public void testResultIndexValidation() { + try { + new DataSourceMetadata.Builder() + .setName("test") + .setConnector(PROMETHEUS) + .setResultIndex("invalid_result_index") + .build(); + fail("Should have thrown an IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertEquals(DataSourceMetadata.INVALID_RESULT_INDEX_PREFIX, e.getMessage()); + } + } + + @Test + public void testMissingAttributes() { + try { + new DataSourceMetadata.Builder().build(); + fail("Should have thrown an IllegalArgumentException due to missing attributes"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("name")); + assertTrue(e.getMessage().contains("connector")); + } + } + + @Test + public void testFillAttributes() { + DataSourceMetadata metadata = + new DataSourceMetadata.Builder().setName("test").setConnector(PROMETHEUS).build(); + + assertEquals("test", metadata.getName()); + assertEquals(PROMETHEUS, metadata.getConnector()); + assertTrue(metadata.getDescription().isEmpty()); + assertTrue(metadata.getAllowedRoles().isEmpty()); + assertTrue(metadata.getProperties().isEmpty()); + assertEquals("query_execution_result_test", metadata.getResultIndex()); + assertEquals(ACTIVE, metadata.getStatus()); + } + + @Test + public void testLengthyResultIndexName() { + try { + new DataSourceMetadata.Builder() + .setName("test") + .setConnector(PROMETHEUS) + .setResultIndex("query_execution_result_" + RandomStringUtils.randomAlphanumeric(300)) + .build(); + fail("Should have thrown an IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertEquals( + "Result index name size must contains less than 255 characters.Result index name has" + + " invalid character. Valid characters are a-z, 0-9, -(hyphen) and _(underscore).", + e.getMessage()); + } + } + + @Test + public void testInbuiltLengthyResultIndexName() { + DataSourceMetadata dataSourceMetadata = + new DataSourceMetadata.Builder() + .setName(RandomStringUtils.randomAlphabetic(250)) + .setConnector(PROMETHEUS) + .build(); + assertEquals(255, dataSourceMetadata.getResultIndex().length()); + } + + @Test + public void testCopyFromAnotherMetadata() { + List allowedRoles = Arrays.asList("role1", "role2"); + Map properties = new HashMap<>(); + properties.put("key", "value"); + + DataSourceMetadata metadata = + new DataSourceMetadata.Builder() + .setName("test") + .setDescription("test description") + .setConnector(DataSourceType.OPENSEARCH) + .setAllowedRoles(allowedRoles) + .setProperties(properties) + .setResultIndex("query_execution_result_test123") + .setDataSourceStatus(ACTIVE) + .build(); + DataSourceMetadata copiedMetadata = new DataSourceMetadata.Builder(metadata).build(); + assertEquals(metadata.getResultIndex(), copiedMetadata.getResultIndex()); + assertEquals(metadata.getProperties(), copiedMetadata.getProperties()); + } +} diff --git a/core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScanTest.java b/core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScanTest.java index 5c7182a752..53cbd15b8e 100644 --- a/core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScanTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScanTest.java @@ -13,12 +13,10 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableMap; -import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.Set; import java.util.stream.Collectors; -import org.apache.commons.lang3.StringUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -61,13 +59,11 @@ void testIterator() { dataSourceSet.stream() .map( dataSource -> - new DataSourceMetadata( - dataSource.getName(), - StringUtils.EMPTY, - dataSource.getConnectorType(), - Collections.emptyList(), - ImmutableMap.of(), - null)) + new DataSourceMetadata.Builder() + .setName(dataSource.getName()) + .setConnector(dataSource.getConnectorType()) + .setProperties(ImmutableMap.of("prometheus.uri", "localhost:9200")) + .build()) .collect(Collectors.toSet()); when(dataSourceService.getDataSourceMetadata(false)).thenReturn(dataSourceMetadata); diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/exceptions/DataSourceNotFoundException.java b/datasources/src/main/java/org/opensearch/sql/datasources/exceptions/DataSourceNotFoundException.java index 40b601000c..7850543910 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/exceptions/DataSourceNotFoundException.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/exceptions/DataSourceNotFoundException.java @@ -8,7 +8,7 @@ package org.opensearch.sql.datasources.exceptions; /** DataSourceNotFoundException. */ -public class DataSourceNotFoundException extends RuntimeException { +public class DataSourceNotFoundException extends DataSourceClientException { public DataSourceNotFoundException(String message) { super(message); } diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/exceptions/DatasourceDisabledException.java b/datasources/src/main/java/org/opensearch/sql/datasources/exceptions/DatasourceDisabledException.java new file mode 100644 index 0000000000..181721a6cc --- /dev/null +++ b/datasources/src/main/java/org/opensearch/sql/datasources/exceptions/DatasourceDisabledException.java @@ -0,0 +1,13 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.datasources.exceptions; + +/** Exception for taking actions on a disabled datasource. */ +public class DatasourceDisabledException extends DataSourceClientException { + public DatasourceDisabledException(String message) { + super(message); + } +} diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/service/DataSourceServiceImpl.java b/datasources/src/main/java/org/opensearch/sql/datasources/service/DataSourceServiceImpl.java index 8ba618fb44..4fe42fbd5c 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/service/DataSourceServiceImpl.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/service/DataSourceServiceImpl.java @@ -8,15 +8,15 @@ import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME; import static org.opensearch.sql.datasources.utils.XContentParserUtils.*; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; import java.util.*; -import org.opensearch.sql.common.utils.StringUtils; +import java.util.stream.Collectors; import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.datasource.model.DataSource; import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceStatus; import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelper; import org.opensearch.sql.datasources.exceptions.DataSourceNotFoundException; +import org.opensearch.sql.datasources.exceptions.DatasourceDisabledException; import org.opensearch.sql.storage.DataSourceFactory; /** @@ -29,7 +29,6 @@ */ public class DataSourceServiceImpl implements DataSourceService { - private static String DATASOURCE_NAME_REGEX = "[@*A-Za-z]+?[*a-zA-Z_\\-0-9]*"; public static final Set CONFIDENTIAL_AUTH_KEYS = Set.of("auth.username", "auth.password", "auth.access_key", "auth.secret_key"); @@ -57,27 +56,24 @@ public Set getDataSourceMetadata(boolean isDefaultDataSource if (isDefaultDataSourceRequired) { dataSourceMetadataSet.add(DataSourceMetadata.defaultOpenSearchDataSourceMetadata()); } - removeAuthInfo(dataSourceMetadataSet); - return dataSourceMetadataSet; + return removeAuthInfo(dataSourceMetadataSet); } @Override public DataSourceMetadata getDataSourceMetadata(String dataSourceName) { DataSourceMetadata dataSourceMetadata = getRawDataSourceMetadata(dataSourceName); - removeAuthInfo(dataSourceMetadata); - return dataSourceMetadata; + return removeAuthInfo(dataSourceMetadata); } @Override public DataSource getDataSource(String dataSourceName) { DataSourceMetadata dataSourceMetadata = getRawDataSourceMetadata(dataSourceName); - this.dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata); + verifyDataSourceAccess(dataSourceMetadata); return dataSourceLoaderCache.getOrLoadDataSource(dataSourceMetadata); } @Override public void createDataSource(DataSourceMetadata metadata) { - validateDataSourceMetaData(metadata); if (!metadata.getName().equals(DEFAULT_DATASOURCE_NAME)) { this.dataSourceLoaderCache.getOrLoadDataSource(metadata); this.dataSourceMetadataStorage.createDataSourceMetadata(metadata); @@ -86,7 +82,6 @@ public void createDataSource(DataSourceMetadata metadata) { @Override public void updateDataSource(DataSourceMetadata dataSourceMetadata) { - validateDataSourceMetaData(dataSourceMetadata); if (!dataSourceMetadata.getName().equals(DEFAULT_DATASOURCE_NAME)) { this.dataSourceLoaderCache.getOrLoadDataSource(dataSourceMetadata); this.dataSourceMetadataStorage.updateDataSourceMetadata(dataSourceMetadata); @@ -101,8 +96,9 @@ public void patchDataSource(Map dataSourceData) { if (!dataSourceData.get(NAME_FIELD).equals(DEFAULT_DATASOURCE_NAME)) { DataSourceMetadata dataSourceMetadata = getRawDataSourceMetadata((String) dataSourceData.get(NAME_FIELD)); - replaceOldDatasourceMetadata(dataSourceData, dataSourceMetadata); - updateDataSource(dataSourceMetadata); + DataSourceMetadata updatedMetadata = + constructUpdatedDatasourceMetadata(dataSourceData, dataSourceMetadata); + updateDataSource(updatedMetadata); } else { throw new UnsupportedOperationException( "Not allowed to update default datasource :" + DEFAULT_DATASOURCE_NAME); @@ -125,24 +121,19 @@ public Boolean dataSourceExists(String dataSourceName) { || this.dataSourceMetadataStorage.getDataSourceMetadata(dataSourceName).isPresent(); } - /** - * This can be moved to a different validator class when we introduce more connectors. - * - * @param metadata {@link DataSourceMetadata}. - */ - private void validateDataSourceMetaData(DataSourceMetadata metadata) { - Preconditions.checkArgument( - !Strings.isNullOrEmpty(metadata.getName()), - "Missing Name Field from a DataSource. Name is a required parameter."); - Preconditions.checkArgument( - metadata.getName().matches(DATASOURCE_NAME_REGEX), - StringUtils.format( - "DataSource Name: %s contains illegal characters. Allowed characters: a-zA-Z0-9_-*@.", - metadata.getName())); - Preconditions.checkArgument( - !Objects.isNull(metadata.getProperties()), - "Missing properties field in datasource configuration." - + " Properties are required parameters."); + @Override + public DataSourceMetadata verifyDataSourceAccessAndGetRawMetadata(String dataSourceName) { + DataSourceMetadata dataSourceMetadata = getRawDataSourceMetadata(dataSourceName); + verifyDataSourceAccess(dataSourceMetadata); + return dataSourceMetadata; + } + + private void verifyDataSourceAccess(DataSourceMetadata dataSourceMetadata) { + if (dataSourceMetadata.getStatus().equals(DataSourceStatus.DISABLED)) { + throw new DatasourceDisabledException( + String.format("Datasource %s is disabled.", dataSourceMetadata.getName())); + } + this.dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata); } /** @@ -151,34 +142,37 @@ private void validateDataSourceMetaData(DataSourceMetadata metadata) { * @param dataSourceData * @param metadata {@link DataSourceMetadata}. */ - private void replaceOldDatasourceMetadata( + private DataSourceMetadata constructUpdatedDatasourceMetadata( Map dataSourceData, DataSourceMetadata metadata) { - + DataSourceMetadata.Builder metadataBuilder = new DataSourceMetadata.Builder(metadata); for (String key : dataSourceData.keySet()) { switch (key) { // Name and connector should not be modified case DESCRIPTION_FIELD: - metadata.setDescription((String) dataSourceData.get(DESCRIPTION_FIELD)); + metadataBuilder.setDescription((String) dataSourceData.get(DESCRIPTION_FIELD)); break; case ALLOWED_ROLES_FIELD: - metadata.setAllowedRoles((List) dataSourceData.get(ALLOWED_ROLES_FIELD)); + metadataBuilder.setAllowedRoles((List) dataSourceData.get(ALLOWED_ROLES_FIELD)); break; case PROPERTIES_FIELD: Map properties = new HashMap<>(metadata.getProperties()); properties.putAll(((Map) dataSourceData.get(PROPERTIES_FIELD))); + metadataBuilder.setProperties(properties); break; - case NAME_FIELD: - case CONNECTOR_FIELD: + case RESULT_INDEX_FIELD: + metadataBuilder.setResultIndex((String) dataSourceData.get(RESULT_INDEX_FIELD)); + case STATUS_FIELD: + metadataBuilder.setDataSourceStatus((DataSourceStatus) dataSourceData.get(STATUS_FIELD)); + default: break; } } + return metadataBuilder.build(); } - @Override - public DataSourceMetadata getRawDataSourceMetadata(String dataSourceName) { + private DataSourceMetadata getRawDataSourceMetadata(String dataSourceName) { if (dataSourceName.equals(DEFAULT_DATASOURCE_NAME)) { return DataSourceMetadata.defaultOpenSearchDataSourceMetadata(); - } else { Optional dataSourceMetadataOptional = this.dataSourceMetadataStorage.getDataSourceMetadata(dataSourceName); @@ -193,11 +187,11 @@ public DataSourceMetadata getRawDataSourceMetadata(String dataSourceName) { // It is advised to avoid sending any kind credential // info in api response from security point of view. - private void removeAuthInfo(Set dataSourceMetadataSet) { - dataSourceMetadataSet.forEach(this::removeAuthInfo); + private Set removeAuthInfo(Set dataSourceMetadataSet) { + return dataSourceMetadataSet.stream().map(this::removeAuthInfo).collect(Collectors.toSet()); } - private void removeAuthInfo(DataSourceMetadata dataSourceMetadata) { + private DataSourceMetadata removeAuthInfo(DataSourceMetadata dataSourceMetadata) { HashMap safeProperties = new HashMap<>(dataSourceMetadata.getProperties()); safeProperties .entrySet() @@ -205,6 +199,6 @@ private void removeAuthInfo(DataSourceMetadata dataSourceMetadata) { entry -> CONFIDENTIAL_AUTH_KEYS.stream() .anyMatch(confidentialKey -> entry.getKey().endsWith(confidentialKey))); - dataSourceMetadata.setProperties(safeProperties); + return new DataSourceMetadata.Builder(dataSourceMetadata).setProperties(safeProperties).build(); } } diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/utils/XContentParserUtils.java b/datasources/src/main/java/org/opensearch/sql/datasources/utils/XContentParserUtils.java index 6af2a5a761..7c8c33b147 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/utils/XContentParserUtils.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/utils/XContentParserUtils.java @@ -21,6 +21,7 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceStatus; import org.opensearch.sql.datasource.model.DataSourceType; /** Utitlity class to serialize and deserialize objects in XContent. */ @@ -33,6 +34,7 @@ public class XContentParserUtils { public static final String ALLOWED_ROLES_FIELD = "allowedRoles"; public static final String RESULT_INDEX_FIELD = "resultIndex"; + public static final String STATUS_FIELD = "status"; /** * Convert xcontent parser to DataSourceMetadata. @@ -48,6 +50,7 @@ public static DataSourceMetadata toDataSourceMetadata(XContentParser parser) thr List allowedRoles = new ArrayList<>(); Map properties = new HashMap<>(); String resultIndex = null; + DataSourceStatus status = null; ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); while (parser.nextToken() != XContentParser.Token.END_OBJECT) { String fieldName = parser.currentName(); @@ -79,15 +82,22 @@ public static DataSourceMetadata toDataSourceMetadata(XContentParser parser) thr case RESULT_INDEX_FIELD: resultIndex = parser.textOrNull(); break; + case STATUS_FIELD: + status = DataSourceStatus.fromString(parser.textOrNull()); + break; default: throw new IllegalArgumentException("Unknown field: " + fieldName); } } - if (name == null || connector == null) { - throw new IllegalArgumentException("name and connector are required fields."); - } - return new DataSourceMetadata( - name, description, connector, allowedRoles, properties, resultIndex); + return new DataSourceMetadata.Builder() + .setName(name) + .setDescription(description) + .setConnector(connector) + .setProperties(properties) + .setAllowedRoles(allowedRoles) + .setResultIndex(resultIndex) + .setDataSourceStatus(status) + .build(); } public static Map toMap(XContentParser parser) throws IOException { @@ -97,6 +107,7 @@ public static Map toMap(XContentParser parser) throws IOExceptio List allowedRoles = new ArrayList<>(); Map properties = new HashMap<>(); String resultIndex; + DataSourceStatus status; ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); while (parser.nextToken() != XContentParser.Token.END_OBJECT) { String fieldName = parser.currentName(); @@ -133,6 +144,10 @@ public static Map toMap(XContentParser parser) throws IOExceptio resultIndex = parser.textOrNull(); resultMap.put(RESULT_INDEX_FIELD, resultIndex); break; + case STATUS_FIELD: + status = DataSourceStatus.fromString(parser.textOrNull()); + resultMap.put(STATUS_FIELD, status); + break; default: throw new IllegalArgumentException("Unknown field: " + fieldName); } @@ -202,6 +217,7 @@ public static XContentBuilder convertToXContent(DataSourceMetadata metadata) thr } builder.endObject(); builder.field(RESULT_INDEX_FIELD, metadata.getResultIndex()); + builder.field(STATUS_FIELD, metadata.getStatus()); builder.endObject(); return builder; } diff --git a/datasources/src/main/resources/datasources-index-mapping.yml b/datasources/src/main/resources/datasources-index-mapping.yml index 0206a97886..589630d790 100644 --- a/datasources/src/main/resources/datasources-index-mapping.yml +++ b/datasources/src/main/resources/datasources-index-mapping.yml @@ -16,4 +16,6 @@ properties: connector: type: keyword resultIndex: + type: keyword + status: type: keyword \ No newline at end of file diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/auth/DataSourceUserAuthorizationHelperImplTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/auth/DataSourceUserAuthorizationHelperImplTest.java index 6ee3c12edd..6471fd03f7 100644 --- a/datasources/src/test/java/org/opensearch/sql/datasources/auth/DataSourceUserAuthorizationHelperImplTest.java +++ b/datasources/src/test/java/org/opensearch/sql/datasources/auth/DataSourceUserAuthorizationHelperImplTest.java @@ -7,7 +7,6 @@ import static org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT; -import java.util.HashMap; import java.util.List; import org.junit.Assert; import org.junit.jupiter.api.Test; @@ -102,11 +101,10 @@ public void testAuthorizeDataSourceWithException() { } private DataSourceMetadata dataSourceMetadata() { - DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); - dataSourceMetadata.setName("test"); - dataSourceMetadata.setConnector(DataSourceType.PROMETHEUS); - dataSourceMetadata.setAllowedRoles(List.of("prometheus_access")); - dataSourceMetadata.setProperties(new HashMap<>()); - return dataSourceMetadata; + return new DataSourceMetadata.Builder() + .setName("test") + .setAllowedRoles(List.of("prometheus_access")) + .setConnector(DataSourceType.PROMETHEUS) + .build(); } } diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactoryTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactoryTest.java index 4dd054de70..52f8ec9cd1 100644 --- a/datasources/src/test/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactoryTest.java +++ b/datasources/src/test/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactoryTest.java @@ -35,17 +35,18 @@ void testCreateGLueDatSource() { .thenReturn(Collections.emptyList()); GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings); - DataSourceMetadata metadata = new DataSourceMetadata(); HashMap properties = new HashMap<>(); properties.put("glue.auth.type", "iam_role"); properties.put("glue.auth.role_arn", "role_arn"); properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200"); properties.put("glue.indexstore.opensearch.auth", "noauth"); properties.put("glue.indexstore.opensearch.region", "us-west-2"); - - metadata.setName("my_glue"); - metadata.setConnector(DataSourceType.S3GLUE); - metadata.setProperties(properties); + DataSourceMetadata metadata = + new DataSourceMetadata.Builder() + .setName("my_glue") + .setConnector(DataSourceType.S3GLUE) + .setProperties(properties) + .build(); DataSource dataSource = glueDatasourceFactory.createDataSource(metadata); Assertions.assertEquals(DataSourceType.S3GLUE, dataSource.getConnectorType()); UnsupportedOperationException unsupportedOperationException = @@ -66,7 +67,6 @@ void testCreateGLueDatSourceWithBasicAuthForIndexStore() { .thenReturn(Collections.emptyList()); GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings); - DataSourceMetadata metadata = new DataSourceMetadata(); HashMap properties = new HashMap<>(); properties.put("glue.auth.type", "iam_role"); properties.put("glue.auth.role_arn", "role_arn"); @@ -75,10 +75,12 @@ void testCreateGLueDatSourceWithBasicAuthForIndexStore() { properties.put("glue.indexstore.opensearch.auth.username", "username"); properties.put("glue.indexstore.opensearch.auth.password", "password"); properties.put("glue.indexstore.opensearch.region", "us-west-2"); - - metadata.setName("my_glue"); - metadata.setConnector(DataSourceType.S3GLUE); - metadata.setProperties(properties); + DataSourceMetadata metadata = + new DataSourceMetadata.Builder() + .setName("my_glue") + .setConnector(DataSourceType.S3GLUE) + .setProperties(properties) + .build(); DataSource dataSource = glueDatasourceFactory.createDataSource(metadata); Assertions.assertEquals(DataSourceType.S3GLUE, dataSource.getConnectorType()); UnsupportedOperationException unsupportedOperationException = @@ -99,17 +101,18 @@ void testCreateGLueDatSourceWithAwsSigV4AuthForIndexStore() { .thenReturn(Collections.emptyList()); GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings); - DataSourceMetadata metadata = new DataSourceMetadata(); HashMap properties = new HashMap<>(); properties.put("glue.auth.type", "iam_role"); properties.put("glue.auth.role_arn", "role_arn"); properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200"); properties.put("glue.indexstore.opensearch.auth", "awssigv4"); properties.put("glue.indexstore.opensearch.region", "us-west-2"); - - metadata.setName("my_glue"); - metadata.setConnector(DataSourceType.S3GLUE); - metadata.setProperties(properties); + DataSourceMetadata metadata = + new DataSourceMetadata.Builder() + .setName("my_glue") + .setConnector(DataSourceType.S3GLUE) + .setProperties(properties) + .build(); DataSource dataSource = glueDatasourceFactory.createDataSource(metadata); Assertions.assertEquals(DataSourceType.S3GLUE, dataSource.getConnectorType()); UnsupportedOperationException unsupportedOperationException = @@ -128,16 +131,19 @@ void testCreateGLueDatSourceWithAwsSigV4AuthForIndexStore() { void testCreateGLueDatSourceWithBasicAuthForIndexStoreAndMissingFields() { GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings); - DataSourceMetadata metadata = new DataSourceMetadata(); HashMap properties = new HashMap<>(); properties.put("glue.auth.type", "iam_role"); properties.put("glue.auth.role_arn", "role_arn"); properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200"); properties.put("glue.indexstore.opensearch.auth", "basicauth"); - metadata.setName("my_glue"); - metadata.setConnector(DataSourceType.S3GLUE); - metadata.setProperties(properties); + DataSourceMetadata metadata = + new DataSourceMetadata.Builder() + .setName("my_glue") + .setConnector(DataSourceType.S3GLUE) + .setProperties(properties) + .build(); + IllegalArgumentException illegalArgumentException = Assertions.assertThrows( IllegalArgumentException.class, () -> glueDatasourceFactory.createDataSource(metadata)); @@ -154,7 +160,6 @@ void testCreateGLueDatSourceWithInvalidFlintHost() { .thenReturn(List.of("127.0.0.0/8")); GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings); - DataSourceMetadata metadata = new DataSourceMetadata(); HashMap properties = new HashMap<>(); properties.put("glue.auth.type", "iam_role"); properties.put("glue.auth.role_arn", "role_arn"); @@ -162,9 +167,12 @@ void testCreateGLueDatSourceWithInvalidFlintHost() { properties.put("glue.indexstore.opensearch.auth", "noauth"); properties.put("glue.indexstore.opensearch.region", "us-west-2"); - metadata.setName("my_glue"); - metadata.setConnector(DataSourceType.S3GLUE); - metadata.setProperties(properties); + DataSourceMetadata metadata = + new DataSourceMetadata.Builder() + .setName("my_glue") + .setConnector(DataSourceType.S3GLUE) + .setProperties(properties) + .build(); IllegalArgumentException illegalArgumentException = Assertions.assertThrows( IllegalArgumentException.class, () -> glueDatasourceFactory.createDataSource(metadata)); @@ -181,7 +189,6 @@ void testCreateGLueDatSourceWithInvalidFlintHostSyntax() { .thenReturn(List.of("127.0.0.0/8")); GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings); - DataSourceMetadata metadata = new DataSourceMetadata(); HashMap properties = new HashMap<>(); properties.put("glue.auth.type", "iam_role"); properties.put("glue.auth.role_arn", "role_arn"); @@ -191,9 +198,12 @@ void testCreateGLueDatSourceWithInvalidFlintHostSyntax() { properties.put("glue.indexstore.opensearch.auth", "noauth"); properties.put("glue.indexstore.opensearch.region", "us-west-2"); - metadata.setName("my_glue"); - metadata.setConnector(DataSourceType.S3GLUE); - metadata.setProperties(properties); + DataSourceMetadata metadata = + new DataSourceMetadata.Builder() + .setName("my_glue") + .setConnector(DataSourceType.S3GLUE) + .setProperties(properties) + .build(); IllegalArgumentException illegalArgumentException = Assertions.assertThrows( IllegalArgumentException.class, () -> glueDatasourceFactory.createDataSource(metadata)); diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/service/DataSourceLoaderCacheImplTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/service/DataSourceLoaderCacheImplTest.java index b2ea221eb7..6238355238 100644 --- a/datasources/src/test/java/org/opensearch/sql/datasources/service/DataSourceLoaderCacheImplTest.java +++ b/datasources/src/test/java/org/opensearch/sql/datasources/service/DataSourceLoaderCacheImplTest.java @@ -7,7 +7,6 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import com.google.common.collect.ImmutableMap; import java.util.Collections; import java.util.List; import org.junit.jupiter.api.Assertions; @@ -46,11 +45,7 @@ public void setup() { void testGetOrLoadDataSource() { DataSourceLoaderCache dataSourceLoaderCache = new DataSourceLoaderCacheImpl(Collections.singleton(dataSourceFactory)); - DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); - dataSourceMetadata.setName("testDS"); - dataSourceMetadata.setConnector(DataSourceType.OPENSEARCH); - dataSourceMetadata.setAllowedRoles(Collections.emptyList()); - dataSourceMetadata.setProperties(ImmutableMap.of()); + DataSourceMetadata dataSourceMetadata = getMetadata(); DataSource dataSource = dataSourceLoaderCache.getOrLoadDataSource(dataSourceMetadata); verify(dataSourceFactory, times(1)).createDataSource(dataSourceMetadata); Assertions.assertEquals( @@ -65,18 +60,19 @@ void testGetOrLoadDataSourceWithMetadataUpdate() { DataSourceMetadata dataSourceMetadata = getMetadata(); dataSourceLoaderCache.getOrLoadDataSource(dataSourceMetadata); dataSourceLoaderCache.getOrLoadDataSource(dataSourceMetadata); - dataSourceMetadata.setAllowedRoles(List.of("testDS_access")); + dataSourceMetadata = + new DataSourceMetadata.Builder(dataSourceMetadata) + .setAllowedRoles(List.of("testDS_access")) + .build(); dataSourceLoaderCache.getOrLoadDataSource(dataSourceMetadata); dataSourceLoaderCache.getOrLoadDataSource(dataSourceMetadata); - verify(dataSourceFactory, times(2)).createDataSource(dataSourceMetadata); + verify(dataSourceFactory, times(2)).createDataSource(any()); } private DataSourceMetadata getMetadata() { - DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); - dataSourceMetadata.setName("testDS"); - dataSourceMetadata.setConnector(DataSourceType.OPENSEARCH); - dataSourceMetadata.setAllowedRoles(Collections.emptyList()); - dataSourceMetadata.setProperties(ImmutableMap.of()); - return dataSourceMetadata; + return new DataSourceMetadata.Builder() + .setName("testDS") + .setConnector(DataSourceType.OPENSEARCH) + .build(); } } diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/service/DataSourceServiceImplTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/service/DataSourceServiceImplTest.java index bf88302833..5a94945e5b 100644 --- a/datasources/src/test/java/org/opensearch/sql/datasources/service/DataSourceServiceImplTest.java +++ b/datasources/src/test/java/org/opensearch/sql/datasources/service/DataSourceServiceImplTest.java @@ -29,7 +29,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import org.apache.commons.lang3.StringUtils; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -39,9 +38,11 @@ import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.datasource.model.DataSource; import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceStatus; import org.opensearch.sql.datasource.model.DataSourceType; import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelper; import org.opensearch.sql.datasources.exceptions.DataSourceNotFoundException; +import org.opensearch.sql.datasources.exceptions.DatasourceDisabledException; import org.opensearch.sql.storage.DataSourceFactory; import org.opensearch.sql.storage.StorageEngine; @@ -164,57 +165,6 @@ void testCreateDataSourceSuccessCase() { assertEquals(DataSourceType.OPENSEARCH, dataSource.getConnectorType()); } - @Test - void testCreateDataSourceWithDisallowedDatasourceName() { - DataSourceMetadata dataSourceMetadata = - metadata( - "testDS$$$", DataSourceType.OPENSEARCH, Collections.emptyList(), ImmutableMap.of()); - IllegalArgumentException exception = - assertThrows( - IllegalArgumentException.class, - () -> dataSourceService.createDataSource(dataSourceMetadata)); - assertEquals( - "DataSource Name: testDS$$$ contains illegal characters." - + " Allowed characters: a-zA-Z0-9_-*@.", - exception.getMessage()); - verify(dataSourceFactory, times(1)).getDataSourceType(); - verify(dataSourceFactory, times(0)).createDataSource(dataSourceMetadata); - verifyNoInteractions(dataSourceMetadataStorage); - } - - @Test - void testCreateDataSourceWithEmptyDatasourceName() { - DataSourceMetadata dataSourceMetadata = - metadata("", DataSourceType.OPENSEARCH, Collections.emptyList(), ImmutableMap.of()); - IllegalArgumentException exception = - assertThrows( - IllegalArgumentException.class, - () -> dataSourceService.createDataSource(dataSourceMetadata)); - assertEquals( - "Missing Name Field from a DataSource. Name is a required parameter.", - exception.getMessage()); - verify(dataSourceFactory, times(1)).getDataSourceType(); - verify(dataSourceFactory, times(0)).createDataSource(dataSourceMetadata); - verifyNoInteractions(dataSourceMetadataStorage); - } - - @Test - void testCreateDataSourceWithNullParameters() { - DataSourceMetadata dataSourceMetadata = - metadata("testDS", DataSourceType.OPENSEARCH, Collections.emptyList(), null); - IllegalArgumentException exception = - assertThrows( - IllegalArgumentException.class, - () -> dataSourceService.createDataSource(dataSourceMetadata)); - assertEquals( - "Missing properties field in datasource configuration. " - + "Properties are required parameters.", - exception.getMessage()); - verify(dataSourceFactory, times(1)).getDataSourceType(); - verify(dataSourceFactory, times(0)).createDataSource(dataSourceMetadata); - verifyNoInteractions(dataSourceMetadataStorage); - } - @Test void testGetDataSourceMetadataSet() { HashMap properties = new HashMap<>(); @@ -318,9 +268,11 @@ void testPatchDataSourceSuccessCase() { ALLOWED_ROLES_FIELD, new ArrayList<>(), PROPERTIES_FIELD, - Map.of(), + Map.of("prometehus.uri", "random"), RESULT_INDEX_FIELD, - "")); + "query_execution_result_testds", + STATUS_FIELD, + DataSourceStatus.DISABLED)); DataSourceMetadata getData = metadata("testDS", DataSourceType.OPENSEARCH, Collections.emptyList(), ImmutableMap.of()); when(dataSourceMetadataStorage.getDataSourceMetadata("testDS")) @@ -365,12 +317,12 @@ void testDataSourceExistsForDefaultDataSource() { DataSourceMetadata metadata( String name, DataSourceType type, List allowedRoles, Map properties) { - DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); - dataSourceMetadata.setName(name); - dataSourceMetadata.setConnector(type); - dataSourceMetadata.setAllowedRoles(allowedRoles); - dataSourceMetadata.setProperties(properties); - return dataSourceMetadata; + return new DataSourceMetadata.Builder() + .setName(name) + .setConnector(type) + .setAllowedRoles(allowedRoles) + .setProperties(properties) + .build(); } @Test @@ -381,13 +333,12 @@ void testRemovalOfAuthorizationInfo() { properties.put("prometheus.auth.username", "username"); properties.put("prometheus.auth.password", "password"); DataSourceMetadata dataSourceMetadata = - new DataSourceMetadata( - "testDS", - StringUtils.EMPTY, - DataSourceType.PROMETHEUS, - Collections.singletonList("prometheus_access"), - properties, - null); + new DataSourceMetadata.Builder() + .setName("testDS") + .setProperties(properties) + .setConnector(DataSourceType.PROMETHEUS) + .setAllowedRoles(Collections.singletonList("prometheus_access")) + .build(); when(dataSourceMetadataStorage.getDataSourceMetadata("testDS")) .thenReturn(Optional.of(dataSourceMetadata)); @@ -407,13 +358,12 @@ void testRemovalOfAuthorizationInfoForAccessKeyAndSecretKye() { properties.put("prometheus.auth.access_key", "access_key"); properties.put("prometheus.auth.secret_key", "secret_key"); DataSourceMetadata dataSourceMetadata = - new DataSourceMetadata( - "testDS", - StringUtils.EMPTY, - DataSourceType.PROMETHEUS, - Collections.singletonList("prometheus_access"), - properties, - null); + new DataSourceMetadata.Builder() + .setName("testDS") + .setProperties(properties) + .setConnector(DataSourceType.PROMETHEUS) + .setAllowedRoles(Collections.singletonList("prometheus_access")) + .build(); when(dataSourceMetadataStorage.getDataSourceMetadata("testDS")) .thenReturn(Optional.of(dataSourceMetadata)); @@ -435,13 +385,12 @@ void testRemovalOfAuthorizationInfoForGlueWithRoleARN() { properties.put("glue.indexstore.opensearch.auth.username", "username"); properties.put("glue.indexstore.opensearch.auth.password", "password"); DataSourceMetadata dataSourceMetadata = - new DataSourceMetadata( - "testGlue", - StringUtils.EMPTY, - DataSourceType.S3GLUE, - Collections.singletonList("glue_access"), - properties, - null); + new DataSourceMetadata.Builder() + .setName("testGlue") + .setProperties(properties) + .setConnector(DataSourceType.S3GLUE) + .setAllowedRoles(Collections.singletonList("glue_access")) + .build(); when(dataSourceMetadataStorage.getDataSourceMetadata("testGlue")) .thenReturn(Optional.of(dataSourceMetadata)); @@ -493,26 +442,50 @@ void testGetDataSourceMetadataForSpecificDataSourceName() { } @Test - void testGetRawDataSourceMetadata() { + void testVerifyDataSourceAccessAndGetRawDataSourceMetadataWithDisabledData() { HashMap properties = new HashMap<>(); properties.put("prometheus.uri", "https://localhost:9090"); properties.put("prometheus.auth.type", "basicauth"); properties.put("prometheus.auth.username", "username"); properties.put("prometheus.auth.password", "password"); DataSourceMetadata dataSourceMetadata = - new DataSourceMetadata( - "testDS", - StringUtils.EMPTY, - DataSourceType.PROMETHEUS, - Collections.singletonList("prometheus_access"), - properties, - null); + new DataSourceMetadata.Builder() + .setName("testDS") + .setProperties(properties) + .setConnector(DataSourceType.PROMETHEUS) + .setAllowedRoles(Collections.singletonList("prometheus_access")) + .setDataSourceStatus(DataSourceStatus.DISABLED) + .build(); when(dataSourceMetadataStorage.getDataSourceMetadata("testDS")) .thenReturn(Optional.of(dataSourceMetadata)); + DatasourceDisabledException datasourceDisabledException = + Assertions.assertThrows( + DatasourceDisabledException.class, + () -> dataSourceService.verifyDataSourceAccessAndGetRawMetadata("testDS")); + Assertions.assertEquals( + "Datasource testDS is disabled.", datasourceDisabledException.getMessage()); + } - DataSourceMetadata dataSourceMetadata1 = dataSourceService.getRawDataSourceMetadata("testDS"); - assertEquals("testDS", dataSourceMetadata1.getName()); - assertEquals(DataSourceType.PROMETHEUS, dataSourceMetadata1.getConnector()); + @Test + void testVerifyDataSourceAccessAndGetRawDataSourceMetadata() { + HashMap properties = new HashMap<>(); + properties.put("prometheus.uri", "https://localhost:9090"); + properties.put("prometheus.auth.type", "basicauth"); + properties.put("prometheus.auth.username", "username"); + properties.put("prometheus.auth.password", "password"); + DataSourceMetadata dataSourceMetadata = + new DataSourceMetadata.Builder() + .setName("testDS") + .setProperties(properties) + .setConnector(DataSourceType.PROMETHEUS) + .setAllowedRoles(Collections.singletonList("prometheus_access")) + .setDataSourceStatus(DataSourceStatus.ACTIVE) + .build(); + when(dataSourceMetadataStorage.getDataSourceMetadata("testDS")) + .thenReturn(Optional.of(dataSourceMetadata)); + DataSourceMetadata dataSourceMetadata1 = + dataSourceService.verifyDataSourceAccessAndGetRawMetadata("testDS"); + assertTrue(dataSourceMetadata1.getProperties().containsKey("prometheus.uri")); assertTrue(dataSourceMetadata1.getProperties().containsKey("prometheus.auth.type")); assertTrue(dataSourceMetadata1.getProperties().containsKey("prometheus.auth.username")); assertTrue(dataSourceMetadata1.getProperties().containsKey("prometheus.auth.password")); diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorageTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorageTest.java index 7d41737b2d..f9c62599ec 100644 --- a/datasources/src/test/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorageTest.java +++ b/datasources/src/test/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorageTest.java @@ -501,8 +501,8 @@ public void testUpdateDataSourceMetadataWithDocumentMissingException() { Mockito.when(encryptor.encrypt("access_key")).thenReturn("access_key"); Mockito.when(client.update(ArgumentMatchers.any())) .thenThrow(new DocumentMissingException(ShardId.fromString("[2][2]"), "testDS")); - DataSourceMetadata dataSourceMetadata = getDataSourceMetadata(); - dataSourceMetadata.setName("testDS"); + DataSourceMetadata dataSourceMetadata = + new DataSourceMetadata.Builder(getDataSourceMetadata()).setName("testDS").build(); DataSourceNotFoundException dataSourceNotFoundException = Assertions.assertThrows( @@ -526,8 +526,8 @@ public void testUpdateDataSourceMetadataWithRuntimeException() { Mockito.when(encryptor.encrypt("access_key")).thenReturn("access_key"); Mockito.when(client.update(ArgumentMatchers.any())) .thenThrow(new RuntimeException("error message")); - DataSourceMetadata dataSourceMetadata = getDataSourceMetadata(); - dataSourceMetadata.setName("testDS"); + DataSourceMetadata dataSourceMetadata = + new DataSourceMetadata.Builder(getDataSourceMetadata()).setName("testDS").build(); RuntimeException runtimeException = Assertions.assertThrows( @@ -599,74 +599,82 @@ public void testDeleteDataSourceMetadataWithUnexpectedResult() { } private String getBasicDataSourceMetadataString() throws JsonProcessingException { - DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); - dataSourceMetadata.setName("testDS"); - dataSourceMetadata.setConnector(DataSourceType.PROMETHEUS); - dataSourceMetadata.setAllowedRoles(Collections.singletonList("prometheus_access")); Map properties = new HashMap<>(); properties.put("prometheus.auth.type", "basicauth"); properties.put("prometheus.auth.username", "username"); properties.put("prometheus.auth.uri", "https://localhost:9090"); properties.put("prometheus.auth.password", "password"); - dataSourceMetadata.setProperties(properties); + DataSourceMetadata dataSourceMetadata = + new DataSourceMetadata.Builder() + .setName("testDS") + .setProperties(properties) + .setConnector(DataSourceType.PROMETHEUS) + .setAllowedRoles(Collections.singletonList("prometheus_access")) + .build(); ObjectMapper objectMapper = new ObjectMapper(); return objectMapper.writeValueAsString(dataSourceMetadata); } private String getAWSSigv4DataSourceMetadataString() throws JsonProcessingException { - DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); - dataSourceMetadata.setName("testDS"); - dataSourceMetadata.setConnector(DataSourceType.PROMETHEUS); - dataSourceMetadata.setAllowedRoles(Collections.singletonList("prometheus_access")); Map properties = new HashMap<>(); properties.put("prometheus.auth.type", "awssigv4"); properties.put("prometheus.auth.secret_key", "secret_key"); properties.put("prometheus.auth.uri", "https://localhost:9090"); properties.put("prometheus.auth.access_key", "access_key"); - dataSourceMetadata.setProperties(properties); + DataSourceMetadata dataSourceMetadata = + new DataSourceMetadata.Builder() + .setName("testDS") + .setProperties(properties) + .setConnector(DataSourceType.PROMETHEUS) + .setAllowedRoles(Collections.singletonList("prometheus_access")) + .build(); ObjectMapper objectMapper = new ObjectMapper(); return objectMapper.writeValueAsString(dataSourceMetadata); } private String getDataSourceMetadataStringWithBasicAuthentication() throws JsonProcessingException { - DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); - dataSourceMetadata.setName("testDS"); - dataSourceMetadata.setConnector(DataSourceType.PROMETHEUS); - dataSourceMetadata.setAllowedRoles(Collections.singletonList("prometheus_access")); Map properties = new HashMap<>(); properties.put("prometheus.auth.uri", "https://localhost:9090"); properties.put("prometheus.auth.type", "basicauth"); properties.put("prometheus.auth.username", "username"); properties.put("prometheus.auth.password", "password"); - dataSourceMetadata.setProperties(properties); + DataSourceMetadata dataSourceMetadata = + new DataSourceMetadata.Builder() + .setName("testDS") + .setProperties(properties) + .setConnector(DataSourceType.PROMETHEUS) + .setAllowedRoles(Collections.singletonList("prometheus_access")) + .build(); ObjectMapper objectMapper = new ObjectMapper(); return objectMapper.writeValueAsString(dataSourceMetadata); } private String getDataSourceMetadataStringWithNoAuthentication() throws JsonProcessingException { - DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); - dataSourceMetadata.setName("testDS"); - dataSourceMetadata.setConnector(DataSourceType.PROMETHEUS); - dataSourceMetadata.setAllowedRoles(Collections.singletonList("prometheus_access")); Map properties = new HashMap<>(); properties.put("prometheus.auth.uri", "https://localhost:9090"); - dataSourceMetadata.setProperties(properties); + DataSourceMetadata dataSourceMetadata = + new DataSourceMetadata.Builder() + .setName("testDS") + .setProperties(properties) + .setConnector(DataSourceType.PROMETHEUS) + .setAllowedRoles(Collections.singletonList("prometheus_access")) + .build(); ObjectMapper objectMapper = new ObjectMapper(); return objectMapper.writeValueAsString(dataSourceMetadata); } private DataSourceMetadata getDataSourceMetadata() { - DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); - dataSourceMetadata.setName("testDS"); - dataSourceMetadata.setConnector(DataSourceType.PROMETHEUS); - dataSourceMetadata.setAllowedRoles(Collections.singletonList("prometheus_access")); Map properties = new HashMap<>(); properties.put("prometheus.auth.type", "awssigv4"); properties.put("prometheus.auth.secret_key", "secret_key"); properties.put("prometheus.auth.uri", "https://localhost:9090"); properties.put("prometheus.auth.access_key", "access_key"); - dataSourceMetadata.setProperties(properties); - return dataSourceMetadata; + return new DataSourceMetadata.Builder() + .setName("testDS") + .setProperties(properties) + .setConnector(DataSourceType.PROMETHEUS) + .setAllowedRoles(Collections.singletonList("prometheus_access")) + .build(); } } diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportCreateDataSourceActionTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportCreateDataSourceActionTest.java index 9088d3c4ad..ba93890883 100644 --- a/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportCreateDataSourceActionTest.java +++ b/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportCreateDataSourceActionTest.java @@ -71,9 +71,11 @@ public void setUp() { @Test public void testDoExecute() { - DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); - dataSourceMetadata.setName("test_datasource"); - dataSourceMetadata.setConnector(DataSourceType.PROMETHEUS); + DataSourceMetadata dataSourceMetadata = + new DataSourceMetadata.Builder() + .setName("test_datasource") + .setConnector(DataSourceType.PROMETHEUS) + .build(); CreateDataSourceActionRequest request = new CreateDataSourceActionRequest(dataSourceMetadata); action.doExecute(task, request, actionListener); @@ -88,9 +90,11 @@ public void testDoExecute() { @Test public void testDoExecuteWithException() { - DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); - dataSourceMetadata.setName("test_datasource"); - dataSourceMetadata.setConnector(DataSourceType.PROMETHEUS); + DataSourceMetadata dataSourceMetadata = + new DataSourceMetadata.Builder() + .setName("test_datasource") + .setConnector(DataSourceType.PROMETHEUS) + .build(); doThrow(new RuntimeException("Error")) .when(dataSourceService) .createDataSource(dataSourceMetadata); @@ -105,9 +109,11 @@ public void testDoExecuteWithException() { @Test public void testDataSourcesLimit() { - DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); - dataSourceMetadata.setName("test_datasource"); - dataSourceMetadata.setConnector(DataSourceType.PROMETHEUS); + DataSourceMetadata dataSourceMetadata = + new DataSourceMetadata.Builder() + .setName("test_datasource") + .setConnector(DataSourceType.PROMETHEUS) + .build(); CreateDataSourceActionRequest request = new CreateDataSourceActionRequest(dataSourceMetadata); when(dataSourceService.getDataSourceMetadata(false).size()).thenReturn(1); when(settings.getSettingValue(DATASOURCES_LIMIT)).thenReturn(1); diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportGetDataSourceActionTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportGetDataSourceActionTest.java index 286f308402..90bd7bb025 100644 --- a/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportGetDataSourceActionTest.java +++ b/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportGetDataSourceActionTest.java @@ -68,9 +68,11 @@ public void setUp() { @Test public void testDoExecute() { - DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); - dataSourceMetadata.setName("test_datasource"); - dataSourceMetadata.setConnector(DataSourceType.PROMETHEUS); + DataSourceMetadata dataSourceMetadata = + new DataSourceMetadata.Builder() + .setName("test_datasource") + .setConnector(DataSourceType.PROMETHEUS) + .build(); GetDataSourceActionRequest request = new GetDataSourceActionRequest("test_datasource"); when(dataSourceService.getDataSourceMetadata("test_datasource")).thenReturn(dataSourceMetadata); @@ -97,10 +99,11 @@ protected Object buildJsonObject(DataSourceMetadata response) { @Test public void testDoExecuteForGetAllDataSources() { - DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); - dataSourceMetadata.setName("test_datasource"); - dataSourceMetadata.setConnector(DataSourceType.PROMETHEUS); - + DataSourceMetadata dataSourceMetadata = + new DataSourceMetadata.Builder() + .setName("test_datasource") + .setConnector(DataSourceType.PROMETHEUS) + .build(); GetDataSourceActionRequest request = new GetDataSourceActionRequest(); when(dataSourceService.getDataSourceMetadata(false)) .thenReturn(Collections.singleton(dataSourceMetadata)); diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportUpdateDataSourceActionTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportUpdateDataSourceActionTest.java index 4d42cdb2fa..e086813938 100644 --- a/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportUpdateDataSourceActionTest.java +++ b/datasources/src/test/java/org/opensearch/sql/datasources/transport/TransportUpdateDataSourceActionTest.java @@ -62,9 +62,11 @@ public void setUp() { @Test public void testDoExecute() { - DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); - dataSourceMetadata.setName("test_datasource"); - dataSourceMetadata.setConnector(DataSourceType.PROMETHEUS); + DataSourceMetadata dataSourceMetadata = + new DataSourceMetadata.Builder() + .setName("test_datasource") + .setConnector(DataSourceType.PROMETHEUS) + .build(); UpdateDataSourceActionRequest request = new UpdateDataSourceActionRequest(dataSourceMetadata); action.doExecute(task, request, actionListener); @@ -80,9 +82,12 @@ public void testDoExecute() { @Test public void testDoExecuteWithException() { - DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); - dataSourceMetadata.setName("test_datasource"); - dataSourceMetadata.setConnector(DataSourceType.PROMETHEUS); + DataSourceMetadata dataSourceMetadata = + new DataSourceMetadata.Builder() + .setName("test_datasource") + .setConnector(DataSourceType.PROMETHEUS) + .build(); + doThrow(new RuntimeException("Error")) .when(dataSourceService) .updateDataSource(dataSourceMetadata); diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/utils/XContentParserUtilsTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/utils/XContentParserUtilsTest.java index 5a1f5e155f..c6f08b673b 100644 --- a/datasources/src/test/java/org/opensearch/sql/datasources/utils/XContentParserUtilsTest.java +++ b/datasources/src/test/java/org/opensearch/sql/datasources/utils/XContentParserUtilsTest.java @@ -1,6 +1,7 @@ package org.opensearch.sql.datasources.utils; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.opensearch.sql.datasource.model.DataSourceStatus.ACTIVE; import static org.opensearch.sql.datasources.utils.XContentParserUtils.*; import com.google.gson.Gson; @@ -23,28 +24,32 @@ public class XContentParserUtilsTest { @SneakyThrows @Test public void testConvertToXContent() { - DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); - dataSourceMetadata.setName("testDS"); - dataSourceMetadata.setConnector(DataSourceType.PROMETHEUS); - dataSourceMetadata.setAllowedRoles(List.of("prometheus_access")); - dataSourceMetadata.setProperties(Map.of("prometheus.uri", "https://localhost:9090")); + DataSourceMetadata dataSourceMetadata = + new DataSourceMetadata.Builder() + .setName("testDS") + .setConnector(DataSourceType.PROMETHEUS) + .setAllowedRoles(List.of("prometheus_access")) + .setProperties(Map.of("prometheus.uri", "https://localhost:9090")) + .build(); XContentBuilder contentBuilder = XContentParserUtils.convertToXContent(dataSourceMetadata); String contentString = BytesReference.bytes(contentBuilder).utf8ToString(); Assertions.assertEquals( - "{\"name\":\"testDS\",\"description\":\"\",\"connector\":\"PROMETHEUS\",\"allowedRoles\":[\"prometheus_access\"],\"properties\":{\"prometheus.uri\":\"https://localhost:9090\"},\"resultIndex\":null}", + "{\"name\":\"testDS\",\"description\":\"\",\"connector\":\"PROMETHEUS\",\"allowedRoles\":[\"prometheus_access\"],\"properties\":{\"prometheus.uri\":\"https://localhost:9090\"},\"resultIndex\":\"query_execution_result_testds\",\"status\":\"ACTIVE\"}", contentString); } @SneakyThrows @Test public void testToDataSourceMetadataFromJson() { - DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); - dataSourceMetadata.setName("testDS"); - dataSourceMetadata.setConnector(DataSourceType.PROMETHEUS); - dataSourceMetadata.setAllowedRoles(List.of("prometheus_access")); - dataSourceMetadata.setProperties(Map.of("prometheus.uri", "https://localhost:9090")); - dataSourceMetadata.setResultIndex("query_execution_result2"); + DataSourceMetadata dataSourceMetadata = + new DataSourceMetadata.Builder() + .setName("testDS") + .setConnector(DataSourceType.PROMETHEUS) + .setAllowedRoles(List.of("prometheus_access")) + .setProperties(Map.of("prometheus.uri", "https://localhost:9090")) + .setResultIndex("query_execution_result2") + .build(); Gson gson = new Gson(); String json = gson.toJson(dataSourceMetadata); @@ -70,7 +75,9 @@ public void testToMapFromJson() { CONNECTOR_FIELD, "PROMETHEUS", RESULT_INDEX_FIELD, - ""); + "", + STATUS_FIELD, + ACTIVE); Map dataSourceDataConnectorRemoved = Map.of( @@ -83,7 +90,9 @@ public void testToMapFromJson() { PROPERTIES_FIELD, Map.of("prometheus.uri", "localhost:9090"), RESULT_INDEX_FIELD, - ""); + "", + STATUS_FIELD, + ACTIVE); Gson gson = new Gson(); String json = gson.toJson(dataSourceData); @@ -96,21 +105,17 @@ public void testToMapFromJson() { @SneakyThrows @Test - public void testToDataSourceMetadataFromJsonWithoutName() { - DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); - dataSourceMetadata.setConnector(DataSourceType.PROMETHEUS); - dataSourceMetadata.setAllowedRoles(List.of("prometheus_access")); - dataSourceMetadata.setProperties(Map.of("prometheus.uri", "https://localhost:9090")); - Gson gson = new Gson(); - String json = gson.toJson(dataSourceMetadata); - + public void testToDataSourceMetadataFromJsonWithoutNameAndConnector() { IllegalArgumentException exception = assertThrows( IllegalArgumentException.class, () -> { - XContentParserUtils.toDataSourceMetadata(json); + XContentParserUtils.toDataSourceMetadata( + "{\"description\":\"\",\"allowedRoles\":[\"prometheus_access\"],\"resultIndex\":\"query_execution_result_testds\",\"status\":\"ACTIVE\"}"); }); - Assertions.assertEquals("name and connector are required fields.", exception.getMessage()); + Assertions.assertEquals( + "Datasource configuration error: name, connector cannot be null or empty.", + exception.getMessage()); } @SneakyThrows @@ -129,25 +134,6 @@ public void testToMapFromJsonWithoutName() { Assertions.assertEquals("Name is a required field.", exception.getMessage()); } - @SneakyThrows - @Test - public void testToDataSourceMetadataFromJsonWithoutConnector() { - DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); - dataSourceMetadata.setName("name"); - dataSourceMetadata.setAllowedRoles(List.of("prometheus_access")); - dataSourceMetadata.setProperties(Map.of("prometheus.uri", "https://localhost:9090")); - Gson gson = new Gson(); - String json = gson.toJson(dataSourceMetadata); - - IllegalArgumentException exception = - assertThrows( - IllegalArgumentException.class, - () -> { - XContentParserUtils.toDataSourceMetadata(json); - }); - Assertions.assertEquals("name and connector are required fields.", exception.getMessage()); - } - @SneakyThrows @Test public void testToDataSourceMetadataFromJsonUsingUnknownObject() { diff --git a/docs/user/ppl/admin/datasources.rst b/docs/user/ppl/admin/datasources.rst index 31378f6cc4..0c519fb8c1 100644 --- a/docs/user/ppl/admin/datasources.rst +++ b/docs/user/ppl/admin/datasources.rst @@ -39,7 +39,8 @@ Example Prometheus Datasource Definition :: "prometheus.auth.username" : "admin", "prometheus.auth.password" : "admin" }, - "allowedRoles" : ["prometheus_access"] + "allowedRoles" : ["prometheus_access"], + "status" : "ACTIVE|DISABLED" } Datasource configuration Restrictions. @@ -51,6 +52,8 @@ Datasource configuration Restrictions. * Allowed Connectors. * ``prometheus`` [More details: `Prometheus Connector `_] * All the allowed config parameters in ``properties`` are defined in individual connector pages mentioned above. +* From version 2.13, we have introduced a new optional field ``status`` which can be used to enable and disable a datasource.When a datasource is disabled, it blocks new queries, resulting in 400 errors for any attempts made on it. By default when a datasource is created, status is ACTIVE. + Datasource configuration APIs ====================================== @@ -196,3 +199,16 @@ Moving from keystore datasource configuration ============================================= * In versions prior to 2.7, the plugins.query.federation.datasources.config key store setting was used to configure datasources, but it has been deprecated and will be removed in version 3.0. * To port previously configured datasources from the keystore, users can use the `create datasource` REST API mentioned in the above section. + +Disabling a datasource to block new queries +============================================= +* We can disable a datasource using PATCH or PUT API. Below is the example request for disabling a datasource named "my_prometheus" using PATCH API. :: + + PATCH https://localhost:9200/_plugins/_query/_datasources + content-type: application/json + Authorization: Basic {{username}} {{password}} + + { + "name" : "my_prometheus", + "status" : "disabled" + } diff --git a/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceAPIsIT.java b/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceAPIsIT.java index c681b58eb4..bafa14c517 100644 --- a/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceAPIsIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceAPIsIT.java @@ -5,11 +5,14 @@ package org.opensearch.sql.datasource; +import static org.opensearch.sql.datasource.model.DataSourceStatus.ACTIVE; +import static org.opensearch.sql.datasource.model.DataSourceStatus.DISABLED; +import static org.opensearch.sql.datasources.utils.XContentParserUtils.ALLOWED_ROLES_FIELD; import static org.opensearch.sql.datasources.utils.XContentParserUtils.DESCRIPTION_FIELD; import static org.opensearch.sql.datasources.utils.XContentParserUtils.NAME_FIELD; +import static org.opensearch.sql.datasources.utils.XContentParserUtils.STATUS_FIELD; import static org.opensearch.sql.legacy.TestUtils.getResponseBody; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.gson.Gson; import com.google.gson.JsonObject; @@ -21,7 +24,6 @@ import java.util.List; import java.util.Map; import lombok.SneakyThrows; -import org.apache.commons.lang3.StringUtils; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -35,6 +37,11 @@ public class DataSourceAPIsIT extends PPLIntegTestCase { + @Override + protected void init() throws Exception { + loadIndex(Index.DATASOURCES); + } + @After public void cleanUp() throws IOException { wipeAllClusterSettings(); @@ -68,21 +75,21 @@ protected static void deleteDataSourcesCreated() throws IOException { public void createDataSourceAPITest() { // create datasource DataSourceMetadata createDSM = - new DataSourceMetadata( - "create_prometheus", - "Prometheus Creation for Integ test", - DataSourceType.PROMETHEUS, - ImmutableList.of(), - ImmutableMap.of( - "prometheus.uri", - "https://localhost:9090", - "prometheus.auth.type", - "basicauth", - "prometheus.auth.username", - "username", - "prometheus.auth.password", - "password"), - null); + new DataSourceMetadata.Builder() + .setName("create_prometheus") + .setDescription("Prometheus Creation for Integ test") + .setConnector(DataSourceType.PROMETHEUS) + .setProperties( + ImmutableMap.of( + "prometheus.uri", + "https://localhost:9090", + "prometheus.auth.type", + "basicauth", + "prometheus.auth.username", + "username", + "prometheus.auth.password", + "password")) + .build(); Request createRequest = getCreateDataSourceRequest(createDSM); Response response = client().performRequest(createRequest); Assert.assertEquals(201, response.getStatusLine().getStatusCode()); @@ -104,6 +111,7 @@ public void createDataSourceAPITest() { "basicauth", dataSourceMetadata.getProperties().get("prometheus.auth.type")); Assert.assertNull(dataSourceMetadata.getProperties().get("prometheus.auth.username")); Assert.assertNull(dataSourceMetadata.getProperties().get("prometheus.auth.password")); + Assert.assertEquals(ACTIVE, dataSourceMetadata.getStatus()); Assert.assertEquals("Prometheus Creation for Integ test", dataSourceMetadata.getDescription()); } @@ -112,13 +120,11 @@ public void createDataSourceAPITest() { public void updateDataSourceAPITest() { // create datasource DataSourceMetadata createDSM = - new DataSourceMetadata( - "update_prometheus", - StringUtils.EMPTY, - DataSourceType.PROMETHEUS, - ImmutableList.of(), - ImmutableMap.of("prometheus.uri", "https://localhost:9090"), - null); + new DataSourceMetadata.Builder() + .setName("update_prometheus") + .setConnector(DataSourceType.PROMETHEUS) + .setProperties(ImmutableMap.of("prometheus.uri", "https://localhost:9090")) + .build(); Request createRequest = getCreateDataSourceRequest(createDSM); client().performRequest(createRequest); // Datasource is not immediately created. so introducing a sleep of 2s. @@ -126,13 +132,11 @@ public void updateDataSourceAPITest() { // update datasource DataSourceMetadata updateDSM = - new DataSourceMetadata( - "update_prometheus", - StringUtils.EMPTY, - DataSourceType.PROMETHEUS, - ImmutableList.of(), - ImmutableMap.of("prometheus.uri", "https://randomtest.com:9090"), - null); + new DataSourceMetadata.Builder() + .setName("update_prometheus") + .setConnector(DataSourceType.PROMETHEUS) + .setProperties(ImmutableMap.of("prometheus.uri", "https://randomtest.com:9090")) + .build(); Request updateRequest = getUpdateDataSourceRequest(updateDSM); Response updateResponse = client().performRequest(updateRequest); Assert.assertEquals(200, updateResponse.getStatusLine().getStatusCode()); @@ -186,13 +190,11 @@ public void deleteDataSourceTest() { // create datasource for deletion DataSourceMetadata createDSM = - new DataSourceMetadata( - "delete_prometheus", - StringUtils.EMPTY, - DataSourceType.PROMETHEUS, - ImmutableList.of(), - ImmutableMap.of("prometheus.uri", "https://localhost:9090"), - null); + new DataSourceMetadata.Builder() + .setName("delete_prometheus") + .setConnector(DataSourceType.PROMETHEUS) + .setProperties(ImmutableMap.of("prometheus.uri", "https://localhost:9090")) + .build(); Request createRequest = getCreateDataSourceRequest(createDSM); client().performRequest(createRequest); // Datasource is not immediately created. so introducing a sleep of 2s. @@ -226,13 +228,11 @@ public void deleteDataSourceTest() { public void getAllDataSourceTest() { // create datasource for deletion DataSourceMetadata createDSM = - new DataSourceMetadata( - "get_all_prometheus", - StringUtils.EMPTY, - DataSourceType.PROMETHEUS, - ImmutableList.of(), - ImmutableMap.of("prometheus.uri", "https://localhost:9090"), - null); + new DataSourceMetadata.Builder() + .setName("get_all_prometheus") + .setConnector(DataSourceType.PROMETHEUS) + .setProperties(ImmutableMap.of("prometheus.uri", "https://localhost:9090")) + .build(); Request createRequest = getCreateDataSourceRequest(createDSM); client().performRequest(createRequest); // Datasource is not immediately created. so introducing a sleep of 2s. @@ -255,21 +255,21 @@ public void getAllDataSourceTest() { public void issue2196() { // create datasource DataSourceMetadata createDSM = - new DataSourceMetadata( - "Create_Prometheus", - "Prometheus Creation for Integ test", - DataSourceType.PROMETHEUS, - ImmutableList.of(), - ImmutableMap.of( - "prometheus.uri", - "https://localhost:9090", - "prometheus.auth.type", - "basicauth", - "prometheus.auth.username", - "username", - "prometheus.auth.password", - "password"), - null); + new DataSourceMetadata.Builder() + .setName("Create_Prometheus") + .setDescription("Prometheus Creation for Integ test") + .setConnector(DataSourceType.PROMETHEUS) + .setProperties( + ImmutableMap.of( + "prometheus.uri", + "https://localhost:9090", + "prometheus.auth.type", + "basicauth", + "prometheus.auth.username", + "username", + "prometheus.auth.password", + "password")) + .build(); Request createRequest = getCreateDataSourceRequest(createDSM); Response response = client().performRequest(createRequest); Assert.assertEquals(201, response.getStatusLine().getStatusCode()); @@ -317,21 +317,109 @@ public void datasourceLimitTest() throws InterruptedException, IOException { errorMessage.get("error").getAsJsonObject().get("details").getAsString()); } + @SneakyThrows + @Test + public void patchDataSourceAPITest() { + // create datasource + DataSourceMetadata createDSM = + new DataSourceMetadata.Builder() + .setName("patch_prometheus") + .setDescription("Prometheus Creation for Integ test") + .setConnector(DataSourceType.PROMETHEUS) + .setProperties( + ImmutableMap.of( + "prometheus.uri", + "https://localhost:9090", + "prometheus.auth.type", + "basicauth", + "prometheus.auth.username", + "username", + "prometheus.auth.password", + "password")) + .setAllowedRoles(List.of("role1", "role2")) + .build(); + Request createRequest = getCreateDataSourceRequest(createDSM); + Response response = client().performRequest(createRequest); + Assert.assertEquals(201, response.getStatusLine().getStatusCode()); + String createResponseString = getResponseBody(response); + Assert.assertEquals("\"Created DataSource with name patch_prometheus\"", createResponseString); + // Datasource is not immediately created. so introducing a sleep of 2s. + Thread.sleep(2000); + + // patch datasource + Map updateDS = + new HashMap<>( + Map.of( + NAME_FIELD, + "patch_prometheus", + DESCRIPTION_FIELD, + "test", + STATUS_FIELD, + "disabled", + ALLOWED_ROLES_FIELD, + List.of("role3", "role4"))); + + Request patchRequest = getPatchDataSourceRequest(updateDS); + Response patchResponse = client().performRequest(patchRequest); + Assert.assertEquals(200, patchResponse.getStatusLine().getStatusCode()); + String patchResponseString = getResponseBody(patchResponse); + Assert.assertEquals("\"Updated DataSource with name patch_prometheus\"", patchResponseString); + + // Datasource is not immediately updated. so introducing a sleep of 2s. + Thread.sleep(2000); + + // get datasource to validate the creation. + Request getRequest = getFetchDataSourceRequest("patch_prometheus"); + Response getResponse = client().performRequest(getRequest); + Assert.assertEquals(200, getResponse.getStatusLine().getStatusCode()); + String getResponseString = getResponseBody(getResponse); + DataSourceMetadata dataSourceMetadata = + new Gson().fromJson(getResponseString, DataSourceMetadata.class); + Assert.assertEquals( + "https://localhost:9090", dataSourceMetadata.getProperties().get("prometheus.uri")); + Assert.assertEquals( + "basicauth", dataSourceMetadata.getProperties().get("prometheus.auth.type")); + Assert.assertNull(dataSourceMetadata.getProperties().get("prometheus.auth.username")); + Assert.assertNull(dataSourceMetadata.getProperties().get("prometheus.auth.password")); + Assert.assertEquals(DISABLED, dataSourceMetadata.getStatus()); + Assert.assertEquals(List.of("role3", "role4"), dataSourceMetadata.getAllowedRoles()); + Assert.assertEquals("test", dataSourceMetadata.getDescription()); + } + + @SneakyThrows + @Test + public void testOldDataSourceModelLoadingThroughGetDataSourcesAPI() { + // get datasource to validate the creation. + Request getRequest = getFetchDataSourceRequest(null); + Response getResponse = client().performRequest(getRequest); + Assert.assertEquals(200, getResponse.getStatusLine().getStatusCode()); + String getResponseString = getResponseBody(getResponse); + Type listType = new TypeToken>() {}.getType(); + List dataSourceMetadataList = + new Gson().fromJson(getResponseString, listType); + Assert.assertTrue( + dataSourceMetadataList.stream() + .anyMatch( + dataSourceMetadata -> + dataSourceMetadata.getName().equals("old_prometheus") + && dataSourceMetadata.getStatus().equals(ACTIVE))); + } + public DataSourceMetadata mockDataSourceMetadata(String name) { - return new DataSourceMetadata( - name, - "Prometheus Creation for Integ test", - DataSourceType.PROMETHEUS, - ImmutableList.of(), - ImmutableMap.of( - "prometheus.uri", - "https://localhost:9090", - "prometheus.auth.type", - "basicauth", - "prometheus.auth.username", - "username", - "prometheus.auth.password", - "password"), - null); + return new DataSourceMetadata.Builder() + .setName(name) + .setDescription("Prometheus Creation for Integ test") + .setConnector(DataSourceType.PROMETHEUS) + .setProperties( + ImmutableMap.of( + "prometheus.uri", + "https://localhost:9090", + "prometheus.auth.type", + "basicauth", + "prometheus.auth.username", + "username", + "prometheus.auth.password", + "password")) + .build(); } } diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/InformationSchemaCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/InformationSchemaCommandIT.java index d916bfc4db..71222cbd6e 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/InformationSchemaCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/InformationSchemaCommandIT.java @@ -12,10 +12,8 @@ import static org.opensearch.sql.util.MatcherUtils.verifyColumn; import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.io.IOException; -import org.apache.commons.lang3.StringUtils; import org.json.JSONObject; import org.junit.After; import org.junit.Assert; @@ -43,13 +41,11 @@ protected static void metricGenerationWait() throws InterruptedException { @Override protected void init() throws InterruptedException, IOException { DataSourceMetadata createDSM = - new DataSourceMetadata( - "my_prometheus", - StringUtils.EMPTY, - DataSourceType.PROMETHEUS, - ImmutableList.of(), - ImmutableMap.of("prometheus.uri", "http://localhost:9090"), - null); + new DataSourceMetadata.Builder() + .setName("my_prometheus") + .setConnector(DataSourceType.PROMETHEUS) + .setProperties(ImmutableMap.of("prometheus.uri", "http://localhost:9090")) + .build(); Request createRequest = getCreateDataSourceRequest(createDSM); Response response = client().performRequest(createRequest); Assert.assertEquals(201, response.getStatusLine().getStatusCode()); diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/PrometheusDataSourceCommandsIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/PrometheusDataSourceCommandsIT.java index e0b463ed36..f4ae9b5536 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/PrometheusDataSourceCommandsIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/PrometheusDataSourceCommandsIT.java @@ -7,6 +7,7 @@ package org.opensearch.sql.ppl; +import static org.hamcrest.Matchers.equalTo; import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.LABELS; import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.TIMESTAMP; import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.VALUE; @@ -14,7 +15,6 @@ import static org.opensearch.sql.util.MatcherUtils.schema; import static org.opensearch.sql.util.MatcherUtils.verifySchema; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.io.Resources; import java.io.IOException; @@ -35,8 +35,11 @@ import org.junit.jupiter.api.Test; import org.opensearch.client.Request; import org.opensearch.client.Response; +import org.opensearch.client.ResponseException; import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceStatus; import org.opensearch.sql.datasource.model.DataSourceType; +import org.opensearch.sql.util.TestUtils; public class PrometheusDataSourceCommandsIT extends PPLIntegTestCase { @@ -55,13 +58,11 @@ protected static void metricGenerationWait() throws InterruptedException { @Override protected void init() throws InterruptedException, IOException { DataSourceMetadata createDSM = - new DataSourceMetadata( - "my_prometheus", - StringUtils.EMPTY, - DataSourceType.PROMETHEUS, - ImmutableList.of(), - ImmutableMap.of("prometheus.uri", "http://localhost:9090"), - null); + new DataSourceMetadata.Builder() + .setName("my_prometheus") + .setConnector(DataSourceType.PROMETHEUS) + .setProperties(ImmutableMap.of("prometheus.uri", "http://localhost:9090")) + .build(); Request createRequest = getCreateDataSourceRequest(createDSM); Response response = client().performRequest(createRequest); Assert.assertEquals(201, response.getStatusLine().getStatusCode()); @@ -284,6 +285,38 @@ public void testExplainForQueryExemplars() throws Exception { + "query_exemplars('app_ads_ad_requests_total',1689228292,1689232299)")); } + @Test + public void testQueryOnDisabledDataSource() throws IOException { + DataSourceMetadata deletedDSM = + new DataSourceMetadata.Builder() + .setName("disabled_prometheus") + .setConnector(DataSourceType.PROMETHEUS) + .setProperties(ImmutableMap.of("prometheus.uri", "http://localhost:9090")) + .setDataSourceStatus(DataSourceStatus.DISABLED) + .build(); + Request createRequest = getCreateDataSourceRequest(deletedDSM); + Response response = client().performRequest(createRequest); + Assert.assertEquals(201, response.getStatusLine().getStatusCode()); + + try { + executeQuery( + "source=disabled_prometheus.prometheus_http_requests_total | stats sum(@value) by" + + " span(@timestamp, 15s), handler, job"); + } catch (ResponseException ex) { + response = ex.getResponse(); + } + JSONObject result = new JSONObject(TestUtils.getResponseBody(response)); + assertThat(result.getInt("status"), equalTo(400)); + JSONObject error = result.getJSONObject("error"); + assertThat(error.getString("reason"), equalTo("Invalid Query")); + assertThat(error.getString("details"), equalTo("Datasource disabled_prometheus is disabled.")); + assertThat(error.getString("type"), equalTo("DatasourceDisabledException")); + + Request deleteRequest = getDeleteDataSourceRequest("disabled_prometheus"); + Response deleteResponse = client().performRequest(deleteRequest); + Assert.assertEquals(204, deleteResponse.getStatusLine().getStatusCode()); + } + String loadFromFile(String filename) throws Exception { URI uri = Resources.getResource(filename).toURI(); return new String(Files.readAllBytes(Paths.get(uri))); diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/ShowDataSourcesCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/ShowDataSourcesCommandIT.java index b6a34d5c41..cf5df01993 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/ShowDataSourcesCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/ShowDataSourcesCommandIT.java @@ -12,10 +12,8 @@ import static org.opensearch.sql.util.MatcherUtils.verifyColumn; import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.io.IOException; -import org.apache.commons.lang3.StringUtils; import org.json.JSONObject; import org.junit.After; import org.junit.Assert; @@ -43,13 +41,11 @@ protected static void metricGenerationWait() throws InterruptedException { @Override protected void init() throws InterruptedException, IOException { DataSourceMetadata createDSM = - new DataSourceMetadata( - "my_prometheus", - StringUtils.EMPTY, - DataSourceType.PROMETHEUS, - ImmutableList.of(), - ImmutableMap.of("prometheus.uri", "http://localhost:9090"), - null); + new DataSourceMetadata.Builder() + .setName("my_prometheus") + .setConnector(DataSourceType.PROMETHEUS) + .setProperties(ImmutableMap.of("prometheus.uri", "http://localhost:9090")) + .build(); Request createRequest = getCreateDataSourceRequest(createDSM); Response response = client().performRequest(createRequest); Assert.assertEquals(201, response.getStatusLine().getStatusCode()); diff --git a/integ-test/src/test/resources/datasources.json b/integ-test/src/test/resources/datasources.json index e1e5d5e8bd..77d6a26148 100644 --- a/integ-test/src/test/resources/datasources.json +++ b/integ-test/src/test/resources/datasources.json @@ -1,2 +1,2 @@ -{"index":{"_id":"my_prometheus"}} -{ "name" : "my_prometheus", "connector": "prometheus", "properties" : { "prometheus.uri" : "http://localhost:9090"}} +{"index":{"_id":"old_prometheus"}} +{ "name" : "old_prometheus", "connector": "prometheus", "properties" : { "prometheus.uri" : "http://localhost:9090"}} diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactoryTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactoryTest.java index f17a4b10d0..7b1e2dec0f 100644 --- a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactoryTest.java +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactoryTest.java @@ -181,10 +181,12 @@ void createDataSourceSuccess() { properties.put("prometheus.auth.username", "admin"); properties.put("prometheus.auth.password", "admin"); - DataSourceMetadata metadata = new DataSourceMetadata(); - metadata.setName("prometheus"); - metadata.setConnector(DataSourceType.PROMETHEUS); - metadata.setProperties(properties); + DataSourceMetadata metadata = + new DataSourceMetadata.Builder() + .setName("prometheus") + .setConnector(DataSourceType.PROMETHEUS) + .setProperties(properties) + .build(); DataSource dataSource = new PrometheusStorageFactory(settings).createDataSource(metadata); Assertions.assertTrue(dataSource.getStorageEngine() instanceof PrometheusStorageEngine); @@ -200,10 +202,12 @@ void createDataSourceSuccessWithLocalhost() { properties.put("prometheus.auth.username", "admin"); properties.put("prometheus.auth.password", "admin"); - DataSourceMetadata metadata = new DataSourceMetadata(); - metadata.setName("prometheus"); - metadata.setConnector(DataSourceType.PROMETHEUS); - metadata.setProperties(properties); + DataSourceMetadata metadata = + new DataSourceMetadata.Builder() + .setName("prometheus") + .setConnector(DataSourceType.PROMETHEUS) + .setProperties(properties) + .build(); DataSource dataSource = new PrometheusStorageFactory(settings).createDataSource(metadata); Assertions.assertTrue(dataSource.getStorageEngine() instanceof PrometheusStorageEngine); @@ -219,10 +223,12 @@ void createDataSourceWithHostnameNotMatchingWithAllowHostsConfig() { properties.put("prometheus.auth.username", "admin"); properties.put("prometheus.auth.password", "admin"); - DataSourceMetadata metadata = new DataSourceMetadata(); - metadata.setName("prometheus"); - metadata.setConnector(DataSourceType.PROMETHEUS); - metadata.setProperties(properties); + DataSourceMetadata metadata = + new DataSourceMetadata.Builder() + .setName("prometheus") + .setConnector(DataSourceType.PROMETHEUS) + .setProperties(properties) + .build(); PrometheusStorageFactory prometheusStorageFactory = new PrometheusStorageFactory(settings); RuntimeException exception = diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java index 5b5745d438..cd4177a0f0 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java @@ -60,8 +60,8 @@ public class SparkQueryDispatcher { public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest) { EMRServerlessClient emrServerlessClient = emrServerlessClientFactory.getClient(); DataSourceMetadata dataSourceMetadata = - this.dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource()); - dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata); + this.dataSourceService.verifyDataSourceAccessAndGetRawMetadata( + dispatchQueryRequest.getDatasource()); AsyncQueryHandler asyncQueryHandler = sessionManager.isEnabled() ? new InteractiveQueryHandler(sessionManager, jobExecutionResponseReader, leaseManager) diff --git a/spark/src/main/java/org/opensearch/sql/spark/rest/RestAsyncQueryManagementAction.java b/spark/src/main/java/org/opensearch/sql/spark/rest/RestAsyncQueryManagementAction.java index 00a455d943..ced5609083 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/rest/RestAsyncQueryManagementAction.java +++ b/spark/src/main/java/org/opensearch/sql/spark/rest/RestAsyncQueryManagementAction.java @@ -26,7 +26,7 @@ import org.opensearch.rest.BytesRestResponse; import org.opensearch.rest.RestChannel; import org.opensearch.rest.RestRequest; -import org.opensearch.sql.datasources.exceptions.DataSourceNotFoundException; +import org.opensearch.sql.datasources.exceptions.DataSourceClientException; import org.opensearch.sql.datasources.exceptions.ErrorMessage; import org.opensearch.sql.datasources.utils.Scheduler; import org.opensearch.sql.legacy.metrics.MetricName; @@ -235,7 +235,7 @@ private void reportError(final RestChannel channel, final Exception e, final Res private static boolean isClientError(Exception e) { return e instanceof IllegalArgumentException || e instanceof IllegalStateException - || e instanceof DataSourceNotFoundException + || e instanceof DataSourceClientException || e instanceof AsyncQueryNotFoundException || e instanceof IllegalAccessException; } diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java index 33fec89e26..6a6d5982b8 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java @@ -16,21 +16,22 @@ import static org.opensearch.sql.spark.execution.statestore.StateStore.getStatement; import static org.opensearch.sql.spark.execution.statestore.StateStore.updateStatementState; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.util.HashMap; import java.util.Map; import java.util.Optional; -import org.apache.commons.lang3.StringUtils; import org.junit.Ignore; import org.junit.Test; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Disabled; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.opensearch.core.common.Strings; import org.opensearch.index.query.QueryBuilders; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceStatus; import org.opensearch.sql.datasource.model.DataSourceType; +import org.opensearch.sql.datasources.exceptions.DatasourceDisabledException; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse; import org.opensearch.sql.spark.client.EMRServerlessClientFactory; import org.opensearch.sql.spark.execution.session.SessionId; @@ -255,13 +256,11 @@ public void datasourceWithBasicAuth() { properties.put("glue.indexstore.opensearch.auth.password", "password"); dataSourceService.createDataSource( - new DataSourceMetadata( - "mybasicauth", - StringUtils.EMPTY, - DataSourceType.S3GLUE, - ImmutableList.of(), - properties, - null)); + new DataSourceMetadata.Builder() + .setName("mybasicauth") + .setConnector(DataSourceType.S3GLUE) + .setProperties(properties) + .build()); LocalEMRSClient emrsClient = new LocalEMRSClient(); EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; AsyncQueryExecutorService asyncQueryExecutorService = @@ -514,21 +513,20 @@ public void submitQueryInInvalidSessionWillCreateNewSession() { @Test public void datasourceNameIncludeUppercase() { dataSourceService.createDataSource( - new DataSourceMetadata( - "TESTS3", - StringUtils.EMPTY, - DataSourceType.S3GLUE, - ImmutableList.of(), - ImmutableMap.of( - "glue.auth.type", - "iam_role", - "glue.auth.role_arn", - "arn:aws:iam::924196221507:role/FlintOpensearchServiceRole", - "glue.indexstore.opensearch.uri", - "http://localhost:9200", - "glue.indexstore.opensearch.auth", - "noauth"), - null)); + new DataSourceMetadata.Builder() + .setName("TESTS3") + .setConnector(DataSourceType.S3GLUE) + .setProperties( + ImmutableMap.of( + "glue.auth.type", + "iam_role", + "glue.auth.role_arn", + "arn:aws:iam::924196221507:role/FlintOpensearchServiceRole", + "glue.indexstore.opensearch.uri", + "http://localhost:9200", + "glue.indexstore.opensearch.auth", + "noauth")) + .build()); LocalEMRSClient emrsClient = new LocalEMRSClient(); EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; @@ -575,4 +573,27 @@ public void concurrentSessionLimitIsDomainLevel() { new CreateAsyncQueryRequest("select 1", DSOTHER, LangType.SQL, null))); assertEquals("domain concurrent active session can not exceed 1", exception.getMessage()); } + + @Test + public void testDatasourceDisabled() { + LocalEMRSClient emrsClient = new LocalEMRSClient(); + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrServerlessClientFactory); + + // Disable Datasource + HashMap datasourceMap = new HashMap<>(); + datasourceMap.put("name", DATASOURCE); + datasourceMap.put("status", DataSourceStatus.DISABLED); + this.dataSourceService.patchDataSource(datasourceMap); + + // 1. create async query. + try { + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null)); + fail("It should have thrown DataSourceDisabledException"); + } catch (DatasourceDisabledException exception) { + Assertions.assertEquals("Datasource mys3 is disabled.", exception.getMessage()); + } + } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java index c9b4b6fc88..e176a2b828 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java @@ -17,7 +17,6 @@ import com.amazonaws.services.emrserverless.model.JobRun; import com.amazonaws.services.emrserverless.model.JobRunState; import com.google.common.base.Charsets; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.io.Resources; @@ -30,7 +29,6 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; -import org.apache.commons.lang3.StringUtils; import org.junit.After; import org.junit.Before; import org.opensearch.action.admin.indices.create.CreateIndexRequest; @@ -119,38 +117,36 @@ public void setup() { .get(); dataSourceService = createDataSourceService(); DataSourceMetadata dm = - new DataSourceMetadata( - DATASOURCE, - StringUtils.EMPTY, - DataSourceType.S3GLUE, - ImmutableList.of(), - ImmutableMap.of( - "glue.auth.type", - "iam_role", - "glue.auth.role_arn", - "arn:aws:iam::924196221507:role/FlintOpensearchServiceRole", - "glue.indexstore.opensearch.uri", - "http://localhost:9200", - "glue.indexstore.opensearch.auth", - "noauth"), - null); + new DataSourceMetadata.Builder() + .setName(DATASOURCE) + .setConnector(DataSourceType.S3GLUE) + .setProperties( + ImmutableMap.of( + "glue.auth.type", + "iam_role", + "glue.auth.role_arn", + "arn:aws:iam::924196221507:role/FlintOpensearchServiceRole", + "glue.indexstore.opensearch.uri", + "http://localhost:9200", + "glue.indexstore.opensearch.auth", + "noauth")) + .build(); dataSourceService.createDataSource(dm); DataSourceMetadata otherDm = - new DataSourceMetadata( - DSOTHER, - StringUtils.EMPTY, - DataSourceType.S3GLUE, - ImmutableList.of(), - ImmutableMap.of( - "glue.auth.type", - "iam_role", - "glue.auth.role_arn", - "arn:aws:iam::924196221507:role/FlintOpensearchServiceRole", - "glue.indexstore.opensearch.uri", - "http://localhost:9200", - "glue.indexstore.opensearch.auth", - "noauth"), - null); + new DataSourceMetadata.Builder() + .setName(DSOTHER) + .setConnector(DataSourceType.S3GLUE) + .setProperties( + ImmutableMap.of( + "glue.auth.type", + "iam_role", + "glue.auth.role_arn", + "arn:aws:iam::924196221507:role/FlintOpensearchServiceRole", + "glue.indexstore.opensearch.uri", + "http://localhost:9200", + "glue.indexstore.opensearch.auth", + "noauth")) + .build(); dataSourceService.createDataSource(otherDm); stateStore = new StateStore(client, clusterService); createIndexWithMappings(dm.getResultIndex(), loadResultIndexMappings()); diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index 867e1c94c4..a60ae18ded 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -150,11 +150,12 @@ void testDispatchSelectQuery() { sparkSubmitParameters, tags, false, - null); + "query_execution_result_my_glue"); when(emrServerlessClient.startJobRun(expected)).thenReturn(EMR_JOB_ID); DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); - when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); - doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); + when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata("my_glue")) + .thenReturn(dataSourceMetadata); + DispatchQueryResponse dispatchQueryResponse = sparkQueryDispatcher.dispatch( new DispatchQueryRequest( @@ -195,11 +196,11 @@ void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() { sparkSubmitParameters, tags, false, - null); + "query_execution_result_my_glue"); when(emrServerlessClient.startJobRun(expected)).thenReturn(EMR_JOB_ID); DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadataWithBasicAuth(); - when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); - doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); + when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata("my_glue")) + .thenReturn(dataSourceMetadata); DispatchQueryResponse dispatchQueryResponse = sparkQueryDispatcher.dispatch( new DispatchQueryRequest( @@ -238,11 +239,12 @@ void testDispatchSelectQueryWithNoAuthIndexStoreDatasource() { sparkSubmitParameters, tags, false, - null); + "query_execution_result_my_glue"); when(emrServerlessClient.startJobRun(expected)).thenReturn(EMR_JOB_ID); DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadataWithNoAuth(); - when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); - doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); + when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata("my_glue")) + .thenReturn(dataSourceMetadata); + DispatchQueryResponse dispatchQueryResponse = sparkQueryDispatcher.dispatch( new DispatchQueryRequest( @@ -269,8 +271,9 @@ void testDispatchSelectQueryCreateNewSession() { doReturn(new StatementId(MOCK_STATEMENT_ID)).when(session).submit(any()); when(session.getSessionModel().getJobId()).thenReturn(EMR_JOB_ID); DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); - when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); - doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); + when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata("my_glue")) + .thenReturn(dataSourceMetadata); + DispatchQueryResponse dispatchQueryResponse = sparkQueryDispatcher.dispatch(queryRequest); verifyNoInteractions(emrServerlessClient); @@ -293,8 +296,9 @@ void testDispatchSelectQueryReuseSession() { when(session.getSessionModel().getJobId()).thenReturn(EMR_JOB_ID); when(session.isOperationalForDataSource(any())).thenReturn(true); DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); - when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); - doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); + when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata("my_glue")) + .thenReturn(dataSourceMetadata); + DispatchQueryResponse dispatchQueryResponse = sparkQueryDispatcher.dispatch(queryRequest); verifyNoInteractions(emrServerlessClient); @@ -311,8 +315,9 @@ void testDispatchSelectQueryFailedCreateSession() { doReturn(true).when(sessionManager).isEnabled(); doThrow(RuntimeException.class).when(sessionManager).createSession(any()); DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); - when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); - doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); + when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata("my_glue")) + .thenReturn(dataSourceMetadata); + Assertions.assertThrows( RuntimeException.class, () -> sparkQueryDispatcher.dispatch(queryRequest)); @@ -347,11 +352,13 @@ void testDispatchIndexQuery() { sparkSubmitParameters, tags, true, - null); + "query_execution_result_my_glue"); + when(emrServerlessClient.startJobRun(expected)).thenReturn(EMR_JOB_ID); DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); - when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); - doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); + when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata("my_glue")) + .thenReturn(dataSourceMetadata); + DispatchQueryResponse dispatchQueryResponse = sparkQueryDispatcher.dispatch( new DispatchQueryRequest( @@ -391,11 +398,11 @@ void testDispatchWithPPLQuery() { sparkSubmitParameters, tags, false, - null); + "query_execution_result_my_glue"); when(emrServerlessClient.startJobRun(expected)).thenReturn(EMR_JOB_ID); DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); - when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); - doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); + when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata("my_glue")) + .thenReturn(dataSourceMetadata); DispatchQueryResponse dispatchQueryResponse = sparkQueryDispatcher.dispatch( new DispatchQueryRequest( @@ -435,11 +442,12 @@ void testDispatchQueryWithoutATableAndDataSourceName() { sparkSubmitParameters, tags, false, - null); + "query_execution_result_my_glue"); when(emrServerlessClient.startJobRun(expected)).thenReturn(EMR_JOB_ID); DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); - when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); - doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); + when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata("my_glue")) + .thenReturn(dataSourceMetadata); + DispatchQueryResponse dispatchQueryResponse = sparkQueryDispatcher.dispatch( new DispatchQueryRequest( @@ -483,11 +491,12 @@ void testDispatchIndexQueryWithoutADatasourceName() { sparkSubmitParameters, tags, true, - null); + "query_execution_result_my_glue"); when(emrServerlessClient.startJobRun(expected)).thenReturn(EMR_JOB_ID); DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); - when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); - doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); + when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata("my_glue")) + .thenReturn(dataSourceMetadata); + DispatchQueryResponse dispatchQueryResponse = sparkQueryDispatcher.dispatch( new DispatchQueryRequest( @@ -531,11 +540,12 @@ void testDispatchMaterializedViewQuery() { sparkSubmitParameters, tags, true, - null); + "query_execution_result_my_glue"); when(emrServerlessClient.startJobRun(expected)).thenReturn(EMR_JOB_ID); DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); - when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); - doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); + when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata("my_glue")) + .thenReturn(dataSourceMetadata); + DispatchQueryResponse dispatchQueryResponse = sparkQueryDispatcher.dispatch( new DispatchQueryRequest( @@ -575,11 +585,12 @@ void testDispatchShowMVQuery() { sparkSubmitParameters, tags, false, - null); + "query_execution_result_my_glue"); when(emrServerlessClient.startJobRun(expected)).thenReturn(EMR_JOB_ID); DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); - when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); - doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); + when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata("my_glue")) + .thenReturn(dataSourceMetadata); + DispatchQueryResponse dispatchQueryResponse = sparkQueryDispatcher.dispatch( new DispatchQueryRequest( @@ -619,11 +630,12 @@ void testRefreshIndexQuery() { sparkSubmitParameters, tags, false, - null); + "query_execution_result_my_glue"); when(emrServerlessClient.startJobRun(expected)).thenReturn(EMR_JOB_ID); DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); - when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); - doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); + when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata("my_glue")) + .thenReturn(dataSourceMetadata); + DispatchQueryResponse dispatchQueryResponse = sparkQueryDispatcher.dispatch( new DispatchQueryRequest( @@ -663,11 +675,12 @@ void testDispatchDescribeIndexQuery() { sparkSubmitParameters, tags, false, - null); + "query_execution_result_my_glue"); when(emrServerlessClient.startJobRun(expected)).thenReturn(EMR_JOB_ID); DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); - when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); - doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); + when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata("my_glue")) + .thenReturn(dataSourceMetadata); + DispatchQueryResponse dispatchQueryResponse = sparkQueryDispatcher.dispatch( new DispatchQueryRequest( @@ -685,7 +698,7 @@ void testDispatchDescribeIndexQuery() { @Test void testDispatchWithWrongURI() { - when(dataSourceService.getRawDataSourceMetadata("my_glue")) + when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata("my_glue")) .thenReturn(constructMyGlueDataSourceMetadataWithBadURISyntax()); String query = "select * from my_glue.default.http_logs"; IllegalArgumentException illegalArgumentException = @@ -707,7 +720,7 @@ void testDispatchWithWrongURI() { @Test void testDispatchWithUnSupportedDataSourceType() { - when(dataSourceService.getRawDataSourceMetadata("my_prometheus")) + when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata("my_prometheus")) .thenReturn(constructPrometheusDataSourceType()); String query = "select * from my_prometheus.default.http_logs"; UnsupportedOperationException unsupportedOperationException = @@ -894,8 +907,8 @@ void testGetQueryResponseWithSuccess() { @Test void testDispatchQueryWithExtraSparkSubmitParameters() { DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); - when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); - doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); + when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata("my_glue")) + .thenReturn(dataSourceMetadata); String extraParameters = "--conf spark.dynamicAllocation.enabled=false"; DispatchQueryRequest[] requests = { @@ -973,9 +986,7 @@ private String withStructuredStreaming(String parameters) { } private DataSourceMetadata constructMyGlueDataSourceMetadata() { - DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); - dataSourceMetadata.setName("my_glue"); - dataSourceMetadata.setConnector(DataSourceType.S3GLUE); + Map properties = new HashMap<>(); properties.put("glue.auth.type", "iam_role"); properties.put( @@ -985,14 +996,14 @@ private DataSourceMetadata constructMyGlueDataSourceMetadata() { "https://search-flint-dp-benchmark-cf5crj5mj2kfzvgwdeynkxnefy.eu-west-1.es.amazonaws.com"); properties.put("glue.indexstore.opensearch.auth", "awssigv4"); properties.put("glue.indexstore.opensearch.region", "eu-west-1"); - dataSourceMetadata.setProperties(properties); - return dataSourceMetadata; + return new DataSourceMetadata.Builder() + .setName("my_glue") + .setConnector(DataSourceType.S3GLUE) + .setProperties(properties) + .build(); } private DataSourceMetadata constructMyGlueDataSourceMetadataWithBasicAuth() { - DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); - dataSourceMetadata.setName("my_glue"); - dataSourceMetadata.setConnector(DataSourceType.S3GLUE); Map properties = new HashMap<>(); properties.put("glue.auth.type", "iam_role"); properties.put( @@ -1003,14 +1014,14 @@ private DataSourceMetadata constructMyGlueDataSourceMetadataWithBasicAuth() { properties.put("glue.indexstore.opensearch.auth", "basicauth"); properties.put("glue.indexstore.opensearch.auth.username", "username"); properties.put("glue.indexstore.opensearch.auth.password", "password"); - dataSourceMetadata.setProperties(properties); - return dataSourceMetadata; + return new DataSourceMetadata.Builder() + .setName("my_glue") + .setConnector(DataSourceType.S3GLUE) + .setProperties(properties) + .build(); } private DataSourceMetadata constructMyGlueDataSourceMetadataWithNoAuth() { - DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); - dataSourceMetadata.setName("my_glue"); - dataSourceMetadata.setConnector(DataSourceType.S3GLUE); Map properties = new HashMap<>(); properties.put("glue.auth.type", "iam_role"); properties.put( @@ -1019,14 +1030,14 @@ private DataSourceMetadata constructMyGlueDataSourceMetadataWithNoAuth() { "glue.indexstore.opensearch.uri", "https://search-flint-dp-benchmark-cf5crj5mj2kfzvgwdeynkxnefy.eu-west-1.es.amazonaws.com"); properties.put("glue.indexstore.opensearch.auth", "noauth"); - dataSourceMetadata.setProperties(properties); - return dataSourceMetadata; + return new DataSourceMetadata.Builder() + .setName("my_glue") + .setConnector(DataSourceType.S3GLUE) + .setProperties(properties) + .build(); } private DataSourceMetadata constructMyGlueDataSourceMetadataWithBadURISyntax() { - DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); - dataSourceMetadata.setName("my_glue"); - dataSourceMetadata.setConnector(DataSourceType.S3GLUE); Map properties = new HashMap<>(); properties.put("glue.auth.type", "iam_role"); properties.put( @@ -1034,17 +1045,18 @@ private DataSourceMetadata constructMyGlueDataSourceMetadataWithBadURISyntax() { properties.put("glue.indexstore.opensearch.uri", "http://localhost:9090? param"); properties.put("glue.indexstore.opensearch.auth", "awssigv4"); properties.put("glue.indexstore.opensearch.region", "eu-west-1"); - dataSourceMetadata.setProperties(properties); - return dataSourceMetadata; + return new DataSourceMetadata.Builder() + .setName("my_glue") + .setConnector(DataSourceType.S3GLUE) + .setProperties(properties) + .build(); } private DataSourceMetadata constructPrometheusDataSourceType() { - DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); - dataSourceMetadata.setName("my_prometheus"); - dataSourceMetadata.setConnector(DataSourceType.PROMETHEUS); - Map properties = new HashMap<>(); - dataSourceMetadata.setProperties(properties); - return dataSourceMetadata; + return new DataSourceMetadata.Builder() + .setName("my_prometheus") + .setConnector(DataSourceType.PROMETHEUS) + .build(); } private DispatchQueryRequest constructDispatchQueryRequest( diff --git a/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageFactoryTest.java b/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageFactoryTest.java index eb93cdabfe..ebe3c8f3a9 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageFactoryTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/storage/SparkStorageFactoryTest.java @@ -146,10 +146,12 @@ void testCreateDataSourceSuccess() { properties.put("spark.datasource.flint.auth", "false"); properties.put("spark.datasource.flint.region", "us-west-2"); - DataSourceMetadata metadata = new DataSourceMetadata(); - metadata.setName("spark"); - metadata.setConnector(DataSourceType.SPARK); - metadata.setProperties(properties); + DataSourceMetadata metadata = + new DataSourceMetadata.Builder() + .setName("spark") + .setConnector(DataSourceType.SPARK) + .setProperties(properties) + .build(); DataSource dataSource = new SparkStorageFactory(client, settings).createDataSource(metadata); Assertions.assertTrue(dataSource.getStorageEngine() instanceof SparkStorageEngine); @@ -167,10 +169,12 @@ void testSetSparkJars() { properties.put("emr.auth.region", "region"); properties.put("spark.datasource.flint.integration", "s3://spark/flint-spark-integration.jar"); - DataSourceMetadata metadata = new DataSourceMetadata(); - metadata.setName("spark"); - metadata.setConnector(DataSourceType.SPARK); - metadata.setProperties(properties); + DataSourceMetadata metadata = + new DataSourceMetadata.Builder() + .setName("spark") + .setConnector(DataSourceType.SPARK) + .setProperties(properties) + .build(); DataSource dataSource = new SparkStorageFactory(client, settings).createDataSource(metadata); Assertions.assertTrue(dataSource.getStorageEngine() instanceof SparkStorageEngine);