diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/service/DataSourceMetadataStorage.java b/datasources/src/main/java/org/opensearch/sql/datasources/service/DataSourceMetadataStorage.java index 4d59c68fa0..c57624f521 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/service/DataSourceMetadataStorage.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/service/DataSourceMetadataStorage.java @@ -8,6 +8,7 @@ package org.opensearch.sql.datasources.service; import java.util.List; +import java.util.Map; import java.util.Optional; import org.opensearch.sql.datasource.model.DataSource; import org.opensearch.sql.datasource.model.DataSourceMetadata; @@ -26,6 +27,8 @@ public interface DataSourceMetadataStorage { */ List getDataSourceMetadata(); + Map countDataSourcesPerConnector(); + /** * Gets {@link DataSourceMetadata} corresponding to the datasourceName from underlying storage. * diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/service/DataSourceServiceImpl.java b/datasources/src/main/java/org/opensearch/sql/datasources/service/DataSourceServiceImpl.java index 8ba618fb44..1fffebf4bb 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/service/DataSourceServiceImpl.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/service/DataSourceServiceImpl.java @@ -17,6 +17,8 @@ import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelper; import org.opensearch.sql.datasources.exceptions.DataSourceNotFoundException; +import org.opensearch.sql.legacy.metrics.GaugeMetric; +import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.storage.DataSourceFactory; /** @@ -47,6 +49,7 @@ public DataSourceServiceImpl( this.dataSourceMetadataStorage = dataSourceMetadataStorage; this.dataSourceUserAuthorizationHelper = dataSourceUserAuthorizationHelper; this.dataSourceLoaderCache = new DataSourceLoaderCacheImpl(dataSourceFactories); + registerDataSourceMetrics(); } @Override @@ -207,4 +210,12 @@ private void removeAuthInfo(DataSourceMetadata dataSourceMetadata) { .anyMatch(confidentialKey -> entry.getKey().endsWith(confidentialKey))); dataSourceMetadata.setProperties(safeProperties); } + + private void registerDataSourceMetrics() { + GaugeMetric> activeDataSourcesMetric = + new GaugeMetric<>( + "active_data_sources_count", + this.dataSourceMetadataStorage::countDataSourcesPerConnector); + Metrics.getInstance().registerMetric(activeDataSourcesMetric); + } } diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorage.java b/datasources/src/main/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorage.java index 6659e54342..5422f5c349 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorage.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorage.java @@ -12,6 +12,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -41,6 +42,8 @@ import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryBuilders; import org.opensearch.search.SearchHit; +import org.opensearch.search.aggregations.AggregationBuilders; +import org.opensearch.search.aggregations.bucket.terms.Terms; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasources.encryptor.Encryptor; @@ -86,6 +89,54 @@ public List getDataSourceMetadata() { return searchInDataSourcesIndex(QueryBuilders.matchAllQuery()); } + /** + * Counts the number of data sources per connector type in the specified OpenSearch index. This + * method queries the OpenSearch index defined by {@code DATASOURCE_INDEX_NAME} and aggregates the + * count of documents (data sources) based on the connector type, which is expected to be + * specified in the document field {@code connector.keyword}. + * + *

Note: This method uses the client's thread pool to execute the search request and + * temporarily stashes the current thread context to avoid context leakage. + * + * @return A {@link Map} where keys are connector types (as {@link String}) and values are counts + * (as {@link Long}) of data sources of each type. Returns an empty map if the index does not + * exist or if no data sources are found. + * @throws RuntimeException if the search operation fails or returns a non-200 status code, + * indicating an error with the Elasticsearch query or connection. + */ + @Override + public Map countDataSourcesPerConnector() { + if (!this.clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) { + createDataSourcesIndex(); + return Collections.emptyMap(); + } + + SearchRequest searchRequest = new SearchRequest(DATASOURCE_INDEX_NAME); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.aggregation( + AggregationBuilders.terms("data_counts_by_connector_type").field("connector.keyword")); + searchRequest.source(searchSourceBuilder); + searchRequest.preference("_primary_first"); + ActionFuture searchResponseActionFuture; + try (ThreadContext.StoredContext ignored = + client.threadPool().getThreadContext().stashContext()) { + searchResponseActionFuture = client.search(searchRequest); + } + SearchResponse searchResponse = searchResponseActionFuture.actionGet(); + if (searchResponse.status().getStatus() != 200) { + throw new RuntimeException( + "Fetching dataSource metadata information failed with status : " + + searchResponse.status()); + } else { + Terms connectorsAgg = searchResponse.getAggregations().get("data_counts_by_connector_type"); + Map connectorCounts = new HashMap<>(); + for (Terms.Bucket bucket : connectorsAgg.getBuckets()) { + connectorCounts.put(bucket.getKeyAsString(), bucket.getDocCount()); + } + return connectorCounts; + } + } + @Override public Optional getDataSourceMetadata(String datasourceName) { if (!this.clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) { diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorageTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorageTest.java index 7d41737b2d..71483d7eea 100644 --- a/datasources/src/test/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorageTest.java +++ b/datasources/src/test/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorageTest.java @@ -5,10 +5,12 @@ package org.opensearch.sql.datasources.storage; +import static org.mockito.ArgumentMatchers.anyString; import static org.opensearch.sql.datasources.storage.OpenSearchDataSourceMetadataStorage.DATASOURCE_INDEX_NAME; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -40,6 +42,8 @@ import org.opensearch.index.engine.VersionConflictEngineException; import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; +import org.opensearch.search.aggregations.Aggregations; +import org.opensearch.search.aggregations.bucket.terms.Terms; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceType; import org.opensearch.sql.datasources.encryptor.Encryptor; @@ -247,6 +251,83 @@ public void testGetDataSourceMetadataListWithNoIndex() { Assertions.assertEquals(0, dataSourceMetadataList.size()); } + @SneakyThrows + @Test + public void testCountDataSourcesPerConnector() { + Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) + .thenReturn(true); + Mockito.when(client.search(ArgumentMatchers.any())).thenReturn(searchResponseActionFuture); + Mockito.when(searchResponseActionFuture.actionGet()).thenReturn(searchResponse); + Mockito.when(searchResponse.status()).thenReturn(RestStatus.OK); + // Mocking the aggregation response + Terms.Bucket bucketA = Mockito.mock(Terms.Bucket.class); + Mockito.when(bucketA.getKeyAsString()).thenReturn("DataSourceTypeA"); + Mockito.when(bucketA.getDocCount()).thenReturn(1L); + + Terms.Bucket bucketB = Mockito.mock(Terms.Bucket.class); + Mockito.when(bucketB.getKeyAsString()).thenReturn("DataSourceTypeB"); + Mockito.when(bucketB.getDocCount()).thenReturn(2L); + + // Use raw type here and cast appropriately + Terms terms = Mockito.mock(Terms.class); + List rawBucketsList = Arrays.asList(bucketA, bucketB); // Raw type + Mockito.when(terms.getBuckets()).thenReturn((List) rawBucketsList); // Casting to raw type + + Aggregations aggregations = Mockito.mock(Aggregations.class); + Mockito.when(aggregations.get(anyString())).thenReturn(terms); + + Mockito.when(searchResponse.getAggregations()).thenReturn(aggregations); + + Map connectorCounts = + openSearchDataSourceMetadataStorage.countDataSourcesPerConnector(); + + Assertions.assertEquals( + 2, connectorCounts.size(), "The size of connectorCounts map should be 2."); + Assertions.assertEquals( + Long.valueOf(1), + connectorCounts.get("DataSourceTypeA"), + "DataSourceTypeA should have a count of 1."); + Assertions.assertEquals( + Long.valueOf(2), + connectorCounts.get("DataSourceTypeB"), + "DataSourceTypeB should have a count of 2."); + } + + @SneakyThrows + @Test + public void testCountDataSourcesPerConnectorWithNoIndex() { + Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) + .thenReturn(Boolean.FALSE); + Mockito.when(client.admin().indices().create(ArgumentMatchers.any())) + .thenReturn(createIndexResponseActionFuture); + Mockito.when(createIndexResponseActionFuture.actionGet()) + .thenReturn(new CreateIndexResponse(true, true, DATASOURCE_INDEX_NAME)); + Mockito.when(client.index(ArgumentMatchers.any())).thenReturn(indexResponseActionFuture); + + Map connectorCounts = + openSearchDataSourceMetadataStorage.countDataSourcesPerConnector(); + + Assertions.assertEquals(0, connectorCounts.size()); + } + + @SneakyThrows + @Test + public void testCountDataSourcesPerConnectorWithWith404SearchResponse() { + Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) + .thenReturn(true); + Mockito.when(client.search(ArgumentMatchers.any())).thenReturn(searchResponseActionFuture); + Mockito.when(searchResponseActionFuture.actionGet()).thenReturn(searchResponse); + Mockito.when(searchResponse.status()).thenReturn(RestStatus.NOT_FOUND); + + RuntimeException runtimeException = + Assertions.assertThrows( + RuntimeException.class, + () -> openSearchDataSourceMetadataStorage.countDataSourcesPerConnector()); + Assertions.assertEquals( + "Fetching dataSource metadata information failed with status : NOT_FOUND", + runtimeException.getMessage()); + } + @SneakyThrows @Test public void testGetDataSourceMetadataWithNoIndex() { diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java index f81e1b6615..791925fc64 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java @@ -251,6 +251,11 @@ public List getDataSourceMetadata() { return Collections.emptyList(); } + @Override + public Map countDataSourcesPerConnector() { + return Collections.emptyMap(); + } + @Override public Optional getDataSourceMetadata(String datasourceName) { return Optional.empty();