Skip to content

Commit

Permalink
Add data source stats by type
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Feb 2, 2024
1 parent 94bd664 commit 9848a08
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.opensearch.sql.datasources.service;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
Expand All @@ -26,6 +27,8 @@ public interface DataSourceMetadataStorage {
*/
List<DataSourceMetadata> getDataSourceMetadata();

/**
 * Counts the data sources held in the underlying storage, grouped by connector type.
 *
 * @return map whose keys are connector types and whose values are the number of data sources of
 *     that type
 */
Map<String, Long> countDataSourcesPerConnector();

/**
* Gets {@link DataSourceMetadata} corresponding to the datasourceName from underlying storage.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelper;
import org.opensearch.sql.datasources.exceptions.DataSourceNotFoundException;
import org.opensearch.sql.legacy.metrics.GaugeMetric;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.storage.DataSourceFactory;

/**
Expand Down Expand Up @@ -47,6 +49,7 @@ public DataSourceServiceImpl(
this.dataSourceMetadataStorage = dataSourceMetadataStorage;
this.dataSourceUserAuthorizationHelper = dataSourceUserAuthorizationHelper;
this.dataSourceLoaderCache = new DataSourceLoaderCacheImpl(dataSourceFactories);
registerDataSourceMetrics();
}

@Override
Expand Down Expand Up @@ -207,4 +210,12 @@ private void removeAuthInfo(DataSourceMetadata dataSourceMetadata) {
.anyMatch(confidentialKey -> entry.getKey().endsWith(confidentialKey)));
dataSourceMetadata.setProperties(safeProperties);
}

/**
 * Registers the {@code active_data_sources_count} gauge with the {@link Metrics} singleton.
 *
 * <p>The gauge is backed by {@code dataSourceMetadataStorage.countDataSourcesPerConnector()}, so
 * its value is a map of connector type to data source count.
 */
private void registerDataSourceMetrics() {
  final GaugeMetric<Map<String, Long>> connectorCountGauge =
      new GaugeMetric<>(
          "active_data_sources_count", dataSourceMetadataStorage::countDataSourcesPerConnector);
  Metrics.getInstance().registerMetric(connectorCountGauge);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -41,6 +42,9 @@
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.SearchHit;
import org.opensearch.search.aggregations.AggregationBuilders;
import org.opensearch.search.aggregations.bucket.terms.Terms;
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasources.encryptor.Encryptor;
Expand Down Expand Up @@ -86,6 +90,56 @@ public List<DataSourceMetadata> getDataSourceMetadata() {
return searchInDataSourcesIndex(QueryBuilders.matchAllQuery());
}

/**
 * Tallies the documents stored in {@code DATASOURCE_INDEX_NAME} by the value of their {@code
 * connector} field, using a terms aggregation named {@code by_connector_type}.
 *
 * <p>When the index is absent from the cluster routing table, it is created through {@code
 * createDataSourcesIndex()} and an empty map is returned without running a search. Otherwise the
 * search is submitted while the thread context is stashed, requests zero hits (only the
 * aggregation result is read) and uses the {@code _primary_first} preference.
 *
 * @return map keyed by connector type whose values are the document counts of the matching
 *     aggregation buckets; an empty map when the index did not exist
 * @throws RuntimeException if the search response status code is not 200
 */
@Override
public Map<String, Long> countDataSourcesPerConnector() {
  boolean indexExists =
      this.clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME);
  if (!indexExists) {
    createDataSourcesIndex();
    return Collections.emptyMap();
  }

  final String aggregationName = "by_connector_type";
  TermsAggregationBuilder connectorTerms =
      AggregationBuilders.terms(aggregationName).field("connector");

  // Only the aggregation is of interest, so no search hits are requested.
  SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
  sourceBuilder.size(0);
  sourceBuilder.aggregation(connectorTerms);

  SearchRequest request = new SearchRequest(DATASOURCE_INDEX_NAME);
  request.preference("_primary_first");
  request.source(sourceBuilder);

  // The request is submitted under a stashed thread context; the context is restored before
  // the result is awaited.
  ActionFuture<SearchResponse> pendingResponse;
  try (ThreadContext.StoredContext ignored =
      client.threadPool().getThreadContext().stashContext()) {
    pendingResponse = client.search(request);
  }
  SearchResponse response = pendingResponse.actionGet();

  if (response.status().getStatus() != 200) {
    throw new RuntimeException(
        "Fetching dataSource metadata information failed with status : " + response.status());
  }

  Terms connectorBuckets = response.getAggregations().get(aggregationName);
  Map<String, Long> countsByConnector = new HashMap<>();
  connectorBuckets
      .getBuckets()
      .forEach(bucket -> countsByConnector.put(bucket.getKeyAsString(), bucket.getDocCount()));
  return countsByConnector;
}

@Override
public Optional<DataSourceMetadata> getDataSourceMetadata(String datasourceName) {
if (!this.clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@

package org.opensearch.sql.datasources.storage;

import static org.mockito.ArgumentMatchers.anyString;
import static org.opensearch.sql.datasources.storage.OpenSearchDataSourceMetadataStorage.DATASOURCE_INDEX_NAME;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -40,6 +42,8 @@
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
import org.opensearch.search.aggregations.Aggregations;
import org.opensearch.search.aggregations.bucket.terms.Terms;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.datasources.encryptor.Encryptor;
Expand Down Expand Up @@ -247,6 +251,83 @@ public void testGetDataSourceMetadataListWithNoIndex() {
Assertions.assertEquals(0, dataSourceMetadataList.size());
}

/**
 * Verifies that {@code countDataSourcesPerConnector()} converts every bucket of the terms
 * aggregation in a 200 search response into one connector-type-to-count map entry.
 */
@SneakyThrows
@Test
public void testCountDataSourcesPerConnector() {
  Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME))
      .thenReturn(true);
  Mockito.when(client.search(ArgumentMatchers.any())).thenReturn(searchResponseActionFuture);
  Mockito.when(searchResponseActionFuture.actionGet()).thenReturn(searchResponse);
  Mockito.when(searchResponse.status()).thenReturn(RestStatus.OK);

  // Mocked aggregation response: one data source of type A and two of type B.
  Terms.Bucket bucketA = Mockito.mock(Terms.Bucket.class);
  Mockito.when(bucketA.getKeyAsString()).thenReturn("DataSourceTypeA");
  Mockito.when(bucketA.getDocCount()).thenReturn(1L);

  Terms.Bucket bucketB = Mockito.mock(Terms.Bucket.class);
  Mockito.when(bucketB.getKeyAsString()).thenReturn("DataSourceTypeB");
  Mockito.when(bucketB.getDocCount()).thenReturn(2L);

  // getBuckets() has a wildcard-typed return, which when(...).thenReturn(...) can only accept
  // through raw types; doReturn(...) stubs it without raw types or unchecked casts.
  Terms terms = Mockito.mock(Terms.class);
  List<Terms.Bucket> buckets = Arrays.asList(bucketA, bucketB);
  Mockito.doReturn(buckets).when(terms).getBuckets();

  Aggregations aggregations = Mockito.mock(Aggregations.class);
  Mockito.when(aggregations.get(anyString())).thenReturn(terms);

  Mockito.when(searchResponse.getAggregations()).thenReturn(aggregations);

  Map<String, Long> connectorCounts =
      openSearchDataSourceMetadataStorage.countDataSourcesPerConnector();

  Assertions.assertEquals(
      2, connectorCounts.size(), "The size of connectorCounts map should be 2.");
  Assertions.assertEquals(
      Long.valueOf(1),
      connectorCounts.get("DataSourceTypeA"),
      "DataSourceTypeA should have a count of 1.");
  Assertions.assertEquals(
      Long.valueOf(2),
      connectorCounts.get("DataSourceTypeB"),
      "DataSourceTypeB should have a count of 2.");
}

/**
 * Verifies that {@code countDataSourcesPerConnector()} returns an empty map when the routing table
 * reports that the data sources index does not exist.
 */
@SneakyThrows
@Test
public void testCountDataSourcesPerConnectorWithNoIndex() {
  // The index is reported as missing, so the index-creation calls are stubbed as well.
  Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME))
      .thenReturn(false);
  Mockito.when(client.admin().indices().create(ArgumentMatchers.any()))
      .thenReturn(createIndexResponseActionFuture);
  CreateIndexResponse acknowledgedCreation =
      new CreateIndexResponse(true, true, DATASOURCE_INDEX_NAME);
  Mockito.when(createIndexResponseActionFuture.actionGet()).thenReturn(acknowledgedCreation);
  Mockito.when(client.index(ArgumentMatchers.any())).thenReturn(indexResponseActionFuture);

  Map<String, Long> countsByConnector =
      openSearchDataSourceMetadataStorage.countDataSourcesPerConnector();

  Assertions.assertTrue(countsByConnector.isEmpty());
}

