From 554cbf7bfd83d94e1b1b69528ce0d128454754f4 Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Mon, 5 Feb 2024 22:18:08 -0800 Subject: [PATCH] Top N Queries by latency implementation (#11904) * Top N Queries by latency implementation * Increase JavaDoc coverage and update PR based comments * Refactor record and service to make them generic * refactor service for improving multithreading efficiency * rebase from master to pick up query insights plugin changes --------- Signed-off-by: Chenyang Ji --- CHANGELOG.md | 1 + .../QueryInsightsPluginTransportIT.java | 274 ++++++++++++++++++ .../plugin/insights/TopQueriesRestIT.java | 107 +++++++ .../plugin/insights/QueryInsightsPlugin.java | 10 +- .../core/listener/QueryInsightsListener.java | 147 ++++++++++ .../insights/core/listener/package-info.java | 12 + .../insights/rules/action/package-info.java | 12 + .../rules/action/top_queries/TopQueries.java | 77 +++++ .../action/top_queries/TopQueriesAction.java | 32 ++ .../action/top_queries/TopQueriesRequest.java | 62 ++++ .../top_queries/TopQueriesResponse.java | 143 +++++++++ .../action/top_queries/package-info.java | 12 + .../rules/resthandler/package-info.java | 12 + .../top_queries/RestTopQueriesAction.java | 99 +++++++ .../resthandler/top_queries/package-info.java | 12 + .../rules/transport/package-info.java | 12 + .../TransportTopQueriesAction.java | 155 ++++++++++ .../transport/top_queries/package-info.java | 12 + .../insights/QueryInsightsPluginTests.java | 22 +- .../insights/QueryInsightsTestUtils.java | 34 +++ .../listener/QueryInsightsListenerTests.java | 161 ++++++++++ .../top_queries/TopQueriesRequestTests.java | 43 +++ .../top_queries/TopQueriesResponseTests.java | 71 +++++ .../action/top_queries/TopQueriesTests.java | 35 +++ .../RestTopQueriesActionTests.java | 70 +++++ .../TransportTopQueriesActionTests.java | 84 ++++++ 26 files changed, 1705 insertions(+), 6 deletions(-) create mode 100644 plugins/query-insights/src/internalClusterTest/java/org/opensearch/plugin/insights/QueryInsightsPluginTransportIT.java create mode 100644 plugins/query-insights/src/javaRestTest/java/org/opensearch/plugin/insights/TopQueriesRestIT.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/package-info.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/package-info.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueries.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesAction.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequest.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponse.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/package-info.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/package-info.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesAction.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/package-info.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/package-info.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesAction.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/package-info.java create mode 100644 plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java create mode 100644 plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequestTests.java create mode 100644 plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponseTests.java create mode 100644 plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesTests.java create mode 100644 plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesActionTests.java create mode 100644 plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesActionTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 5afdf6b707d1e..6b78222aa742f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -224,6 +224,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Workaround for https://bugs.openjdk.org/browse/JDK-8323659 regression, introduced in JDK-21.0.2 ([#11968](https://github.com/opensearch-project/OpenSearch/pull/11968)) - Updates IpField to be searchable when only `doc_values` are enabled ([#11508](https://github.com/opensearch-project/OpenSearch/pull/11508)) - [Query Insights] Query Insights Framework which currently supports retrieving the most time-consuming queries within the last configured time window ([#11903](https://github.com/opensearch-project/OpenSearch/pull/11903)) +- [Query Insights] Implement Top N Queries feature to collect and gather information about high latency queries in a window ([#11904](https://github.com/opensearch-project/OpenSearch/pull/11904)) ### Deprecated diff --git a/plugins/query-insights/src/internalClusterTest/java/org/opensearch/plugin/insights/QueryInsightsPluginTransportIT.java b/plugins/query-insights/src/internalClusterTest/java/org/opensearch/plugin/insights/QueryInsightsPluginTransportIT.java new file mode 100644 index 0000000000000..04e715444f50a --- /dev/null +++ b/plugins/query-insights/src/internalClusterTest/java/org/opensearch/plugin/insights/QueryInsightsPluginTransportIT.java @@ -0,0 +1,274 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights; + +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.action.admin.cluster.node.info.NodeInfo; +import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest; +import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.opensearch.action.admin.cluster.node.info.PluginsAndModules; +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesRequest; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesResponse; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugins.Plugin; +import org.opensearch.plugins.PluginInfo; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.Assert; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +/** + * Transport Action tests for Query Insights Plugin + */ + +@OpenSearchIntegTestCase.ClusterScope(numDataNodes = 0, scope = OpenSearchIntegTestCase.Scope.TEST) +public class QueryInsightsPluginTransportIT extends OpenSearchIntegTestCase { + + private final int TOTAL_NUMBER_OF_NODES = 2; + private final int TOTAL_SEARCH_REQUESTS = 5; + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(QueryInsightsPlugin.class); + } + + /** + * Test Query Insights Plugin is installed + */ + public void testQueryInsightPluginInstalled() { + NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); + nodesInfoRequest.addMetric(NodesInfoRequest.Metric.PLUGINS.metricName()); + NodesInfoResponse nodesInfoResponse = OpenSearchIntegTestCase.client().admin().cluster().nodesInfo(nodesInfoRequest).actionGet(); + List pluginInfos = nodesInfoResponse.getNodes() + .stream() + .flatMap( + (Function>) nodeInfo -> nodeInfo.getInfo(PluginsAndModules.class).getPluginInfos().stream() + ) + .collect(Collectors.toList()); + Assert.assertTrue( + pluginInfos.stream().anyMatch(pluginInfo -> pluginInfo.getName().equals("org.opensearch.plugin.insights.QueryInsightsPlugin")) + ); + } + + /** + * Test get top queries when feature disabled + */ + public void testGetTopQueriesWhenFeatureDisabled() { + TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY); + TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet(); + Assert.assertNotEquals(0, response.failures().size()); + Assert.assertEquals( + "Cannot get top n queries for [latency] when it is not enabled.", + response.failures().get(0).getCause().getCause().getMessage() + ); + } + + /** + * Test update top query record when feature enabled + */ + public void testUpdateRecordWhenFeatureDisabledThenEnabled() throws ExecutionException, InterruptedException { + Settings commonSettings = Settings.builder().put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "false").build(); + + logger.info("--> starting nodes for query insight testing"); + List nodes = internalCluster().startNodes(TOTAL_NUMBER_OF_NODES, Settings.builder().put(commonSettings).build()); + + logger.info("--> waiting for nodes to form a cluster"); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet(); + assertFalse(health.isTimedOut()); + + assertAcked( + prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 2)) + ); + ensureStableCluster(2); + logger.info("--> creating indices for query insight testing"); + for (int i = 0; i < 5; i++) { + IndexResponse response = client().prepareIndex("test_" + i).setId("" + i).setSource("field_" + i, "value_" + i).get(); + assertEquals("CREATED", response.status().toString()); + } + // making search requests to get top queries + for (int i = 0; i < TOTAL_SEARCH_REQUESTS; i++) { + SearchResponse searchResponse = internalCluster().client(randomFrom(nodes)) + .prepareSearch() + .setQuery(QueryBuilders.matchAllQuery()) + .get(); + assertEquals(searchResponse.getFailedShards(), 0); + } + + TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY); + TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet(); + Assert.assertNotEquals(0, response.failures().size()); + Assert.assertEquals( + "Cannot get top n queries for [latency] when it is not enabled.", + response.failures().get(0).getCause().getCause().getMessage() + ); + + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().persistentSettings( + Settings.builder().put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true").build() + ); + assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).get()); + TopQueriesRequest request2 = new TopQueriesRequest(MetricType.LATENCY); + TopQueriesResponse response2 = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request2).actionGet(); + Assert.assertEquals(0, response2.failures().size()); + Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response2.getNodes().size()); + for (int i = 0; i < TOTAL_NUMBER_OF_NODES; i++) { + Assert.assertEquals(0, response2.getNodes().get(i).getTopQueriesRecord().size()); + } + + internalCluster().stopAllNodes(); + } + + /** + * Test get top queries when feature enabled + */ + public void testGetTopQueriesWhenFeatureEnabled() throws InterruptedException { + Settings commonSettings = Settings.builder() + .put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true") + .put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "100") + .put(TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey(), "600s") + .build(); + + logger.info("--> starting nodes for query insight testing"); + List nodes = internalCluster().startNodes(TOTAL_NUMBER_OF_NODES, Settings.builder().put(commonSettings).build()); + + logger.info("--> waiting for nodes to form a cluster"); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet(); + assertFalse(health.isTimedOut()); + + assertAcked( + prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 2)) + ); + ensureStableCluster(2); + logger.info("--> creating indices for query insight testing"); + for (int i = 0; i < 5; i++) { + IndexResponse response = client().prepareIndex("test_" + i).setId("" + i).setSource("field_" + i, "value_" + i).get(); + assertEquals("CREATED", response.status().toString()); + } + // making search requests to get top queries + for (int i = 0; i < TOTAL_SEARCH_REQUESTS; i++) { + SearchResponse searchResponse = internalCluster().client(randomFrom(nodes)) + .prepareSearch() + .setQuery(QueryBuilders.matchAllQuery()) + .get(); + assertEquals(searchResponse.getFailedShards(), 0); + } + // Sleep to wait for queue drained to top queries store + Thread.sleep(6000); + TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY); + TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet(); + Assert.assertEquals(0, response.failures().size()); + Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size()); + Assert.assertEquals(TOTAL_SEARCH_REQUESTS, response.getNodes().stream().mapToInt(o -> o.getTopQueriesRecord().size()).sum()); + + internalCluster().stopAllNodes(); + } + + /** + * Test get top queries with small top n size + */ + public void testGetTopQueriesWithSmallTopN() throws InterruptedException { + Settings commonSettings = Settings.builder() + .put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true") + .put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "1") + .put(TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey(), "600s") + .build(); + + logger.info("--> starting nodes for query insight testing"); + List nodes = internalCluster().startNodes(TOTAL_NUMBER_OF_NODES, Settings.builder().put(commonSettings).build()); + + logger.info("--> waiting for nodes to form a cluster"); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet(); + assertFalse(health.isTimedOut()); + + assertAcked( + prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 2)) + ); + ensureStableCluster(2); + logger.info("--> creating indices for query insight testing"); + for (int i = 0; i < 5; i++) { + IndexResponse response = client().prepareIndex("test_" + i).setId("" + i).setSource("field_" + i, "value_" + i).get(); + assertEquals("CREATED", response.status().toString()); + } + // making search requests to get top queries + for (int i = 0; i < TOTAL_SEARCH_REQUESTS; i++) { + SearchResponse searchResponse = internalCluster().client(randomFrom(nodes)) + .prepareSearch() + .setQuery(QueryBuilders.matchAllQuery()) + .get(); + assertEquals(searchResponse.getFailedShards(), 0); + } + Thread.sleep(6000); + TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY); + TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet(); + Assert.assertEquals(0, response.failures().size()); + Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size()); + Assert.assertEquals(2, response.getNodes().stream().mapToInt(o -> o.getTopQueriesRecord().size()).sum()); + + internalCluster().stopAllNodes(); + } + + /** + * Test get top queries with small window size + */ + public void testGetTopQueriesWithSmallWindowSize() throws InterruptedException { + Settings commonSettings = Settings.builder() + .put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true") + .put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "100") + .put(TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey(), "1m") + .build(); + + logger.info("--> starting nodes for query insight testing"); + List nodes = internalCluster().startNodes(TOTAL_NUMBER_OF_NODES, Settings.builder().put(commonSettings).build()); + + logger.info("--> waiting for nodes to form a cluster"); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet(); + assertFalse(health.isTimedOut()); + + assertAcked( + prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 2)) + ); + ensureStableCluster(2); + logger.info("--> creating indices for query insight testing"); + for (int i = 0; i < 5; i++) { + IndexResponse response = client().prepareIndex("test_" + i).setId("" + i).setSource("field_" + i, "value_" + i).get(); + assertEquals("CREATED", response.status().toString()); + } + // making search requests to get top queries + for (int i = 0; i < TOTAL_SEARCH_REQUESTS; i++) { + SearchResponse searchResponse = internalCluster().client(randomFrom(nodes)) + .prepareSearch() + .setQuery(QueryBuilders.matchAllQuery()) + .get(); + assertEquals(searchResponse.getFailedShards(), 0); + } + + TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY); + TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet(); + Assert.assertEquals(0, response.failures().size()); + Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size()); + Thread.sleep(6000); + internalCluster().stopAllNodes(); + } +} diff --git a/plugins/query-insights/src/javaRestTest/java/org/opensearch/plugin/insights/TopQueriesRestIT.java b/plugins/query-insights/src/javaRestTest/java/org/opensearch/plugin/insights/TopQueriesRestIT.java new file mode 100644 index 0000000000000..57dea6ad8d5ff --- /dev/null +++ b/plugins/query-insights/src/javaRestTest/java/org/opensearch/plugin/insights/TopQueriesRestIT.java @@ -0,0 +1,107 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights; + +import org.opensearch.client.Request; +import org.opensearch.client.Response; +import org.opensearch.common.xcontent.LoggingDeprecationHandler; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.test.rest.OpenSearchRestTestCase; +import org.junit.Assert; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; + +/** + * Rest Action tests for Query Insights + */ +public class TopQueriesRestIT extends OpenSearchRestTestCase { + + /** + * test Query Insights is installed + * @throws IOException IOException + */ + @SuppressWarnings("unchecked") + public void testQueryInsightsPluginInstalled() throws IOException { + Request request = new Request("GET", "/_cat/plugins?s=component&h=name,component,version,description&format=json"); + Response response = client().performRequest(request); + List pluginsList = JsonXContent.jsonXContent.createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + response.getEntity().getContent() + ).list(); + Assert.assertTrue( + pluginsList.stream().map(o -> (Map) o).anyMatch(plugin -> plugin.get("component").equals("query-insights")) + ); + } + + /** + * test enabling top queries + * @throws IOException IOException + */ + public void testTopQueriesResponses() throws IOException { + // Enable Top N Queries feature + Request request = new Request("PUT", "/_cluster/settings"); + request.setJsonEntity(defaultTopQueriesSettings()); + Response response = client().performRequest(request); + + Assert.assertEquals(200, response.getStatusLine().getStatusCode()); + + // Create documents for search + request = new Request("POST", "/my-index-0/_doc"); + request.setJsonEntity(createDocumentsBody()); + response = client().performRequest(request); + + Assert.assertEquals(201, response.getStatusLine().getStatusCode()); + + // Do Search + request = new Request("GET", "/my-index-0/_search?size=20&pretty"); + request.setJsonEntity(searchBody()); + response = client().performRequest(request); + Assert.assertEquals(200, response.getStatusLine().getStatusCode()); + response = client().performRequest(request); + Assert.assertEquals(200, response.getStatusLine().getStatusCode()); + + // Get Top Queries + request = new Request("GET", "/_insights/top_queries?pretty"); + response = client().performRequest(request); + + Assert.assertEquals(200, response.getStatusLine().getStatusCode()); + String top_requests = new String(response.getEntity().getContent().readAllBytes(), StandardCharsets.UTF_8); + Assert.assertTrue(top_requests.contains("top_queries")); + Assert.assertEquals(2, top_requests.split("searchType", -1).length - 1); + } + + private String defaultTopQueriesSettings() { + return "{\n" + + " \"persistent\" : {\n" + + " \"search.top_n_queries.latency.enabled\" : \"true\",\n" + + " \"search.top_n_queries.latency.window_size\" : \"600s\",\n" + + " \"search.top_n_queries.latency.top_n_size\" : 5\n" + + " }\n" + + "}"; + } + + private String createDocumentsBody() { + return "{\n" + + " \"@timestamp\": \"2099-11-15T13:12:00\",\n" + + " \"message\": \"this is document 1\",\n" + + " \"user\": {\n" + + " \"id\": \"cyji\"\n" + + " }\n" + + "}"; + } + + private String searchBody() { + return "{}"; + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java index 0acd7e2dcac0d..4d7e0d486068a 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java @@ -25,7 +25,11 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; +import org.opensearch.plugin.insights.core.listener.QueryInsightsListener; import org.opensearch.plugin.insights.core.service.QueryInsightsService; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction; +import org.opensearch.plugin.insights.rules.resthandler.top_queries.RestTopQueriesAction; +import org.opensearch.plugin.insights.rules.transport.top_queries.TransportTopQueriesAction; import org.opensearch.plugin.insights.settings.QueryInsightsSettings; import org.opensearch.plugins.ActionPlugin; import org.opensearch.plugins.Plugin; @@ -67,7 +71,7 @@ public Collection createComponents( ) { // create top n queries service final QueryInsightsService queryInsightsService = new QueryInsightsService(threadPool); - return List.of(queryInsightsService); + return List.of(queryInsightsService, new QueryInsightsListener(clusterService, queryInsightsService)); } @Override @@ -92,12 +96,12 @@ public List getRestHandlers( final IndexNameExpressionResolver indexNameExpressionResolver, final Supplier nodesInCluster ) { - return List.of(); + return List.of(new RestTopQueriesAction()); } @Override public List> getActions() { - return List.of(); + return List.of(new ActionPlugin.ActionHandler<>(TopQueriesAction.INSTANCE, TransportTopQueriesAction.class)); } @Override diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java new file mode 100644 index 0000000000000..705273f52a567 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java @@ -0,0 +1,147 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.listener; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.search.SearchPhaseContext; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchRequestContext; +import org.opensearch.action.search.SearchRequestOperationsListener; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.plugin.insights.core.service.QueryInsightsService; +import org.opensearch.plugin.insights.rules.model.Attribute; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE; + +/** + * The listener for query insights services. + * It forwards query-related data to the appropriate query insights stores, + * either for each request or for each phase. + * + * @opensearch.internal + */ +public final class QueryInsightsListener extends SearchRequestOperationsListener { + private static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false")); + + private static final Logger log = LogManager.getLogger(QueryInsightsListener.class); + + private final QueryInsightsService queryInsightsService; + + /** + * Constructor for QueryInsightsListener + * + * @param clusterService The Node's cluster service. + * @param queryInsightsService The topQueriesByLatencyService associated with this listener + */ + @Inject + public QueryInsightsListener(final ClusterService clusterService, final QueryInsightsService queryInsightsService) { + this.queryInsightsService = queryInsightsService; + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_ENABLED, v -> this.setEnableTopQueries(MetricType.LATENCY, v)); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer( + TOP_N_LATENCY_QUERIES_SIZE, + v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).setTopNSize(v), + v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).validateTopNSize(v) + ); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer( + TOP_N_LATENCY_QUERIES_WINDOW_SIZE, + v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).setWindowSize(v), + v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).validateWindowSize(v) + ); + this.setEnableTopQueries(MetricType.LATENCY, clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_ENABLED)); + this.queryInsightsService.getTopQueriesService(MetricType.LATENCY) + .setTopNSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_SIZE)); + this.queryInsightsService.getTopQueriesService(MetricType.LATENCY) + .setWindowSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_WINDOW_SIZE)); + } + + /** + * Enable or disable top queries insights collection for {@link MetricType} + * This function will enable or disable the corresponding listeners + * and query insights services. + * + * @param metricType {@link MetricType} + * @param enabled boolean + */ + public void setEnableTopQueries(final MetricType metricType, final boolean enabled) { + boolean isAllMetricsDisabled = !queryInsightsService.isEnabled(); + this.queryInsightsService.enableCollection(metricType, enabled); + if (!enabled) { + // disable QueryInsightsListener only if all metrics collections are disabled now. + if (!queryInsightsService.isEnabled()) { + super.setEnabled(false); + this.queryInsightsService.stop(); + } + } else { + super.setEnabled(true); + // restart QueryInsightsListener only if none of metrics collections is enabled before. + if (isAllMetricsDisabled) { + this.queryInsightsService.stop(); + this.queryInsightsService.start(); + } + } + + } + + @Override + public boolean isEnabled() { + return super.isEnabled(); + } + + @Override + public void onPhaseStart(SearchPhaseContext context) {} + + @Override + public void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} + + @Override + public void onPhaseFailure(SearchPhaseContext context) {} + + @Override + public void onRequestStart(SearchRequestContext searchRequestContext) {} + + @Override + public void onRequestEnd(final SearchPhaseContext context, final SearchRequestContext searchRequestContext) { + final SearchRequest request = context.getRequest(); + try { + Map measurements = new HashMap<>(); + if (queryInsightsService.isCollectionEnabled(MetricType.LATENCY)) { + measurements.put( + MetricType.LATENCY, + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchRequestContext.getAbsoluteStartNanos()) + ); + } + Map attributes = new HashMap<>(); + attributes.put(Attribute.SEARCH_TYPE, request.searchType().toString().toLowerCase(Locale.ROOT)); + attributes.put(Attribute.SOURCE, request.source().toString(FORMAT_PARAMS)); + attributes.put(Attribute.TOTAL_SHARDS, context.getNumShards()); + attributes.put(Attribute.INDICES, request.indices()); + attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap()); + SearchQueryRecord record = new SearchQueryRecord(request.getOrCreateAbsoluteStartMillis(), measurements, attributes); + queryInsightsService.addRecord(record); + } catch (Exception e) { + log.error(String.format(Locale.ROOT, "fail to ingest query insight data, error: %s", e)); + } + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/package-info.java new file mode 100644 index 0000000000000..3cb9cacf7fd1c --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Listeners for Query Insights + */ +package org.opensearch.plugin.insights.core.listener; diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/package-info.java new file mode 100644 index 0000000000000..9b6b5856f7d27 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Transport Actions, Requests and Responses for Query Insights + */ +package org.opensearch.plugin.insights.rules.action; diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueries.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueries.java new file mode 100644 index 0000000000000..26cff82aae52e --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueries.java @@ -0,0 +1,77 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.action.top_queries; + +import org.opensearch.action.support.nodes.BaseNodeResponse; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; + +import java.io.IOException; +import java.util.List; + +/** + * Holds all top queries records by resource usage or latency on a node + * Mainly used in the top N queries node response workflow. + * + * @opensearch.internal + */ +public class TopQueries extends BaseNodeResponse implements ToXContentObject { + /** The store to keep the top queries records */ + private final List topQueriesRecords; + + /** + * Create the TopQueries Object from StreamInput + * @param in A {@link StreamInput} object. + * @throws IOException IOException + */ + public TopQueries(final StreamInput in) throws IOException { + super(in); + topQueriesRecords = in.readList(SearchQueryRecord::new); + } + + /** + * Create the TopQueries Object + * @param node A node that is part of the cluster. + * @param searchQueryRecords A list of SearchQueryRecord associated in this TopQueries. + */ + public TopQueries(final DiscoveryNode node, final List searchQueryRecords) { + super(node); + topQueriesRecords = searchQueryRecords; + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + if (topQueriesRecords != null) { + for (SearchQueryRecord record : topQueriesRecords) { + record.toXContent(builder, params); + } + } + return builder; + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + out.writeList(topQueriesRecords); + + } + + /** + * Get all top queries records + * + * @return the top queries records in this node response + */ + public List getTopQueriesRecord() { + return topQueriesRecords; + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesAction.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesAction.java new file mode 100644 index 0000000000000..b8ed69fa5692b --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesAction.java @@ -0,0 +1,32 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.action.top_queries; + +import org.opensearch.action.ActionType; + +/** + * Transport action for cluster/node level top queries information. + * + * @opensearch.internal + */ +public class TopQueriesAction extends ActionType { + + /** + * The TopQueriesAction Instance. + */ + public static final TopQueriesAction INSTANCE = new TopQueriesAction(); + /** + * The name of this Action + */ + public static final String NAME = "cluster:admin/opensearch/insights/top_queries"; + + private TopQueriesAction() { + super(NAME, TopQueriesResponse::new); + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequest.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequest.java new file mode 100644 index 0000000000000..3bdff2c403161 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequest.java @@ -0,0 +1,62 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.action.top_queries; + +import org.opensearch.action.support.nodes.BaseNodesRequest; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.plugin.insights.rules.model.MetricType; + +import java.io.IOException; + +/** + * A request to get cluster/node level top queries information. + * + * @opensearch.internal + */ +public class TopQueriesRequest extends BaseNodesRequest { + + final MetricType metricType; + + /** + * Constructor for TopQueriesRequest + * + * @param in A {@link StreamInput} object. + * @throws IOException if the stream cannot be deserialized. + */ + public TopQueriesRequest(final StreamInput in) throws IOException { + super(in); + this.metricType = MetricType.readFromStream(in); + } + + /** + * Get top queries from nodes based on the nodes ids specified. + * If none are passed, cluster level top queries will be returned. + * + * @param metricType {@link MetricType} + * @param nodesIds the nodeIds specified in the request + */ + public TopQueriesRequest(final MetricType metricType, final String... nodesIds) { + super(nodesIds); + this.metricType = metricType; + } + + /** + * Get the type of requested metrics + */ + public MetricType getMetricType() { + return metricType; + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(metricType.toString()); + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponse.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponse.java new file mode 100644 index 0000000000000..2e66bb7f77baf --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponse.java @@ -0,0 +1,143 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.action.top_queries; + +import org.opensearch.action.FailedNodeException; +import org.opensearch.action.support.nodes.BaseNodesResponse; +import org.opensearch.cluster.ClusterName; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.plugin.insights.rules.model.Attribute; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Transport response for cluster/node level top queries information. + * + * @opensearch.internal + */ +public class TopQueriesResponse extends BaseNodesResponse implements ToXContentFragment { + + private static final String CLUSTER_LEVEL_RESULTS_KEY = "top_queries"; + private final MetricType metricType; + private final int top_n_size; + + /** + * Constructor for TopQueriesResponse. + * + * @param in A {@link StreamInput} object. + * @throws IOException if the stream cannot be deserialized. + */ + public TopQueriesResponse(final StreamInput in) throws IOException { + super(in); + top_n_size = in.readInt(); + metricType = in.readEnum(MetricType.class); + } + + /** + * Constructor for TopQueriesResponse + * + * @param clusterName The current cluster name + * @param nodes A list that contains top queries results from all nodes + * @param failures A list that contains FailedNodeException + * @param top_n_size The top N size to return to the user + * @param metricType the {@link MetricType} to be returned in this response + */ + public TopQueriesResponse( + final ClusterName clusterName, + final List nodes, + final List failures, + final int top_n_size, + final MetricType metricType + ) { + super(clusterName, nodes, failures); + this.top_n_size = top_n_size; + this.metricType = metricType; + } + + @Override + protected List readNodesFrom(final StreamInput in) throws IOException { + return in.readList(TopQueries::new); + } + + @Override + protected void writeNodesTo(final StreamOutput out, final List nodes) throws IOException { + out.writeList(nodes); + out.writeLong(top_n_size); + out.writeEnum(metricType); + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + final List results = getNodes(); + postProcess(results); + builder.startObject(); + toClusterLevelResult(builder, params, results); + return builder.endObject(); + } + + @Override + public String toString() { + try { + final XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint(); + builder.startObject(); + this.toXContent(builder, EMPTY_PARAMS); + builder.endObject(); + return builder.toString(); + } catch (IOException e) { + return "{ \"error\" : \"" + e.getMessage() + "\"}"; + } + } + + /** + * Post process the top queries results to add customized attributes + * + * @param results the top queries results + */ + private void postProcess(final List results) { + for (TopQueries topQueries : results) { + final String nodeId = topQueries.getNode().getId(); + for (SearchQueryRecord record : topQueries.getTopQueriesRecord()) { + record.addAttribute(Attribute.NODE_ID, nodeId); + } + } + } + + /** + * Merge top n queries results from nodes into cluster level results in XContent format. + * + * @param builder XContent builder + * @param params serialization parameters + * @param results top queries results from all nodes + * @throws IOException if an error occurs + */ + private void toClusterLevelResult(final XContentBuilder builder, final Params params, final List results) + throws IOException { + final List all_records = results.stream() + .map(TopQueries::getTopQueriesRecord) + .flatMap(Collection::stream) + .sorted((a, b) -> SearchQueryRecord.compare(a, b, metricType) * -1) + .limit(top_n_size) + .collect(Collectors.toList()); + builder.startArray(CLUSTER_LEVEL_RESULTS_KEY); + for (SearchQueryRecord record : all_records) { + record.toXContent(builder, params); + } + builder.endArray(); + } + +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/package-info.java new file mode 100644 index 0000000000000..3cc7900e5ce7d --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Transport Actions, Requests and Responses for Top N Queries + */ +package org.opensearch.plugin.insights.rules.action.top_queries; diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/package-info.java new file mode 100644 index 0000000000000..3787f05f65552 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Rest Handlers for Query Insights + */ +package org.opensearch.plugin.insights.rules.resthandler; diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesAction.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesAction.java new file mode 100644 index 0000000000000..6aa511c626ab1 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesAction.java @@ -0,0 +1,99 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.resthandler.top_queries; + +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.Strings; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesRequest; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesResponse; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestChannel; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestResponse; +import org.opensearch.rest.action.RestResponseListener; + +import java.util.List; +import java.util.Locale; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_QUERIES_BASE_URI; +import static org.opensearch.rest.RestRequest.Method.GET; + +/** + * Rest action to get Top N queries by certain metric type + * + * @opensearch.api + */ +public class RestTopQueriesAction extends BaseRestHandler { + /** The metric types that are allowed in top N queries */ + static final Set ALLOWED_METRICS = MetricType.allMetricTypes().stream().map(MetricType::toString).collect(Collectors.toSet()); + + /** + * Constructor for RestTopQueriesAction + */ + public RestTopQueriesAction() {} + + @Override + public List routes() { + return List.of( + new Route(GET, TOP_QUERIES_BASE_URI), + new Route(GET, String.format(Locale.ROOT, "%s/{nodeId}", TOP_QUERIES_BASE_URI)) + ); + } + + @Override + public String getName() { + return "query_insights_top_queries_action"; + } + + @Override + public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) { + final TopQueriesRequest topQueriesRequest = prepareRequest(request); + topQueriesRequest.timeout(request.param("timeout")); + + return channel -> client.execute(TopQueriesAction.INSTANCE, topQueriesRequest, topQueriesResponse(channel)); + } + + static TopQueriesRequest prepareRequest(final RestRequest request) { + final String[] nodesIds = Strings.splitStringByCommaToArray(request.param("nodeId")); + final String metricType = request.param("type", MetricType.LATENCY.toString()); + if (!ALLOWED_METRICS.contains(metricType)) { + throw new IllegalArgumentException( + String.format(Locale.ROOT, "request [%s] contains invalid metric type [%s]", request.path(), metricType) + ); + } + return new TopQueriesRequest(MetricType.fromString(metricType), nodesIds); + } + + @Override + protected Set responseParams() { + return Settings.FORMAT_PARAMS; + } + + @Override + public boolean canTripCircuitBreaker() { + return false; + } + + private RestResponseListener topQueriesResponse(final RestChannel channel) { + return new RestResponseListener<>(channel) { + @Override + public RestResponse buildResponse(final TopQueriesResponse response) throws Exception { + return new BytesRestResponse(RestStatus.OK, response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS)); + } + }; + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/package-info.java new file mode 100644 index 0000000000000..087cf7d765f8c --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Rest Handlers for Top N Queries + */ +package org.opensearch.plugin.insights.rules.resthandler.top_queries; diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/package-info.java new file mode 100644 index 0000000000000..f3a1c70b9af57 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Transport Actions for Query Insights. + */ +package org.opensearch.plugin.insights.rules.transport; diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesAction.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesAction.java new file mode 100644 index 0000000000000..ddf614211bc41 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesAction.java @@ -0,0 +1,155 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.transport.top_queries; + +import org.opensearch.OpenSearchException; +import org.opensearch.action.FailedNodeException; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.nodes.TransportNodesAction; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.plugin.insights.core.service.QueryInsightsService; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueries; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesRequest; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesResponse; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.List; +import java.util.Locale; + +/** + * Transport action for cluster/node level top queries information. + * + * @opensearch.internal + */ +public class TransportTopQueriesAction extends TransportNodesAction< + TopQueriesRequest, + TopQueriesResponse, + TransportTopQueriesAction.NodeRequest, + TopQueries> { + + private final QueryInsightsService queryInsightsService; + + /** + * Create the TransportTopQueriesAction Object + + * @param threadPool The OpenSearch thread pool to run async tasks + * @param clusterService The clusterService of this node + * @param transportService The TransportService of this node + * @param queryInsightsService The topQueriesByLatencyService associated with this Transport Action + * @param actionFilters the action filters + */ + @Inject + public TransportTopQueriesAction( + final ThreadPool threadPool, + final ClusterService clusterService, + final TransportService transportService, + final QueryInsightsService queryInsightsService, + final ActionFilters actionFilters + ) { + super( + TopQueriesAction.NAME, + threadPool, + clusterService, + transportService, + actionFilters, + TopQueriesRequest::new, + NodeRequest::new, + ThreadPool.Names.GENERIC, + TopQueries.class + ); + this.queryInsightsService = queryInsightsService; + } + + @Override + protected TopQueriesResponse newResponse( + final TopQueriesRequest topQueriesRequest, + final List responses, + final List failures + ) { + if (topQueriesRequest.getMetricType() == MetricType.LATENCY) { + return new TopQueriesResponse( + clusterService.getClusterName(), + responses, + failures, + clusterService.getClusterSettings().get(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE), + MetricType.LATENCY + ); + } else { + throw new OpenSearchException(String.format(Locale.ROOT, "invalid metric type %s", topQueriesRequest.getMetricType())); + } + } + + @Override + protected NodeRequest newNodeRequest(final TopQueriesRequest request) { + return new NodeRequest(request); + } + + @Override + protected TopQueries newNodeResponse(final StreamInput in) throws IOException { + return new TopQueries(in); + } + + @Override + protected TopQueries nodeOperation(final NodeRequest nodeRequest) { + final TopQueriesRequest request = nodeRequest.request; + if (request.getMetricType() == MetricType.LATENCY) { + return new TopQueries( + clusterService.localNode(), + queryInsightsService.getTopQueriesService(MetricType.LATENCY).getTopQueriesRecords(true) + ); + } else { + throw new OpenSearchException(String.format(Locale.ROOT, "invalid metric type %s", request.getMetricType())); + } + + } + + /** + * Inner Node Top Queries Request + * + * @opensearch.internal + */ + public static class NodeRequest extends TransportRequest { + + final TopQueriesRequest request; + + /** + * Create the NodeResponse object from StreamInput + * + * @param in the StreamInput to read the object + * @throws IOException IOException + */ + public NodeRequest(StreamInput in) throws IOException { + super(in); + request = new TopQueriesRequest(in); + } + + /** + * Create the NodeResponse object from a TopQueriesRequest + * @param request the TopQueriesRequest object + */ + public NodeRequest(final TopQueriesRequest request) { + this.request = request; + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + request.writeTo(out); + } + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/package-info.java new file mode 100644 index 0000000000000..54da0980deff8 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Transport Actions for Top N Queries. + */ +package org.opensearch.plugin.insights.rules.transport.top_queries; diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java index f6d95450855de..273b69e483e8c 100644 --- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java @@ -14,11 +14,16 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.core.action.ActionResponse; +import org.opensearch.plugin.insights.core.listener.QueryInsightsListener; import org.opensearch.plugin.insights.core.service.QueryInsightsService; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction; +import org.opensearch.plugin.insights.rules.resthandler.top_queries.RestTopQueriesAction; import org.opensearch.plugin.insights.settings.QueryInsightsSettings; import org.opensearch.plugins.ActionPlugin; import org.opensearch.rest.RestHandler; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ExecutorBuilder; +import org.opensearch.threadpool.ScalingExecutorBuilder; import org.opensearch.threadpool.ThreadPool; import org.junit.Before; @@ -74,18 +79,29 @@ public void testCreateComponent() { null, null ); - assertEquals(1, components.size()); + assertEquals(2, components.size()); assertTrue(components.get(0) instanceof QueryInsightsService); + assertTrue(components.get(1) instanceof QueryInsightsListener); + } + + public void testGetExecutorBuilders() { + Settings.Builder settingsBuilder = Settings.builder(); + Settings settings = settingsBuilder.build(); + List> executorBuilders = queryInsightsPlugin.getExecutorBuilders(settings); + assertEquals(1, executorBuilders.size()); + assertTrue(executorBuilders.get(0) instanceof ScalingExecutorBuilder); } public void testGetRestHandlers() { List components = queryInsightsPlugin.getRestHandlers(Settings.EMPTY, null, null, null, null, null, null); - assertEquals(0, components.size()); + assertEquals(1, components.size()); + assertTrue(components.get(0) instanceof RestTopQueriesAction); } public void testGetActions() { List> components = queryInsightsPlugin.getActions(); - assertEquals(0, components.size()); + assertEquals(1, components.size()); + assertTrue(components.get(0).getAction() instanceof TopQueriesAction); } } diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java index 04f49d5fbc7dd..870ef5b9c8be9 100644 --- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java @@ -9,12 +9,15 @@ package org.opensearch.plugin.insights; import org.opensearch.action.search.SearchType; +import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.util.Maps; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueries; import org.opensearch.plugin.insights.rules.model.Attribute; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.test.VersionUtils; import java.io.IOException; import java.util.ArrayList; @@ -26,7 +29,11 @@ import java.util.Set; import java.util.TreeSet; +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.opensearch.test.OpenSearchTestCase.buildNewFakeTransportAddress; +import static org.opensearch.test.OpenSearchTestCase.random; import static org.opensearch.test.OpenSearchTestCase.randomAlphaOfLengthBetween; import static org.opensearch.test.OpenSearchTestCase.randomArray; import static org.opensearch.test.OpenSearchTestCase.randomDouble; @@ -79,6 +86,33 @@ public static List generateQueryInsightRecords(int lower, int return records; } + public static TopQueries createRandomTopQueries() { + DiscoveryNode node = new DiscoveryNode( + "node_for_top_queries_test", + buildNewFakeTransportAddress(), + emptyMap(), + emptySet(), + VersionUtils.randomVersion(random()) + ); + List records = generateQueryInsightRecords(10); + + return new TopQueries(node, records); + } + + public static TopQueries createFixedTopQueries() { + DiscoveryNode node = new DiscoveryNode( + "node_for_top_queries_test", + buildNewFakeTransportAddress(), + emptyMap(), + emptySet(), + VersionUtils.randomVersion(random()) + ); + List records = new ArrayList<>(); + records.add(createFixedSearchQueryRecord()); + + return new TopQueries(node, records); + } + public static SearchQueryRecord createFixedSearchQueryRecord() { long timestamp = 1706574180000L; Map measurements = Map.of(MetricType.LATENCY, 1L); diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java new file mode 100644 index 0000000000000..f340950017a5c --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java @@ -0,0 +1,161 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.listener; + +import org.opensearch.action.search.SearchPhaseContext; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchRequestContext; +import org.opensearch.action.search.SearchType; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.plugin.insights.core.service.QueryInsightsService; +import org.opensearch.plugin.insights.core.service.TopQueriesService; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; +import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.opensearch.search.aggregations.support.ValueType; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.test.OpenSearchTestCase; +import org.junit.Before; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Phaser; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Unit Tests for {@link QueryInsightsListener}. + */ +public class QueryInsightsListenerTests extends OpenSearchTestCase { + private final SearchRequestContext searchRequestContext = mock(SearchRequestContext.class); + private final SearchPhaseContext searchPhaseContext = mock(SearchPhaseContext.class); + private final SearchRequest searchRequest = mock(SearchRequest.class); + private final QueryInsightsService queryInsightsService = mock(QueryInsightsService.class); + private final TopQueriesService topQueriesService = mock(TopQueriesService.class); + private ClusterService clusterService; + + @Before + public void setup() { + Settings.Builder settingsBuilder = Settings.builder(); + Settings settings = settingsBuilder.build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE); + clusterService = new ClusterService(settings, clusterSettings, null); + when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true); + when(queryInsightsService.getTopQueriesService(MetricType.LATENCY)).thenReturn(topQueriesService); + } + + public void testOnRequestEnd() throws InterruptedException { + Long timestamp = System.currentTimeMillis() - 100L; + SearchType searchType = SearchType.QUERY_THEN_FETCH; + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.aggregation(new TermsAggregationBuilder("agg1").userValueTypeHint(ValueType.STRING).field("type.keyword")); + searchSourceBuilder.size(0); + + String[] indices = new String[] { "index-1", "index-2" }; + + Map phaseLatencyMap = new HashMap<>(); + phaseLatencyMap.put("expand", 0L); + phaseLatencyMap.put("query", 20L); + phaseLatencyMap.put("fetch", 1L); + + int numberOfShards = 10; + + QueryInsightsListener queryInsightsListener = new QueryInsightsListener(clusterService, queryInsightsService); + + when(searchRequest.getOrCreateAbsoluteStartMillis()).thenReturn(timestamp); + when(searchRequest.searchType()).thenReturn(searchType); + when(searchRequest.source()).thenReturn(searchSourceBuilder); + when(searchRequest.indices()).thenReturn(indices); + when(searchRequestContext.phaseTookMap()).thenReturn(phaseLatencyMap); + when(searchPhaseContext.getRequest()).thenReturn(searchRequest); + when(searchPhaseContext.getNumShards()).thenReturn(numberOfShards); + + queryInsightsListener.onRequestEnd(searchPhaseContext, searchRequestContext); + + verify(queryInsightsService, times(1)).addRecord(any()); + } + + public void testConcurrentOnRequestEnd() throws InterruptedException { + Long timestamp = System.currentTimeMillis() - 100L; + SearchType searchType = SearchType.QUERY_THEN_FETCH; + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.aggregation(new TermsAggregationBuilder("agg1").userValueTypeHint(ValueType.STRING).field("type.keyword")); + searchSourceBuilder.size(0); + + String[] indices = new String[] { "index-1", "index-2" }; + + Map phaseLatencyMap = new HashMap<>(); + phaseLatencyMap.put("expand", 0L); + phaseLatencyMap.put("query", 20L); + phaseLatencyMap.put("fetch", 1L); + + int numberOfShards = 10; + + final List searchListenersList = new ArrayList<>(); + + when(searchRequest.getOrCreateAbsoluteStartMillis()).thenReturn(timestamp); + when(searchRequest.searchType()).thenReturn(searchType); + when(searchRequest.source()).thenReturn(searchSourceBuilder); + when(searchRequest.indices()).thenReturn(indices); + when(searchRequestContext.phaseTookMap()).thenReturn(phaseLatencyMap); + when(searchPhaseContext.getRequest()).thenReturn(searchRequest); + when(searchPhaseContext.getNumShards()).thenReturn(numberOfShards); + + int numRequests = 50; + Thread[] threads = new Thread[numRequests]; + Phaser phaser = new Phaser(numRequests + 1); + CountDownLatch countDownLatch = new CountDownLatch(numRequests); + + for (int i = 0; i < numRequests; i++) { + searchListenersList.add(new QueryInsightsListener(clusterService, queryInsightsService)); + } + + for (int i = 0; i < numRequests; i++) { + int finalI = i; + threads[i] = new Thread(() -> { + phaser.arriveAndAwaitAdvance(); + QueryInsightsListener thisListener = searchListenersList.get(finalI); + thisListener.onRequestEnd(searchPhaseContext, searchRequestContext); + countDownLatch.countDown(); + }); + threads[i].start(); + } + phaser.arriveAndAwaitAdvance(); + countDownLatch.await(); + + verify(queryInsightsService, times(numRequests)).addRecord(any()); + } + + public void testSetEnabled() { + when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true); + QueryInsightsListener queryInsightsListener = new QueryInsightsListener(clusterService, queryInsightsService); + queryInsightsListener.setEnableTopQueries(MetricType.LATENCY, true); + assertTrue(queryInsightsListener.isEnabled()); + + when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(false); + when(queryInsightsService.isCollectionEnabled(MetricType.CPU)).thenReturn(false); + when(queryInsightsService.isCollectionEnabled(MetricType.JVM)).thenReturn(false); + queryInsightsListener.setEnableTopQueries(MetricType.LATENCY, false); + assertFalse(queryInsightsListener.isEnabled()); + } +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequestTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequestTests.java new file mode 100644 index 0000000000000..619fd4b33a3dc --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequestTests.java @@ -0,0 +1,43 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.action.top_queries; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.test.OpenSearchTestCase; + +/** + * Granular tests for the {@link TopQueriesRequest} class. + */ +public class TopQueriesRequestTests extends OpenSearchTestCase { + + /** + * Check that we can set the metric type + */ + public void testSetMetricType() throws Exception { + TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY, randomAlphaOfLength(5)); + TopQueriesRequest deserializedRequest = roundTripRequest(request); + assertEquals(request.getMetricType(), deserializedRequest.getMetricType()); + } + + /** + * Serialize and deserialize a request. + * @param request A request to serialize. + * @return The deserialized, "round-tripped" request. + */ + private static TopQueriesRequest roundTripRequest(TopQueriesRequest request) throws Exception { + try (BytesStreamOutput out = new BytesStreamOutput()) { + request.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + return new TopQueriesRequest(in); + } + } + } +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponseTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponseTests.java new file mode 100644 index 0000000000000..eeee50d3da703 --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponseTests.java @@ -0,0 +1,71 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.action.top_queries; + +import org.opensearch.cluster.ClusterName; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.plugin.insights.QueryInsightsTestUtils; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * Granular tests for the {@link TopQueriesResponse} class. + */ +public class TopQueriesResponseTests extends OpenSearchTestCase { + + /** + * Check serialization and deserialization + */ + public void testSerialize() throws Exception { + TopQueries topQueries = QueryInsightsTestUtils.createRandomTopQueries(); + ClusterName clusterName = new ClusterName("test-cluster"); + TopQueriesResponse response = new TopQueriesResponse(clusterName, List.of(topQueries), new ArrayList<>(), 10, MetricType.LATENCY); + TopQueriesResponse deserializedResponse = roundTripResponse(response); + assertEquals(response.toString(), deserializedResponse.toString()); + } + + public void testToXContent() throws IOException { + char[] expectedXcontent = + "{\"top_queries\":[{\"timestamp\":1706574180000,\"node_id\":\"node_for_top_queries_test\",\"search_type\":\"query_then_fetch\",\"latency\":1}]}" + .toCharArray(); + TopQueries topQueries = QueryInsightsTestUtils.createFixedTopQueries(); + ClusterName clusterName = new ClusterName("test-cluster"); + TopQueriesResponse response = new TopQueriesResponse(clusterName, List.of(topQueries), new ArrayList<>(), 10, MetricType.LATENCY); + XContentBuilder builder = MediaTypeRegistry.contentBuilder(MediaTypeRegistry.JSON); + char[] xContent = BytesReference.bytes(response.toXContent(builder, ToXContent.EMPTY_PARAMS)).utf8ToString().toCharArray(); + Arrays.sort(expectedXcontent); + Arrays.sort(xContent); + + assertEquals(Arrays.hashCode(expectedXcontent), Arrays.hashCode(xContent)); + } + + /** + * Serialize and deserialize a TopQueriesResponse. + * @param response A response to serialize. + * @return The deserialized, "round-tripped" response. + */ + private static TopQueriesResponse roundTripResponse(TopQueriesResponse response) throws Exception { + try (BytesStreamOutput out = new BytesStreamOutput()) { + response.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + return new TopQueriesResponse(in); + } + } + } +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesTests.java new file mode 100644 index 0000000000000..7db08b53ad1df --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesTests.java @@ -0,0 +1,35 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.action.top_queries; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.plugin.insights.QueryInsightsTestUtils; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; + +/** + * Tests for {@link TopQueries}. + */ +public class TopQueriesTests extends OpenSearchTestCase { + + public void testTopQueries() throws IOException { + TopQueries topQueries = QueryInsightsTestUtils.createRandomTopQueries(); + try (BytesStreamOutput out = new BytesStreamOutput()) { + topQueries.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + TopQueries readTopQueries = new TopQueries(in); + assertTrue( + QueryInsightsTestUtils.checkRecordsEquals(topQueries.getTopQueriesRecord(), readTopQueries.getTopQueriesRecord()) + ); + } + } + } +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesActionTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesActionTests.java new file mode 100644 index 0000000000000..ac19fa2a7348f --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesActionTests.java @@ -0,0 +1,70 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.resthandler.top_queries; + +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesRequest; +import org.opensearch.rest.RestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.rest.FakeRestRequest; + +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import static org.opensearch.plugin.insights.rules.resthandler.top_queries.RestTopQueriesAction.ALLOWED_METRICS; + +public class RestTopQueriesActionTests extends OpenSearchTestCase { + + public void testEmptyNodeIdsValidType() { + Map params = new HashMap<>(); + params.put("type", randomFrom(ALLOWED_METRICS)); + RestRequest restRequest = buildRestRequest(params); + TopQueriesRequest actual = RestTopQueriesAction.prepareRequest(restRequest); + assertEquals(0, actual.nodesIds().length); + } + + public void testNodeIdsValid() { + Map params = new HashMap<>(); + params.put("type", randomFrom(ALLOWED_METRICS)); + String[] nodes = randomArray(1, 10, String[]::new, () -> randomAlphaOfLengthBetween(5, 10)); + params.put("nodeId", String.join(",", nodes)); + + RestRequest restRequest = buildRestRequest(params); + TopQueriesRequest actual = RestTopQueriesAction.prepareRequest(restRequest); + assertArrayEquals(nodes, actual.nodesIds()); + } + + public void testInValidType() { + Map params = new HashMap<>(); + params.put("type", randomAlphaOfLengthBetween(5, 10).toUpperCase(Locale.ROOT)); + + RestRequest restRequest = buildRestRequest(params); + Exception exception = assertThrows(IllegalArgumentException.class, () -> { RestTopQueriesAction.prepareRequest(restRequest); }); + assertEquals( + String.format(Locale.ROOT, "request [/_insights/top_queries] contains invalid metric type [%s]", params.get("type")), + exception.getMessage() + ); + } + + public void testGetRoutes() { + RestTopQueriesAction action = new RestTopQueriesAction(); + List routes = action.routes(); + assertEquals(2, routes.size()); + assertEquals("query_insights_top_queries_action", action.getName()); + } + + private FakeRestRequest buildRestRequest(Map params) { + return new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.GET) + .withPath("/_insights/top_queries") + .withParams(params) + .build(); + } +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesActionTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesActionTests.java new file mode 100644 index 0000000000000..a5f36b6e8cce0 --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesActionTests.java @@ -0,0 +1,84 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.transport.top_queries; + +import org.opensearch.action.support.ActionFilters; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.plugin.insights.core.service.QueryInsightsService; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesRequest; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesResponse; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; +import org.junit.Before; + +import java.util.List; + +import static org.mockito.Mockito.mock; + +public class TransportTopQueriesActionTests extends OpenSearchTestCase { + + private final ThreadPool threadPool = mock(ThreadPool.class); + + private final Settings.Builder settingsBuilder = Settings.builder(); + private final Settings settings = settingsBuilder.build(); + private final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + private final ClusterService clusterService = new ClusterService(settings, clusterSettings, threadPool); + private final TransportService transportService = mock(TransportService.class); + private final QueryInsightsService topQueriesByLatencyService = mock(QueryInsightsService.class); + private final ActionFilters actionFilters = mock(ActionFilters.class); + private final TransportTopQueriesAction transportTopQueriesAction = new TransportTopQueriesAction( + threadPool, + clusterService, + transportService, + topQueriesByLatencyService, + actionFilters + ); + private final DummyParentAction dummyParentAction = new DummyParentAction( + threadPool, + clusterService, + transportService, + topQueriesByLatencyService, + actionFilters + ); + + class DummyParentAction extends TransportTopQueriesAction { + public DummyParentAction( + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + QueryInsightsService topQueriesByLatencyService, + ActionFilters actionFilters + ) { + super(threadPool, clusterService, transportService, topQueriesByLatencyService, actionFilters); + } + + public TopQueriesResponse createNewResponse() { + TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY); + return newResponse(request, List.of(), List.of()); + } + } + + @Before + public void setup() { + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE); + } + + public void testNewResponse() { + TopQueriesResponse response = dummyParentAction.createNewResponse(); + assertNotNull(response); + } + +}