From 447c09e69debc6d3cba3655994d3829225b1679e Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Tue, 9 Jul 2024 12:35:39 -0700 Subject: [PATCH] add more unit tests Signed-off-by: Chenyang Ji --- .../insights/QueryInsightsTestUtils.java | 226 ++++++++++++++++++ .../listener/QueryInsightsListenerTests.java | 222 +++++++++++++++++ 2 files changed, 448 insertions(+) create mode 100644 plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java create mode 100644 plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java new file mode 100644 index 0000000000000..54cafdc97ea74 --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java @@ -0,0 +1,226 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights; + +import org.opensearch.action.search.SearchType; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.util.Maps; +import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo; +import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueries; +import org.opensearch.plugin.insights.rules.model.Attribute; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; +import org.opensearch.test.VersionUtils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.opensearch.test.OpenSearchTestCase.buildNewFakeTransportAddress; +import static org.opensearch.test.OpenSearchTestCase.random; +import static org.opensearch.test.OpenSearchTestCase.randomAlphaOfLengthBetween; +import static org.opensearch.test.OpenSearchTestCase.randomArray; +import static org.opensearch.test.OpenSearchTestCase.randomIntBetween; +import static org.opensearch.test.OpenSearchTestCase.randomLong; +import static org.opensearch.test.OpenSearchTestCase.randomLongBetween; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +final public class QueryInsightsTestUtils { + + public QueryInsightsTestUtils() {} + + public static List generateQueryInsightRecords(int count) { + return generateQueryInsightRecords(count, count, System.currentTimeMillis(), 0); + } + + /** + * Creates a List of random Query Insight Records for testing purpose + */ + public static List generateQueryInsightRecords(int lower, int upper, long startTimeStamp, long interval) { + List records = new ArrayList<>(); + int countOfRecords = randomIntBetween(lower, upper); + long timestamp = startTimeStamp; + for (int i = 0; i < countOfRecords; ++i) { + Map measurements = Map.of( + MetricType.LATENCY, + randomLongBetween(1000, 10000), + MetricType.CPU, + randomLongBetween(1000, 10000), + MetricType.MEMORY, + randomLongBetween(1000, 10000) + ); + + Map phaseLatencyMap = new HashMap<>(); + int countOfPhases = randomIntBetween(2, 5); + for (int j = 0; j < countOfPhases; ++j) { + phaseLatencyMap.put(randomAlphaOfLengthBetween(5, 10), randomLong()); + } + Map attributes = new HashMap<>(); + attributes.put(Attribute.SEARCH_TYPE, SearchType.QUERY_THEN_FETCH.toString().toLowerCase(Locale.ROOT)); + attributes.put(Attribute.SOURCE, "{\"size\":20}"); + attributes.put(Attribute.TOTAL_SHARDS, randomIntBetween(1, 100)); + attributes.put(Attribute.INDICES, randomArray(1, 3, Object[]::new, () -> randomAlphaOfLengthBetween(5, 10))); + attributes.put(Attribute.PHASE_LATENCY_MAP, phaseLatencyMap); + attributes.put( + Attribute.TASK_RESOURCE_USAGES, + List.of( + new TaskResourceInfo( + randomAlphaOfLengthBetween(5, 10), + randomLongBetween(1, 1000), + randomLongBetween(1, 1000), + randomAlphaOfLengthBetween(5, 10), + new TaskResourceUsage(randomLongBetween(1, 1000), randomLongBetween(1, 1000)) + ), + new TaskResourceInfo( + randomAlphaOfLengthBetween(5, 10), + randomLongBetween(1, 1000), + randomLongBetween(1, 1000), + randomAlphaOfLengthBetween(5, 10), + new TaskResourceUsage(randomLongBetween(1, 1000), randomLongBetween(1, 1000)) + ) + ) + ); + + records.add(new SearchQueryRecord(timestamp, measurements, attributes)); + timestamp += interval; + } + return records; + } + + public static TopQueries createRandomTopQueries() { + DiscoveryNode node = new DiscoveryNode( + "node_for_top_queries_test", + buildNewFakeTransportAddress(), + emptyMap(), + emptySet(), + VersionUtils.randomVersion(random()) + ); + List records = generateQueryInsightRecords(10); + + return new TopQueries(node, records); + } + + public static TopQueries createFixedTopQueries() { + DiscoveryNode node = new DiscoveryNode( + "node_for_top_queries_test", + buildNewFakeTransportAddress(), + emptyMap(), + emptySet(), + VersionUtils.randomVersion(random()) + ); + List records = new ArrayList<>(); + records.add(createFixedSearchQueryRecord()); + + return new TopQueries(node, records); + } + + public static SearchQueryRecord createFixedSearchQueryRecord() { + long timestamp = 1706574180000L; + Map measurements = Map.of(MetricType.LATENCY, 1L); + + Map phaseLatencyMap = new HashMap<>(); + Map attributes = new HashMap<>(); + attributes.put(Attribute.SEARCH_TYPE, SearchType.QUERY_THEN_FETCH.toString().toLowerCase(Locale.ROOT)); + + return new SearchQueryRecord(timestamp, measurements, attributes); + } + + public static void compareJson(ToXContent param1, ToXContent param2) throws IOException { + if (param1 == null || param2 == null) { + assertNull(param1); + assertNull(param2); + return; + } + + ToXContent.Params params = ToXContent.EMPTY_PARAMS; + XContentBuilder param1Builder = jsonBuilder(); + param1.toXContent(param1Builder, params); + + XContentBuilder param2Builder = jsonBuilder(); + param2.toXContent(param2Builder, params); + + assertEquals(param1Builder.toString(), param2Builder.toString()); + } + + @SuppressWarnings("unchecked") + public static boolean checkRecordsEquals(List records1, List records2) { + if (records1.size() != records2.size()) { + return false; + } + for (int i = 0; i < records1.size(); i++) { + if (!records1.get(i).equals(records2.get(i))) { + return false; + } + Map attributes1 = records1.get(i).getAttributes(); + Map attributes2 = records2.get(i).getAttributes(); + for (Map.Entry entry : attributes1.entrySet()) { + Attribute attribute = entry.getKey(); + Object value = entry.getValue(); + if (!attributes2.containsKey(attribute)) { + return false; + } + if (value instanceof Object[] && !Arrays.deepEquals((Object[]) value, (Object[]) attributes2.get(attribute))) { + return false; + } else if (value instanceof Map + && !Maps.deepEquals((Map) value, (Map) attributes2.get(attribute))) { + return false; + } + } + } + return true; + } + + public static boolean checkRecordsEqualsWithoutOrder( + List records1, + List records2, + MetricType metricType + ) { + Set set2 = new TreeSet<>((a, b) -> SearchQueryRecord.compare(a, b, metricType)); + set2.addAll(records2); + if (records1.size() != records2.size()) { + return false; + } + for (int i = 0; i < records1.size(); i++) { + if (!set2.contains(records1.get(i))) { + return false; + } + } + return true; + } + + public static void registerAllQueryInsightsSettings(ClusterSettings clusterSettings) { + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_EXPORTER_SETTINGS); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_CPU_QUERIES_ENABLED); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_CPU_QUERIES_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_CPU_QUERIES_WINDOW_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_CPU_EXPORTER_SETTINGS); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_MEMORY_QUERIES_ENABLED); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_MEMORY_QUERIES_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_MEMORY_QUERIES_WINDOW_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_MEMORY_EXPORTER_SETTINGS); + } +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java new file mode 100644 index 0000000000000..051a7105a0dc0 --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java @@ -0,0 +1,222 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.listener; + +import org.opensearch.action.search.SearchPhaseContext; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchRequestContext; +import org.opensearch.action.search.SearchTask; +import org.opensearch.action.search.SearchType; +import org.opensearch.action.support.replication.ClusterStateCreationUtils; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.common.util.io.IOUtils; +import org.opensearch.core.tasks.TaskId; +import org.opensearch.plugin.insights.QueryInsightsTestUtils; +import org.opensearch.plugin.insights.core.service.QueryInsightsService; +import org.opensearch.plugin.insights.core.service.TopQueriesService; +import org.opensearch.plugin.insights.rules.model.Attribute; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.opensearch.search.aggregations.support.ValueType; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskResourceTrackingService; +import org.opensearch.test.ClusterServiceUtils; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.junit.Before; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; + +import org.mockito.ArgumentCaptor; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Unit Tests for {@link QueryInsightsListener}. + */ +public class QueryInsightsListenerTests extends OpenSearchTestCase { + private final SearchRequestContext searchRequestContext = mock(SearchRequestContext.class); + private final SearchPhaseContext searchPhaseContext = mock(SearchPhaseContext.class); + private final SearchRequest searchRequest = mock(SearchRequest.class); + private final QueryInsightsService queryInsightsService = mock(QueryInsightsService.class); + private final TopQueriesService topQueriesService = mock(TopQueriesService.class); + private final TaskResourceTrackingService taskResourceTrackingService = mock(TaskResourceTrackingService.class); + private final ThreadPool threadPool = new TestThreadPool("QueryInsightsThreadPool"); + private ClusterService clusterService; + + @Before + public void setup() { + Settings.Builder settingsBuilder = Settings.builder(); + Settings settings = settingsBuilder.build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + QueryInsightsTestUtils.registerAllQueryInsightsSettings(clusterSettings); + ClusterState state = ClusterStateCreationUtils.stateWithActivePrimary("test", true, 1 + randomInt(3), randomInt(2)); + clusterService = ClusterServiceUtils.createClusterService(threadPool, state.getNodes().getLocalNode(), clusterSettings); + ClusterServiceUtils.setState(clusterService, state); + clusterService.setTaskResourceTrackingService(taskResourceTrackingService); + when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true); + when(queryInsightsService.getTopQueriesService(MetricType.LATENCY)).thenReturn(topQueriesService); + + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + threadPool.getThreadContext().setHeaders(new Tuple<>(Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel"), new HashMap<>())); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + IOUtils.close(clusterService); + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + + @SuppressWarnings("unchecked") + public void testOnRequestEnd() throws InterruptedException { + Long timestamp = System.currentTimeMillis() - 100L; + SearchType searchType = SearchType.QUERY_THEN_FETCH; + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.aggregation(new TermsAggregationBuilder("agg1").userValueTypeHint(ValueType.STRING).field("type.keyword")); + searchSourceBuilder.size(0); + SearchTask task = new SearchTask( + 0, + "n/a", + "n/a", + () -> "test", + TaskId.EMPTY_TASK_ID, + Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel") + ); + + String[] indices = new String[] { "index-1", "index-2" }; + + Map phaseLatencyMap = new HashMap<>(); + phaseLatencyMap.put("expand", 0L); + phaseLatencyMap.put("query", 20L); + phaseLatencyMap.put("fetch", 1L); + + int numberOfShards = 10; + + QueryInsightsListener queryInsightsListener = new QueryInsightsListener(clusterService, queryInsightsService); + + when(searchRequest.getOrCreateAbsoluteStartMillis()).thenReturn(timestamp); + when(searchRequest.searchType()).thenReturn(searchType); + when(searchRequest.source()).thenReturn(searchSourceBuilder); + when(searchRequest.indices()).thenReturn(indices); + when(searchRequestContext.phaseTookMap()).thenReturn(phaseLatencyMap); + when(searchPhaseContext.getRequest()).thenReturn(searchRequest); + when(searchPhaseContext.getNumShards()).thenReturn(numberOfShards); + when(searchPhaseContext.getTask()).thenReturn(task); + ArgumentCaptor captor = ArgumentCaptor.forClass(SearchQueryRecord.class); + + queryInsightsListener.onRequestEnd(searchPhaseContext, searchRequestContext); + + verify(queryInsightsService, times(1)).addRecord(captor.capture()); + SearchQueryRecord generatedRecord = captor.getValue(); + assertEquals(timestamp.longValue(), generatedRecord.getTimestamp()); + assertEquals(numberOfShards, generatedRecord.getAttributes().get(Attribute.TOTAL_SHARDS)); + assertEquals(searchType.toString().toLowerCase(Locale.ROOT), generatedRecord.getAttributes().get(Attribute.SEARCH_TYPE)); + assertEquals(searchSourceBuilder.toString(), generatedRecord.getAttributes().get(Attribute.SOURCE)); + Map labels = (Map) generatedRecord.getAttributes().get(Attribute.LABELS); + assertEquals("userLabel", labels.get(Task.X_OPAQUE_ID)); + verify(taskResourceTrackingService, times(1)).refreshResourceStats(task); + } + + public void testConcurrentOnRequestEnd() throws InterruptedException { + Long timestamp = System.currentTimeMillis() - 100L; + SearchType searchType = SearchType.QUERY_THEN_FETCH; + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.aggregation(new TermsAggregationBuilder("agg1").userValueTypeHint(ValueType.STRING).field("type.keyword")); + searchSourceBuilder.size(0); + SearchTask task = new SearchTask( + 0, + "n/a", + "n/a", + () -> "test", + TaskId.EMPTY_TASK_ID, + Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel") + ); + + String[] indices = new String[] { "index-1", "index-2" }; + + Map phaseLatencyMap = new HashMap<>(); + phaseLatencyMap.put("expand", 0L); + phaseLatencyMap.put("query", 20L); + phaseLatencyMap.put("fetch", 1L); + + int numberOfShards = 10; + + final List searchListenersList = new ArrayList<>(); + + when(searchRequest.getOrCreateAbsoluteStartMillis()).thenReturn(timestamp); + when(searchRequest.searchType()).thenReturn(searchType); + when(searchRequest.source()).thenReturn(searchSourceBuilder); + when(searchRequest.indices()).thenReturn(indices); + when(searchRequestContext.phaseTookMap()).thenReturn(phaseLatencyMap); + when(searchPhaseContext.getRequest()).thenReturn(searchRequest); + when(searchPhaseContext.getNumShards()).thenReturn(numberOfShards); + when(searchPhaseContext.getTask()).thenReturn(task); + + int numRequests = 50; + Thread[] threads = new Thread[numRequests]; + Phaser phaser = new Phaser(numRequests + 1); + CountDownLatch countDownLatch = new CountDownLatch(numRequests); + + for (int i = 0; i < numRequests; i++) { + searchListenersList.add(new QueryInsightsListener(clusterService, queryInsightsService)); + } + + for (int i = 0; i < numRequests; i++) { + int finalI = i; + threads[i] = new Thread(() -> { + phaser.arriveAndAwaitAdvance(); + QueryInsightsListener thisListener = searchListenersList.get(finalI); + thisListener.onRequestEnd(searchPhaseContext, searchRequestContext); + countDownLatch.countDown(); + }); + threads[i].start(); + } + phaser.arriveAndAwaitAdvance(); + countDownLatch.await(); + + verify(queryInsightsService, times(numRequests)).addRecord(any()); + verify(taskResourceTrackingService, times(numRequests)).refreshResourceStats(task); + } + + public void testSetEnabled() { + when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true); + QueryInsightsListener queryInsightsListener = new QueryInsightsListener(clusterService, queryInsightsService); + queryInsightsListener.setEnableTopQueries(MetricType.LATENCY, true); + assertTrue(queryInsightsListener.isEnabled()); + + when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(false); + when(queryInsightsService.isCollectionEnabled(MetricType.CPU)).thenReturn(false); + when(queryInsightsService.isCollectionEnabled(MetricType.MEMORY)).thenReturn(false); + queryInsightsListener.setEnableTopQueries(MetricType.LATENCY, false); + assertFalse(queryInsightsListener.isEnabled()); + } +}