/**
 * Verifies that {@code countDataSourcesPerConnector()} throws a {@link RuntimeException} carrying
 * the response status in its message when the search response status is {@code NOT_FOUND}.
 */
@SneakyThrows
@Test
public void testCountDataSourcesPerConnectorWithWith404SearchResponse() {
  Mockito.when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME))
      .thenReturn(true);
  Mockito.when(client.search(ArgumentMatchers.any())).thenReturn(searchResponseActionFuture);
  Mockito.when(searchResponseActionFuture.actionGet()).thenReturn(searchResponse);
  Mockito.when(searchResponse.status()).thenReturn(RestStatus.NOT_FOUND);

  RuntimeException thrown =
      Assertions.assertThrows(
          RuntimeException.class,
          openSearchDataSourceMetadataStorage::countDataSourcesPerConnector);

  String expectedMessage =
      "Fetching dataSource metadata information failed with status : NOT_FOUND";
  Assertions.assertEquals(expectedMessage, thrown.getMessage());
}

@SneakyThrows
@Test
public void testGetDataSourceMetadataWithNoIndex() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,11 @@ public List<DataSourceMetadata> getDataSourceMetadata() {
return Collections.emptyList();
}

/**
 * Stub implementation that reports no data sources for any connector.
 *
 * @return an empty map
 */
@Override
public Map<String, Long> countDataSourcesPerConnector() {
  final Map<String, Long> noConnectorCounts = Collections.emptyMap();
  return noConnectorCounts;
}

@Override
public Optional<DataSourceMetadata> getDataSourceMetadata(String datasourceName) {
return Optional.empty();
Expand Down

0 comments on commit 9848a08

Please sign in to comment.