From e08b2506aeaf30c617b98c86d770c1e0a6464224 Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Tue, 16 Jan 2024 22:19:08 -0800 Subject: [PATCH] Top N Queries by latency implementation Signed-off-by: Chenyang Ji --- CHANGELOG.md | 1 + .../QueryInsightsPluginTransportIT.java | 218 ++++++++++++++++ .../plugin/insights/TopQueriesRestIT.java | 107 ++++++++ .../listener/SearchQueryLatencyListener.java | 121 +++++++++ .../insights/core/listener/package-info.java | 12 + .../service/TopQueriesByLatencyService.java | 240 ++++++++++++++++++ .../insights/rules/action/package-info.java | 12 + .../rules/action/top_queries/TopQueries.java | 71 ++++++ .../action/top_queries/TopQueriesAction.java | 26 ++ .../action/top_queries/TopQueriesRequest.java | 96 +++++++ .../top_queries/TopQueriesResponse.java | 120 +++++++++ .../action/top_queries/package-info.java | 12 + .../rules/resthandler/package-info.java | 12 + .../top_queries/RestTopQueriesAction.java | 102 ++++++++ .../resthandler/top_queries/package-info.java | 12 + .../rules/transport/package-info.java | 12 + .../TransportTopQueriesAction.java | 131 ++++++++++ .../transport/top_queries/package-info.java | 12 + .../SearchQueryLatencyListenerTests.java | 182 +++++++++++++ .../TopQueriesByLatencyServiceTests.java | 221 ++++++++++++++++ .../top_queries/TopQueriesRequestTests.java | 43 ++++ .../top_queries/TopQueriesResponseTests.java | 49 ++++ .../action/top_queries/TopQueriesTests.java | 42 +++ .../model/SearchQueryLatencyRecordTests.java | 50 ++++ .../RestTopQueriesActionTests.java | 62 +++++ 25 files changed, 1966 insertions(+) create mode 100644 plugins/query-insights/src/internalClusterTest/java/org/opensearch/plugin/insights/QueryInsightsPluginTransportIT.java create mode 100644 plugins/query-insights/src/javaRestTest/java/org/opensearch/plugin/insights/TopQueriesRestIT.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/SearchQueryLatencyListener.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/package-info.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesByLatencyService.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/package-info.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueries.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesAction.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequest.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponse.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/package-info.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/package-info.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesAction.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/package-info.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/package-info.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesAction.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/package-info.java create mode 100644 plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/SearchQueryLatencyListenerTests.java create mode 100644 plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesByLatencyServiceTests.java create mode 100644 plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequestTests.java create mode 100644 plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponseTests.java create mode 100644 plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesTests.java create mode 100644 plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/model/SearchQueryLatencyRecordTests.java create mode 100644 plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesActionTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 1f64dcf82bd4d..95740df57490f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -194,6 +194,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Capture information for additional query types and aggregation types ([#11582](https://github.com/opensearch-project/OpenSearch/pull/11582)) - Use slice_size == shard_size heuristic in terms aggs for concurrent segment search and properly calculate the doc_count_error ([#11732](https://github.com/opensearch-project/OpenSearch/pull/11732)) - Added Support for dynamically adding SearchRequestOperationsListeners with SearchRequestOperationsCompositeListenerFactory ([#11526](https://github.com/opensearch-project/OpenSearch/pull/11526)) +- [Query Insights] Implement Top N Queries Feature in Query Insights Framework([#11904](https://github.com/opensearch-project/OpenSearch/pull/11904)) ### Deprecated diff --git a/plugins/query-insights/src/internalClusterTest/java/org/opensearch/plugin/insights/QueryInsightsPluginTransportIT.java b/plugins/query-insights/src/internalClusterTest/java/org/opensearch/plugin/insights/QueryInsightsPluginTransportIT.java new file mode 100644 index 0000000000000..f1d8594affca7 --- /dev/null +++ b/plugins/query-insights/src/internalClusterTest/java/org/opensearch/plugin/insights/QueryInsightsPluginTransportIT.java @@ -0,0 +1,218 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights; + +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.action.admin.cluster.node.info.NodeInfo; +import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest; +import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.opensearch.action.admin.cluster.node.info.PluginsAndModules; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesRequest; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesResponse; +import org.opensearch.plugins.Plugin; +import org.opensearch.plugins.PluginInfo; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.Assert; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +/** + * Transport Action tests for Query Insights Plugin + */ + +@OpenSearchIntegTestCase.ClusterScope(numDataNodes = 0, scope = OpenSearchIntegTestCase.Scope.TEST) +public class QueryInsightsPluginTransportIT extends OpenSearchIntegTestCase { + + private final int TOTAL_NUMBER_OF_NODES = 2; + private final int TOTAL_SEARCH_REQUESTS = 5; + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(QueryInsightsPlugin.class); + } + + /** + * Test Query Insights Plugin is installed + */ + public void testQueryInsightPluginInstalled() { + NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); + nodesInfoRequest.addMetric(NodesInfoRequest.Metric.PLUGINS.metricName()); + NodesInfoResponse nodesInfoResponse = OpenSearchIntegTestCase.client().admin().cluster().nodesInfo(nodesInfoRequest).actionGet(); + List pluginInfos = nodesInfoResponse.getNodes() + .stream() + .flatMap( + (Function>) nodeInfo -> nodeInfo.getInfo(PluginsAndModules.class).getPluginInfos().stream() + ) + .collect(Collectors.toList()); + Assert.assertTrue( + pluginInfos.stream().anyMatch(pluginInfo -> pluginInfo.getName().equals("org.opensearch.plugin.insights.QueryInsightsPlugin")) + ); + } + + /** + * Test get top queries when feature disabled + */ + public void testGetTopQueriesWhenFeatureDisabled() { + TopQueriesRequest request = new TopQueriesRequest(); + TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet(); + Assert.assertNotEquals(0, response.failures().size()); + Assert.assertEquals( + "Cannot get query data when query insight feature is not enabled.", + response.failures().get(0).getCause().getCause().getMessage() + ); + } + + /** + * Test get top queries when feature enabled + */ + public void testGetTopQueriesWhenFeatureEnabled() { + Settings commonSettings = Settings.builder() + .put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true") + .put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "100") + .put(TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey(), "600s") + .build(); + + logger.info("--> starting 2 nodes for query insight testing"); + List nodes = internalCluster().startNodes(TOTAL_NUMBER_OF_NODES, Settings.builder().put(commonSettings).build()); + + logger.info("--> waiting for nodes to form a cluster"); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet(); + assertFalse(health.isTimedOut()); + + assertAcked( + prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 2)) + ); + ensureStableCluster(2); + logger.info("--> creating indices for query insight testing"); + for (int i = 0; i < 5; i++) { + IndexResponse response = client().prepareIndex("test_" + i).setId("" + i).setSource("field_" + i, "value_" + i).get(); + assertEquals("CREATED", response.status().toString()); + } + // making search requests to get top queries + for (int i = 0; i < TOTAL_SEARCH_REQUESTS; i++) { + SearchResponse searchResponse = internalCluster().client(randomFrom(nodes)) + .prepareSearch() + .setQuery(QueryBuilders.matchAllQuery()) + .get(); + assertEquals(searchResponse.getFailedShards(), 0); + } + + TopQueriesRequest request = new TopQueriesRequest(); + TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet(); + Assert.assertEquals(0, response.failures().size()); + Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size()); + Assert.assertEquals(TOTAL_SEARCH_REQUESTS, response.getNodes().stream().mapToInt(o -> o.getLatencyRecords().size()).sum()); + + internalCluster().stopAllNodes(); + } + + /** + * Test get top queries with small top n size + */ + public void testGetTopQueriesWithSmallTopN() { + Settings commonSettings = Settings.builder() + .put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true") + .put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "1") + .put(TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey(), "600s") + .build(); + + logger.info("--> starting 2 nodes for query insight testing"); + List nodes = internalCluster().startNodes(TOTAL_NUMBER_OF_NODES, Settings.builder().put(commonSettings).build()); + + logger.info("--> waiting for nodes to form a cluster"); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet(); + assertFalse(health.isTimedOut()); + + assertAcked( + prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 2)) + ); + ensureStableCluster(2); + logger.info("--> creating indices for query insight testing"); + for (int i = 0; i < 5; i++) { + IndexResponse response = client().prepareIndex("test_" + i).setId("" + i).setSource("field_" + i, "value_" + i).get(); + assertEquals("CREATED", response.status().toString()); + } + // making search requests to get top queries + for (int i = 0; i < TOTAL_SEARCH_REQUESTS; i++) { + SearchResponse searchResponse = internalCluster().client(randomFrom(nodes)) + .prepareSearch() + .setQuery(QueryBuilders.matchAllQuery()) + .get(); + assertEquals(searchResponse.getFailedShards(), 0); + } + + TopQueriesRequest request = new TopQueriesRequest(); + TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet(); + Assert.assertEquals(0, response.failures().size()); + Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size()); + // TODO: this should be 1 after changing to cluster level top N. + Assert.assertEquals(2, response.getNodes().stream().mapToInt(o -> o.getLatencyRecords().size()).sum()); + + internalCluster().stopAllNodes(); + } + + /** + * Test get top queries with small window size + */ + public void testGetTopQueriesWithSmallWindowSize() { + Settings commonSettings = Settings.builder() + .put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true") + .put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "100") + .put(TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey(), "0ms") + .build(); + + logger.info("--> starting 2 nodes for query insight testing"); + List nodes = internalCluster().startNodes(TOTAL_NUMBER_OF_NODES, Settings.builder().put(commonSettings).build()); + + logger.info("--> waiting for nodes to form a cluster"); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet(); + assertFalse(health.isTimedOut()); + + assertAcked( + prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 2)) + ); + ensureStableCluster(2); + logger.info("--> creating indices for query insight testing"); + for (int i = 0; i < 5; i++) { + IndexResponse response = client().prepareIndex("test_" + i).setId("" + i).setSource("field_" + i, "value_" + i).get(); + assertEquals("CREATED", response.status().toString()); + } + // making search requests to get top queries + for (int i = 0; i < TOTAL_SEARCH_REQUESTS; i++) { + SearchResponse searchResponse = internalCluster().client(randomFrom(nodes)) + .prepareSearch() + .setQuery(QueryBuilders.matchAllQuery()) + .get(); + assertEquals(searchResponse.getFailedShards(), 0); + } + + TopQueriesRequest request = new TopQueriesRequest(); + TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet(); + Assert.assertEquals(0, response.failures().size()); + Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size()); + Assert.assertEquals(0, response.getNodes().stream().mapToInt(o -> o.getLatencyRecords().size()).sum()); + + internalCluster().stopAllNodes(); + } +} diff --git a/plugins/query-insights/src/javaRestTest/java/org/opensearch/plugin/insights/TopQueriesRestIT.java b/plugins/query-insights/src/javaRestTest/java/org/opensearch/plugin/insights/TopQueriesRestIT.java new file mode 100644 index 0000000000000..57dea6ad8d5ff --- /dev/null +++ b/plugins/query-insights/src/javaRestTest/java/org/opensearch/plugin/insights/TopQueriesRestIT.java @@ -0,0 +1,107 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights; + +import org.opensearch.client.Request; +import org.opensearch.client.Response; +import org.opensearch.common.xcontent.LoggingDeprecationHandler; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.test.rest.OpenSearchRestTestCase; +import org.junit.Assert; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; + +/** + * Rest Action tests for Query Insights + */ +public class TopQueriesRestIT extends OpenSearchRestTestCase { + + /** + * test Query Insights is installed + * @throws IOException IOException + */ + @SuppressWarnings("unchecked") + public void testQueryInsightsPluginInstalled() throws IOException { + Request request = new Request("GET", "/_cat/plugins?s=component&h=name,component,version,description&format=json"); + Response response = client().performRequest(request); + List pluginsList = JsonXContent.jsonXContent.createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + response.getEntity().getContent() + ).list(); + Assert.assertTrue( + pluginsList.stream().map(o -> (Map) o).anyMatch(plugin -> plugin.get("component").equals("query-insights")) + ); + } + + /** + * test enabling top queries + * @throws IOException IOException + */ + public void testTopQueriesResponses() throws IOException { + // Enable Top N Queries feature + Request request = new Request("PUT", "/_cluster/settings"); + request.setJsonEntity(defaultTopQueriesSettings()); + Response response = client().performRequest(request); + + Assert.assertEquals(200, response.getStatusLine().getStatusCode()); + + // Create documents for search + request = new Request("POST", "/my-index-0/_doc"); + request.setJsonEntity(createDocumentsBody()); + response = client().performRequest(request); + + Assert.assertEquals(201, response.getStatusLine().getStatusCode()); + + // Do Search + request = new Request("GET", "/my-index-0/_search?size=20&pretty"); + request.setJsonEntity(searchBody()); + response = client().performRequest(request); + Assert.assertEquals(200, response.getStatusLine().getStatusCode()); + response = client().performRequest(request); + Assert.assertEquals(200, response.getStatusLine().getStatusCode()); + + // Get Top Queries + request = new Request("GET", "/_insights/top_queries?pretty"); + response = client().performRequest(request); + + Assert.assertEquals(200, response.getStatusLine().getStatusCode()); + String top_requests = new String(response.getEntity().getContent().readAllBytes(), StandardCharsets.UTF_8); + Assert.assertTrue(top_requests.contains("top_queries")); + Assert.assertEquals(2, top_requests.split("searchType", -1).length - 1); + } + + private String defaultTopQueriesSettings() { + return "{\n" + + " \"persistent\" : {\n" + + " \"search.top_n_queries.latency.enabled\" : \"true\",\n" + + " \"search.top_n_queries.latency.window_size\" : \"600s\",\n" + + " \"search.top_n_queries.latency.top_n_size\" : 5\n" + + " }\n" + + "}"; + } + + private String createDocumentsBody() { + return "{\n" + + " \"@timestamp\": \"2099-11-15T13:12:00\",\n" + + " \"message\": \"this is document 1\",\n" + + " \"user\": {\n" + + " \"id\": \"cyji\"\n" + + " }\n" + + "}"; + } + + private String searchBody() { + return "{}"; + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/SearchQueryLatencyListener.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/SearchQueryLatencyListener.java new file mode 100644 index 0000000000000..4a7a3ff9dc547 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/SearchQueryLatencyListener.java @@ -0,0 +1,121 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.listener; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.search.SearchPhaseContext; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchRequestContext; +import org.opensearch.action.search.SearchRequestOperationsListener; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.plugin.insights.core.service.TopQueriesByLatencyService; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Locale; + +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_ENABLED; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_IDENTIFIER; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_INTERVAL; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_TYPE; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE; + +/** + * The listener for top N queries by latency + * + * @opensearch.internal + */ +public final class SearchQueryLatencyListener extends SearchRequestOperationsListener { + private static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false")); + + private static final Logger log = LogManager.getLogger(SearchQueryLatencyListener.class); + + private final TopQueriesByLatencyService topQueriesByLatencyService; + + @Inject + public SearchQueryLatencyListener(ClusterService clusterService, TopQueriesByLatencyService topQueriesByLatencyService) { + this.topQueriesByLatencyService = topQueriesByLatencyService; + clusterService.getClusterSettings().addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_ENABLED, this::setEnabled); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer( + TOP_N_LATENCY_QUERIES_SIZE, + this.topQueriesByLatencyService::setTopNSize, + this.topQueriesByLatencyService::validateTopNSize + ); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer( + TOP_N_LATENCY_QUERIES_WINDOW_SIZE, + this.topQueriesByLatencyService::setWindowSize, + this.topQueriesByLatencyService::validateWindowSize + ); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_EXPORTER_TYPE, this.topQueriesByLatencyService::setExporterType); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer( + TOP_N_LATENCY_QUERIES_EXPORTER_INTERVAL, + this.topQueriesByLatencyService::setExportInterval, + this.topQueriesByLatencyService::validateExportInterval + ); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_EXPORTER_IDENTIFIER, this.topQueriesByLatencyService::setExporterIdentifier); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_EXPORTER_ENABLED, this.topQueriesByLatencyService::setExporterEnabled); + + this.setEnabled(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_ENABLED)); + this.topQueriesByLatencyService.setTopNSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_SIZE)); + this.topQueriesByLatencyService.setWindowSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_WINDOW_SIZE)); + } + + @Override + public void setEnabled(boolean enabled) { + super.setEnabled(enabled); + this.topQueriesByLatencyService.setEnableCollect(enabled); + } + + @Override + public boolean isEnabled() { + return super.isEnabled(); + } + + @Override + public void onPhaseStart(SearchPhaseContext context) {} + + @Override + public void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} + + @Override + public void onPhaseFailure(SearchPhaseContext context) {} + + @Override + public void onRequestStart(SearchRequestContext searchRequestContext) {} + + @Override + public void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { + SearchRequest request = context.getRequest(); + try { + topQueriesByLatencyService.ingestQueryData( + request.getOrCreateAbsoluteStartMillis(), + request.searchType(), + request.source().toString(FORMAT_PARAMS), + context.getNumShards(), + request.indices(), + new HashMap<>(), + searchRequestContext.phaseTookMap(), + System.nanoTime() - searchRequestContext.getAbsoluteStartNanos() + ); + } catch (Exception e) { + log.error(String.format(Locale.ROOT, "fail to ingest query insight data, error: %s", e)); + } + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/package-info.java new file mode 100644 index 0000000000000..3cb9cacf7fd1c --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Listeners for Query Insights + */ +package org.opensearch.plugin.insights.core.listener; diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesByLatencyService.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesByLatencyService.java new file mode 100644 index 0000000000000..5914239b4054e --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesByLatencyService.java @@ -0,0 +1,240 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.service; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.search.SearchType; +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporter; +import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterType; +import org.opensearch.plugin.insights.core.exporter.QueryInsightsLocalIndexExporter; +import org.opensearch.plugin.insights.rules.model.SearchQueryLatencyRecord; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; +import org.opensearch.threadpool.ThreadPool; + +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.PriorityBlockingQueue; + +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_LOCAL_INDEX_MAPPING; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.MIN_EXPORT_INTERVAL; + +/** + * Service responsible for gathering, analyzing, storing and exporting + * top N queries with high latency data for search queries + * + * @opensearch.internal + */ +public class TopQueriesByLatencyService extends QueryInsightsService< + SearchQueryLatencyRecord, + PriorityBlockingQueue, + QueryInsightsExporter> { + private static final Logger log = LogManager.getLogger(TopQueriesByLatencyService.class); + + private static final TimeValue delay = TimeValue.ZERO; + + private int topNSize = QueryInsightsSettings.DEFAULT_TOP_N_SIZE; + + private TimeValue windowSize = TimeValue.timeValueSeconds(QueryInsightsSettings.DEFAULT_WINDOW_SIZE); + + private final ClusterService clusterService; + private final Client client; + + @Inject + public TopQueriesByLatencyService(ThreadPool threadPool, ClusterService clusterService, Client client) { + super(threadPool, new PriorityBlockingQueue<>(), null); + this.clusterService = clusterService; + this.client = client; + } + + /** + * Ingest the query data into to the top N queries with latency store + * + * @param timestamp The timestamp of the query. + * @param searchType The manner at which the search operation is executed. see {@link SearchType} + * @param source The search source that was executed by the query. + * @param totalShards Total number of shards as part of the search query across all indices + * @param indices The indices involved in the search query + * @param propertyMap Extra attributes and information about a search query + * @param phaseLatencyMap Contains phase level latency information in a search query + * @param tookInNanos Total search request took time in nanoseconds + */ + public void ingestQueryData( + final Long timestamp, + final SearchType searchType, + final String source, + final int totalShards, + final String[] indices, + final Map propertyMap, + final Map phaseLatencyMap, + final Long tookInNanos + ) { + if (timestamp <= 0) { + log.error( + String.format( + Locale.ROOT, + "Invalid timestamp %s when ingesting query data to compute top n queries with latency", + timestamp + ) + ); + return; + } + if (totalShards <= 0) { + log.error( + String.format( + Locale.ROOT, + "Invalid totalShards %s when ingesting query data to compute top n queries with latency", + totalShards + ) + ); + return; + } + this.threadPool.schedule(() -> { + super.ingestQueryData( + new SearchQueryLatencyRecord(timestamp, searchType, source, totalShards, indices, propertyMap, phaseLatencyMap, tookInNanos) + ); + // remove top elements for fix sizing priority queue + if (this.store.size() > this.getTopNSize()) { + this.store.poll(); + } + }, delay, ThreadPool.Names.GENERIC); + + log.debug(String.format(Locale.ROOT, "successfully ingested: %s", this.store)); + } + + @Override + public void clearOutdatedData() { + store.removeIf(record -> record.getTimestamp() < System.currentTimeMillis() - windowSize.getMillis()); + } + + public void setTopNSize(int size) { + this.topNSize = size; + } + + public void validateTopNSize(int size) { + if (size > QueryInsightsSettings.MAX_N_SIZE) { + throw new IllegalArgumentException( + "Top N size setting [" + + QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE.getKey() + + "]" + + " should be smaller than max top N size [" + + QueryInsightsSettings.MAX_N_SIZE + + "was (" + + size + + " > " + + QueryInsightsSettings.MAX_N_SIZE + + ")" + ); + } + } + + public int getTopNSize() { + return this.topNSize; + } + + public void setWindowSize(TimeValue windowSize) { + this.windowSize = windowSize; + } + + public void validateWindowSize(TimeValue windowSize) { + if (windowSize.compareTo(QueryInsightsSettings.MAX_WINDOW_SIZE) > 0) { + throw new IllegalArgumentException( + "Window size setting [" + + QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey() + + "]" + + " should be smaller than max window size [" + + QueryInsightsSettings.MAX_WINDOW_SIZE + + "was (" + + windowSize + + " > " + + QueryInsightsSettings.MAX_WINDOW_SIZE + + ")" + ); + } + } + + public TimeValue getWindowSize() { + return this.windowSize; + } + + public void setExporterType(QueryInsightsExporterType type) { + resetExporter( + getEnableExport(), + type, + clusterService.getClusterSettings().get(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_IDENTIFIER) + ); + } + + public void setExporterEnabled(boolean enabled) { + super.setEnableExport(enabled); + resetExporter( + enabled, + clusterService.getClusterSettings().get(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_TYPE), + clusterService.getClusterSettings().get(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_IDENTIFIER) + ); + } + + public void setExporterIdentifier(String identifier) { + resetExporter( + getEnableExport(), + clusterService.getClusterSettings().get(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_TYPE), + identifier + ); + } + + public void setExportInterval(TimeValue interval) { + super.setExportInterval(interval); + resetExporter( + getEnableExport(), + clusterService.getClusterSettings().get(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_TYPE), + clusterService.getClusterSettings().get(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_IDENTIFIER) + ); + } + + public void validateExportInterval(TimeValue exportInterval) { + if (exportInterval.getSeconds() < MIN_EXPORT_INTERVAL.getSeconds()) { + throw new IllegalArgumentException( + "Export Interval setting [" + + QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_INTERVAL.getKey() + + "]" + + " should not be smaller than minimal export interval size [" + + MIN_EXPORT_INTERVAL + + "]" + + "was (" + + exportInterval + + " < " + + MIN_EXPORT_INTERVAL + + ")" + ); + } + } + + public void resetExporter(boolean enabled, QueryInsightsExporterType type, String identifier) { + this.stop(); + this.exporter = null; + + if (!enabled) { + return; + } + if (type.equals(QueryInsightsExporterType.LOCAL_INDEX)) { + this.exporter = new QueryInsightsLocalIndexExporter<>( + clusterService, + client, + identifier, + TopQueriesByLatencyService.class.getClassLoader().getResourceAsStream(DEFAULT_LOCAL_INDEX_MAPPING) + ); + } + this.start(); + } + +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/package-info.java new file mode 100644 index 0000000000000..9b6b5856f7d27 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Transport Actions, Requests and Responses for Query Insights + */ +package org.opensearch.plugin.insights.rules.action; diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueries.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueries.java new file mode 100644 index 0000000000000..1bb250e5d57b7 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueries.java @@ -0,0 +1,71 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.action.top_queries; + +import org.opensearch.action.support.nodes.BaseNodeResponse; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.Nullable; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.plugin.insights.rules.model.SearchQueryLatencyRecord; + +import java.io.IOException; +import java.util.List; + +/** + * Top Queries by resource usage / latency on a node + *

+ * Mainly used in the top N queries node response workflow. + * + * @opensearch.internal + */ +public class TopQueries extends BaseNodeResponse implements ToXContentObject { + /** The store to keep the top N queries with latency records */ + @Nullable + private final List latencyRecords; + + public TopQueries(StreamInput in) throws IOException { + super(in); + latencyRecords = in.readList(SearchQueryLatencyRecord::new); + } + + public TopQueries(DiscoveryNode node, @Nullable List latencyRecords) { + super(node); + this.latencyRecords = latencyRecords; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + if (latencyRecords != null) { + for (SearchQueryLatencyRecord record : latencyRecords) { + record.toXContent(builder, params); + } + } + return builder; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + if (latencyRecords != null) { + out.writeList(latencyRecords); + } + } + + /** + * Get all latency records + * + * @return the latency records in this node response + */ + public List getLatencyRecords() { + return latencyRecords; + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesAction.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesAction.java new file mode 100644 index 0000000000000..69abf001d18d9 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesAction.java @@ -0,0 +1,26 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.action.top_queries; + +import org.opensearch.action.ActionType; + +/** + * Transport action for cluster/node level top queries information. + * + * @opensearch.internal + */ +public class TopQueriesAction extends ActionType { + + public static final TopQueriesAction INSTANCE = new TopQueriesAction(); + public static final String NAME = "cluster:monitor/insights/top_queries"; + + private TopQueriesAction() { + super(NAME, TopQueriesResponse::new); + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequest.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequest.java new file mode 100644 index 0000000000000..435f5188a4e0b --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequest.java @@ -0,0 +1,96 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.action.top_queries; + +import org.opensearch.action.support.nodes.BaseNodesRequest; +import org.opensearch.common.annotation.PublicApi; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Locale; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * A request to get cluster/node level top queries information. + * + * @opensearch.internal + */ +@PublicApi(since = "1.0.0") +public class TopQueriesRequest extends BaseNodesRequest { + + Metric metricType = Metric.LATENCY; + + /** + * Create a new TopQueriesRequest from a {@link StreamInput} object. + * + * @param in A stream input object. + * @throws IOException if the stream cannot be deserialized. + */ + public TopQueriesRequest(StreamInput in) throws IOException { + super(in); + setMetricType(in.readString()); + } + + /** + * Get top queries from nodes based on the nodes ids specified. + * If none are passed, cluster level top queries will be returned. + */ + public TopQueriesRequest(String... nodesIds) { + super(nodesIds); + } + + /** + * Get the type of requested metrics + */ + public Metric getMetricType() { + return metricType; + } + + /** + * Set the type of requested metrics + */ + public TopQueriesRequest setMetricType(String metricType) { + metricType = metricType.toUpperCase(Locale.ROOT); + if (false == Metric.allMetrics().contains(metricType)) { + throw new IllegalStateException("Invalid metric used in top queries request: " + metricType); + } + this.metricType = Metric.valueOf(metricType); + return this; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(metricType.metricName()); + } + + /** + * ALl supported metrics for top queries + */ + public enum Metric { + LATENCY("LATENCY"); + + private final String metricName; + + Metric(String name) { + this.metricName = name; + } + + public String metricName() { + return this.metricName; + } + + public static Set allMetrics() { + return Arrays.stream(values()).map(Metric::metricName).collect(Collectors.toSet()); + } + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponse.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponse.java new file mode 100644 index 0000000000000..37767dcee8fe5 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponse.java @@ -0,0 +1,120 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.action.top_queries; + +import org.opensearch.action.FailedNodeException; +import org.opensearch.action.support.nodes.BaseNodesResponse; +import org.opensearch.cluster.ClusterName; +import org.opensearch.common.annotation.PublicApi; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.plugin.insights.rules.model.SearchQueryLatencyRecord; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Transport response for cluster/node level top queries information. + * + * @opensearch.internal + */ +@PublicApi(since = "1.0.0") +public class TopQueriesResponse extends BaseNodesResponse implements ToXContentFragment { + + private static final String CLUSTER_LEVEL_RESULTS_KEY = "top_queries"; + private final int top_n_size; + + public TopQueriesResponse(StreamInput in) throws IOException { + super(in); + top_n_size = in.readInt(); + } + + public TopQueriesResponse(ClusterName clusterName, List nodes, List failures, int top_n_size) { + super(clusterName, nodes, failures); + this.top_n_size = top_n_size; + } + + @Override + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readList(TopQueries::new); + } + + @Override + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeList(nodes); + out.writeLong(top_n_size); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + List results = getNodes(); + builder.startObject(); + toClusterLevelResult(builder, params, results); + return builder.endObject(); + } + + @Override + public String toString() { + try { + XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint(); + builder.startObject(); + this.toXContent(builder, EMPTY_PARAMS); + builder.endObject(); + return builder.toString(); + } catch (IOException e) { + return "{ \"error\" : \"" + e.getMessage() + "\"}"; + } + } + + /** + * Merge top n queries results from nodes into cluster level results in XContent format. + * + * @param builder XContent builder + * @param params serialization parameters + * @param results top queries results from all nodes + * @throws IOException if an error occurs + */ + private void toClusterLevelResult(XContentBuilder builder, Params params, List results) throws IOException { + List all_records = results.stream() + .map(TopQueries::getLatencyRecords) + .flatMap(Collection::stream) + .sorted(Collections.reverseOrder()) + .limit(top_n_size) + .collect(Collectors.toList()); + builder.startArray(CLUSTER_LEVEL_RESULTS_KEY); + for (SearchQueryLatencyRecord record : all_records) { + record.toXContent(builder, params); + } + builder.endArray(); + } + + /** + * build node level top n queries results in XContent format. + * + * @param builder XContent builder + * @param params serialization parameters + * @param results top queries results from all nodes + * @throws IOException if an error occurs + */ + private void toNodeLevelResult(XContentBuilder builder, Params params, List results) throws IOException { + builder.startObject(CLUSTER_LEVEL_RESULTS_KEY); + for (TopQueries topQueries : results) { + builder.startArray(topQueries.getNode().getId()); + topQueries.toXContent(builder, params); + builder.endArray(); + } + builder.endObject(); + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/package-info.java new file mode 100644 index 0000000000000..3cc7900e5ce7d --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Transport Actions, Requests and Responses for Top N Queries + */ +package org.opensearch.plugin.insights.rules.action.top_queries; diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/package-info.java new file mode 100644 index 0000000000000..3787f05f65552 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Rest Handlers for Query Insights + */ +package org.opensearch.plugin.insights.rules.resthandler; diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesAction.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesAction.java new file mode 100644 index 0000000000000..874df86ebb4fc --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesAction.java @@ -0,0 +1,102 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.resthandler.top_queries; + +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.Strings; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesRequest; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesResponse; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestChannel; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestResponse; +import org.opensearch.rest.action.RestResponseListener; + +import java.io.IOException; +import java.util.List; +import java.util.Locale; +import java.util.Set; + +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_QUERIES_BASE_URI; +import static org.opensearch.rest.RestRequest.Method.GET; + +/** + * Transport action to get Top N queries by certain metric type + * + * @opensearch.api + */ +public class RestTopQueriesAction extends BaseRestHandler { + /** The metric types that are allowed in top N queries */ + static final Set ALLOWED_METRICS = TopQueriesRequest.Metric.allMetrics(); + + public RestTopQueriesAction() {} + + @Override + public List routes() { + return List.of( + new Route(GET, TOP_QUERIES_BASE_URI), + new Route(GET, String.format(Locale.ROOT, "%s/{nodeId}", TOP_QUERIES_BASE_URI)) + ); + } + + @Override + public String getName() { + return "top_queries_action"; + } + + @Override + public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { + final TopQueriesRequest topQueriesRequest = prepareRequest(request); + topQueriesRequest.timeout(request.param("timeout")); + + return channel -> client.execute( + TopQueriesAction.INSTANCE, + topQueriesRequest, + topQueriesResponse(channel, request.method()) + + ); + } + + static TopQueriesRequest prepareRequest(final RestRequest request) { + String[] nodesIds = Strings.splitStringByCommaToArray(request.param("nodeId")); + String metricType = request.param("type", TopQueriesRequest.Metric.LATENCY.metricName()).toUpperCase(Locale.ROOT); + if (!ALLOWED_METRICS.contains(metricType)) { + throw new IllegalArgumentException( + String.format(Locale.ROOT, "request [%s] contains invalid metric type [%s]", request.path(), metricType) + ); + } + TopQueriesRequest topQueriesRequest = new TopQueriesRequest(nodesIds); + topQueriesRequest.setMetricType(metricType); + return topQueriesRequest; + } + + @Override + protected Set responseParams() { + return Settings.FORMAT_PARAMS; + } + + @Override + public boolean canTripCircuitBreaker() { + return false; + } + + private RestResponseListener topQueriesResponse(RestChannel channel, RestRequest.Method restMethod) { + return new RestResponseListener<>(channel) { + @Override + public RestResponse buildResponse(TopQueriesResponse response) throws Exception { + return new BytesRestResponse(RestStatus.OK, response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS)); + } + }; + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/package-info.java new file mode 100644 index 0000000000000..087cf7d765f8c --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Rest Handlers for Top N Queries + */ +package org.opensearch.plugin.insights.rules.resthandler.top_queries; diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/package-info.java new file mode 100644 index 0000000000000..f3a1c70b9af57 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Transport Actions for Query Insights. + */ +package org.opensearch.plugin.insights.rules.transport; diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesAction.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesAction.java new file mode 100644 index 0000000000000..fa7996cc6f3f7 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesAction.java @@ -0,0 +1,131 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.transport.top_queries; + +import org.opensearch.OpenSearchException; +import org.opensearch.action.FailedNodeException; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.nodes.TransportNodesAction; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.plugin.insights.core.service.TopQueriesByLatencyService; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueries; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesRequest; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesResponse; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.List; +import java.util.Locale; + +/** + * Transport action for cluster/node level top queries information. + * + * @opensearch.internal + */ +public class TransportTopQueriesAction extends TransportNodesAction< + TopQueriesRequest, + TopQueriesResponse, + TransportTopQueriesAction.NodeRequest, + TopQueries> { + + private final TopQueriesByLatencyService topQueriesByLatencyService; + + @Inject + public TransportTopQueriesAction( + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + TopQueriesByLatencyService topQueriesByLatencyService, + ActionFilters actionFilters + ) { + super( + TopQueriesAction.NAME, + threadPool, + clusterService, + transportService, + actionFilters, + TopQueriesRequest::new, + NodeRequest::new, + ThreadPool.Names.GENERIC, + TopQueries.class + ); + this.topQueriesByLatencyService = topQueriesByLatencyService; + } + + @Override + protected TopQueriesResponse newResponse( + TopQueriesRequest topQueriesRequest, + List responses, + List failures + ) { + if (topQueriesRequest.getMetricType() == TopQueriesRequest.Metric.LATENCY) { + return new TopQueriesResponse( + clusterService.getClusterName(), + responses, + failures, + clusterService.getClusterSettings().get(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE) + ); + } else { + throw new OpenSearchException(String.format(Locale.ROOT, "invalid metric type %s", topQueriesRequest.getMetricType())); + } + } + + @Override + protected NodeRequest newNodeRequest(TopQueriesRequest request) { + return new NodeRequest(request); + } + + @Override + protected TopQueries newNodeResponse(StreamInput in) throws IOException { + return new TopQueries(in); + } + + @Override + protected TopQueries nodeOperation(NodeRequest nodeRequest) { + TopQueriesRequest request = nodeRequest.request; + if (request.getMetricType() == TopQueriesRequest.Metric.LATENCY) { + return new TopQueries(clusterService.localNode(), topQueriesByLatencyService.getQueryData()); + } else { + throw new OpenSearchException(String.format(Locale.ROOT, "invalid metric type %s", request.getMetricType())); + } + + } + + /** + * Inner Node Top Queries Request + * + * @opensearch.internal + */ + public static class NodeRequest extends TransportRequest { + + TopQueriesRequest request; + + public NodeRequest(StreamInput in) throws IOException { + super(in); + request = new TopQueriesRequest(in); + } + + public NodeRequest(TopQueriesRequest request) { + this.request = request; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + request.writeTo(out); + } + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/package-info.java new file mode 100644 index 0000000000000..54da0980deff8 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Transport Actions for Top N Queries. + */ +package org.opensearch.plugin.insights.rules.transport.top_queries; diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/SearchQueryLatencyListenerTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/SearchQueryLatencyListenerTests.java new file mode 100644 index 0000000000000..5228e0054c440 --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/SearchQueryLatencyListenerTests.java @@ -0,0 +1,182 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.listener; + +import org.opensearch.action.search.SearchPhaseContext; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchRequestContext; +import org.opensearch.action.search.SearchType; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.plugin.insights.core.service.TopQueriesByLatencyService; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; +import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.opensearch.search.aggregations.support.ValueType; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Phaser; + +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.anyMap; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Unit Tests for {@link SearchQueryLatencyListener}. + */ +public class SearchQueryLatencyListenerTests extends OpenSearchTestCase { + + public void testOnRequestEnd() { + final SearchRequestContext searchRequestContext = mock(SearchRequestContext.class); + final SearchPhaseContext searchPhaseContext = mock(SearchPhaseContext.class); + final SearchRequest searchRequest = mock(SearchRequest.class); + final TopQueriesByLatencyService topQueriesByLatencyService = mock(TopQueriesByLatencyService.class); + + Settings.Builder settingsBuilder = Settings.builder(); + Settings settings = settingsBuilder.build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_ENABLED); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_TYPE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_INTERVAL); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_IDENTIFIER); + + ClusterService clusterService = new ClusterService(settings, clusterSettings, null); + + Long timestamp = System.currentTimeMillis() - 100L; + SearchType searchType = SearchType.QUERY_THEN_FETCH; + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.aggregation(new TermsAggregationBuilder("agg1").userValueTypeHint(ValueType.STRING).field("type.keyword")); + searchSourceBuilder.size(0); + + String[] indices = new String[] { "index-1", "index-2" }; + + Map phaseLatencyMap = new HashMap<>(); + phaseLatencyMap.put("expand", 0L); + phaseLatencyMap.put("query", 20L); + phaseLatencyMap.put("fetch", 1L); + + int numberOfShards = 10; + + SearchQueryLatencyListener searchQueryLatencyListener = new SearchQueryLatencyListener(clusterService, topQueriesByLatencyService); + + when(searchRequest.getOrCreateAbsoluteStartMillis()).thenReturn(timestamp); + when(searchRequest.searchType()).thenReturn(searchType); + when(searchRequest.source()).thenReturn(searchSourceBuilder); + when(searchRequest.indices()).thenReturn(indices); + when(searchRequestContext.phaseTookMap()).thenReturn(phaseLatencyMap); + when(searchPhaseContext.getRequest()).thenReturn(searchRequest); + when(searchPhaseContext.getNumShards()).thenReturn(numberOfShards); + + searchQueryLatencyListener.onRequestEnd(searchPhaseContext, searchRequestContext); + + verify(topQueriesByLatencyService, times(1)).ingestQueryData( + eq(timestamp), + eq(searchType), + eq(searchSourceBuilder.toString()), + eq(numberOfShards), + eq(indices), + anyMap(), + eq(phaseLatencyMap), + anyLong() + ); + } + + public void testConcurrentOnRequestEnd() throws InterruptedException { + final SearchRequestContext searchRequestContext = mock(SearchRequestContext.class); + final SearchPhaseContext searchPhaseContext = mock(SearchPhaseContext.class); + final SearchRequest searchRequest = mock(SearchRequest.class); + final TopQueriesByLatencyService topQueriesByLatencyService = mock(TopQueriesByLatencyService.class); + + Settings.Builder settingsBuilder = Settings.builder(); + Settings settings = settingsBuilder.build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_ENABLED); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_TYPE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_INTERVAL); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_IDENTIFIER); + + ClusterService clusterService = new ClusterService(settings, clusterSettings, null); + + Long timestamp = System.currentTimeMillis() - 100L; + SearchType searchType = SearchType.QUERY_THEN_FETCH; + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.aggregation(new TermsAggregationBuilder("agg1").userValueTypeHint(ValueType.STRING).field("type.keyword")); + searchSourceBuilder.size(0); + + String[] indices = new String[] { "index-1", "index-2" }; + + Map phaseLatencyMap = new HashMap<>(); + phaseLatencyMap.put("expand", 0L); + phaseLatencyMap.put("query", 20L); + phaseLatencyMap.put("fetch", 1L); + + int numberOfShards = 10; + + final List searchListenersList = new ArrayList<>(); + + when(searchRequest.getOrCreateAbsoluteStartMillis()).thenReturn(timestamp); + when(searchRequest.searchType()).thenReturn(searchType); + when(searchRequest.source()).thenReturn(searchSourceBuilder); + when(searchRequest.indices()).thenReturn(indices); + when(searchRequestContext.phaseTookMap()).thenReturn(phaseLatencyMap); + when(searchPhaseContext.getRequest()).thenReturn(searchRequest); + when(searchPhaseContext.getNumShards()).thenReturn(numberOfShards); + + int numRequests = 50; + Thread[] threads = new Thread[numRequests]; + Phaser phaser = new Phaser(numRequests + 1); + CountDownLatch countDownLatch = new CountDownLatch(numRequests); + + for (int i = 0; i < numRequests; i++) { + searchListenersList.add(new SearchQueryLatencyListener(clusterService, topQueriesByLatencyService)); + } + + for (int i = 0; i < numRequests; i++) { + int finalI = i; + threads[i] = new Thread(() -> { + phaser.arriveAndAwaitAdvance(); + SearchQueryLatencyListener thisListener = searchListenersList.get(finalI); + thisListener.onRequestEnd(searchPhaseContext, searchRequestContext); + countDownLatch.countDown(); + }); + threads[i].start(); + } + phaser.arriveAndAwaitAdvance(); + countDownLatch.await(); + + verify(topQueriesByLatencyService, times(numRequests)).ingestQueryData( + eq(timestamp), + eq(searchType), + eq(searchSourceBuilder.toString()), + eq(numberOfShards), + eq(indices), + anyMap(), + eq(phaseLatencyMap), + anyLong() + ); + } +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesByLatencyServiceTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesByLatencyServiceTests.java new file mode 100644 index 0000000000000..86f01caad8e7d --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesByLatencyServiceTests.java @@ -0,0 +1,221 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.service; + +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.node.Node; +import org.opensearch.plugin.insights.QueryInsightsTestUtils; +import org.opensearch.plugin.insights.rules.model.SearchQueryLatencyRecord; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Phaser; + +import static org.mockito.Mockito.mock; + +/** + * Unit Tests for {@link TopQueriesByLatencyService}. + */ +public class TopQueriesByLatencyServiceTests extends OpenSearchTestCase { + + private ThreadPool threadPool; + + @Before + public void setup() { + threadPool = new ThreadPool(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "top n queries tests").build()); + } + + @After + public void shutdown() throws Exception { + terminate(threadPool); + } + + public void testIngestQueryDataWithLargeWindow() throws InterruptedException { + final Client client = mock(Client.class); + + Settings.Builder settingsBuilder = Settings.builder(); + Settings settings = settingsBuilder.build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_ENABLED); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_TYPE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_INTERVAL); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_IDENTIFIER); + + ClusterService clusterService = new ClusterService(settings, clusterSettings, threadPool); + + final List records = QueryInsightsTestUtils.generateQueryInsightRecords(10); + + TopQueriesByLatencyService topQueriesByLatencyService = new TopQueriesByLatencyService(threadPool, clusterService, client); + topQueriesByLatencyService.setEnableCollect(true); + topQueriesByLatencyService.setTopNSize(Integer.MAX_VALUE); + topQueriesByLatencyService.setWindowSize(new TimeValue(Long.MAX_VALUE)); + + for (SearchQueryLatencyRecord record : records) { + topQueriesByLatencyService.ingestQueryData( + record.getTimestamp(), + record.getSearchType(), + record.getSource(), + record.getTotalShards(), + record.getIndices(), + record.getPropertyMap(), + record.getPhaseLatencyMap(), + record.getValue() + ); + } + Thread.sleep(1000); + assertTrue(QueryInsightsTestUtils.checkRecordsEqualsWithoutOrder(topQueriesByLatencyService.getQueryData(), records)); + } + + public void testConcurrentIngestQueryDataWithLargeWindow() throws InterruptedException { + final Client client = mock(Client.class); + final ThreadPool threadPool = new ThreadPool( + Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "default ingest tests").build() + ); + + Settings.Builder settingsBuilder = Settings.builder(); + Settings settings = settingsBuilder.build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_ENABLED); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_TYPE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_INTERVAL); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_IDENTIFIER); + + ClusterService clusterService = new ClusterService(settings, clusterSettings, threadPool); + + final List records = QueryInsightsTestUtils.generateQueryInsightRecords(10); + + TopQueriesByLatencyService topQueriesByLatencyService = new TopQueriesByLatencyService(threadPool, clusterService, client); + topQueriesByLatencyService.setEnableCollect(true); + topQueriesByLatencyService.setTopNSize(Integer.MAX_VALUE); + topQueriesByLatencyService.setWindowSize(new TimeValue(Long.MAX_VALUE)); + + int numRequests = records.size(); + Thread[] threads = new Thread[numRequests]; + Phaser phaser = new Phaser(numRequests + 1); + CountDownLatch countDownLatch = new CountDownLatch(numRequests); + + for (int i = 0; i < numRequests; i++) { + int finalI = i; + threads[i] = new Thread(() -> { + phaser.arriveAndAwaitAdvance(); + topQueriesByLatencyService.ingestQueryData( + records.get(finalI).getTimestamp(), + records.get(finalI).getSearchType(), + records.get(finalI).getSource(), + records.get(finalI).getTotalShards(), + records.get(finalI).getIndices(), + records.get(finalI).getPropertyMap(), + records.get(finalI).getPhaseLatencyMap(), + records.get(finalI).getValue() + ); + countDownLatch.countDown(); + }); + threads[i].start(); + } + phaser.arriveAndAwaitAdvance(); + countDownLatch.await(); + Thread.sleep(1000); + + assertTrue(QueryInsightsTestUtils.checkRecordsEqualsWithoutOrder(topQueriesByLatencyService.getQueryData(), records)); + } + + public void testSmallWindowClearOutdatedData() throws InterruptedException { + final Client client = mock(Client.class); + final ThreadPool threadPool = mock(ThreadPool.class); + + Settings.Builder settingsBuilder = Settings.builder(); + Settings settings = settingsBuilder.build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_ENABLED); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_TYPE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_INTERVAL); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_IDENTIFIER); + + ClusterService clusterService = new ClusterService(settings, clusterSettings, threadPool); + + final List records = QueryInsightsTestUtils.generateQueryInsightRecords(10); + + TopQueriesByLatencyService topQueriesByLatencyService = new TopQueriesByLatencyService(threadPool, clusterService, client); + topQueriesByLatencyService.setEnableCollect(true); + topQueriesByLatencyService.setTopNSize(Integer.MAX_VALUE); + topQueriesByLatencyService.setWindowSize(new TimeValue(-1)); + + for (SearchQueryLatencyRecord record : records) { + topQueriesByLatencyService.ingestQueryData( + record.getTimestamp(), + record.getSearchType(), + record.getSource(), + record.getTotalShards(), + record.getIndices(), + record.getPropertyMap(), + record.getPhaseLatencyMap(), + record.getValue() + ); + } + Thread.sleep(1000); + assertEquals(0, topQueriesByLatencyService.getQueryData().size()); + } + + public void testSmallNSize() throws InterruptedException { + final Client client = mock(Client.class); + + Settings.Builder settingsBuilder = Settings.builder(); + Settings settings = settingsBuilder.build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_ENABLED); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_TYPE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_INTERVAL); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_IDENTIFIER); + + ClusterService clusterService = new ClusterService(settings, clusterSettings, threadPool); + + final List records = QueryInsightsTestUtils.generateQueryInsightRecords(10); + + TopQueriesByLatencyService topQueriesByLatencyService = new TopQueriesByLatencyService(threadPool, clusterService, client); + topQueriesByLatencyService.setEnableCollect(true); + topQueriesByLatencyService.setTopNSize(1); + topQueriesByLatencyService.setWindowSize(new TimeValue(Long.MAX_VALUE)); + + for (SearchQueryLatencyRecord record : records) { + topQueriesByLatencyService.ingestQueryData( + record.getTimestamp(), + record.getSearchType(), + record.getSource(), + record.getTotalShards(), + record.getIndices(), + record.getPropertyMap(), + record.getPhaseLatencyMap(), + record.getValue() + ); + } + Thread.sleep(1000); + assertEquals(1, topQueriesByLatencyService.getQueryData().size()); + } +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequestTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequestTests.java new file mode 100644 index 0000000000000..07e98970fb628 --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequestTests.java @@ -0,0 +1,43 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.action.top_queries; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.test.OpenSearchTestCase; + +/** + * Granular tests for the {@link TopQueriesRequest} class. + */ +public class TopQueriesRequestTests extends OpenSearchTestCase { + + /** + * Check that we can set the metric type + */ + public void testSetMetricType() throws Exception { + TopQueriesRequest request = new TopQueriesRequest(randomAlphaOfLength(5)); + request.setMetricType(randomFrom(TopQueriesRequest.Metric.allMetrics())); + TopQueriesRequest deserializedRequest = roundTripRequest(request); + assertEquals(request.getMetricType(), deserializedRequest.getMetricType()); + } + + /** + * Serialize and deserialize a request. + * @param request A request to serialize. + * @return The deserialized, "round-tripped" request. + */ + private static TopQueriesRequest roundTripRequest(TopQueriesRequest request) throws Exception { + try (BytesStreamOutput out = new BytesStreamOutput()) { + request.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + return new TopQueriesRequest(in); + } + } + } +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponseTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponseTests.java new file mode 100644 index 0000000000000..11063cbedd248 --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponseTests.java @@ -0,0 +1,49 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.action.top_queries; + +import org.opensearch.cluster.ClusterName; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.plugin.insights.QueryInsightsTestUtils; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.ArrayList; +import java.util.List; + +/** + * Granular tests for the {@link TopQueriesResponse} class. + */ +public class TopQueriesResponseTests extends OpenSearchTestCase { + + /** + * Check serialization and deserialization + */ + public void testToXContent() throws Exception { + TopQueries topQueries = QueryInsightsTestUtils.createTopQueries(); + ClusterName clusterName = new ClusterName("test-cluster"); + TopQueriesResponse response = new TopQueriesResponse(clusterName, List.of(topQueries), new ArrayList<>(), 10); + TopQueriesResponse deserializedResponse = roundTripResponse(response); + assertEquals(response.toString(), deserializedResponse.toString()); + } + + /** + * Serialize and deserialize a TopQueriesResponse. + * @param response A response to serialize. + * @return The deserialized, "round-tripped" response. + */ + private static TopQueriesResponse roundTripResponse(TopQueriesResponse response) throws Exception { + try (BytesStreamOutput out = new BytesStreamOutput()) { + response.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + return new TopQueriesResponse(in); + } + } + } +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesTests.java new file mode 100644 index 0000000000000..4d0d641cf1a8d --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesTests.java @@ -0,0 +1,42 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.action.top_queries; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.plugin.insights.QueryInsightsTestUtils; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; + +/** + * Tests for {@link TopQueries}. + */ +public class TopQueriesTests extends OpenSearchTestCase { + + public void testTopQueries() throws IOException { + TopQueries topQueries = QueryInsightsTestUtils.createTopQueries(); + try (BytesStreamOutput out = new BytesStreamOutput()) { + topQueries.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + TopQueries readTopQueries = new TopQueries(in); + assertExpected(topQueries, readTopQueries); + } + } + } + + /** + * checks all properties that are expected to be unchanged. + */ + private void assertExpected(TopQueries topQueries, TopQueries readTopQueries) throws IOException { + for (int i = 0; i < topQueries.getLatencyRecords().size(); i++) { + QueryInsightsTestUtils.compareJson(topQueries.getLatencyRecords().get(i), readTopQueries.getLatencyRecords().get(i)); + } + } +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/model/SearchQueryLatencyRecordTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/model/SearchQueryLatencyRecordTests.java new file mode 100644 index 0000000000000..e704e768a43e6 --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/model/SearchQueryLatencyRecordTests.java @@ -0,0 +1,50 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.model; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.plugin.insights.QueryInsightsTestUtils; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.ArrayList; +import java.util.List; + +/** + * Granular tests for the {@link SearchQueryLatencyRecord} class. + */ +public class SearchQueryLatencyRecordTests extends OpenSearchTestCase { + + /** + * Check that if the serialization, deserialization and equals functions are working as expected + */ + public void testSerializationAndEquals() throws Exception { + List records = QueryInsightsTestUtils.generateQueryInsightRecords(10); + List copiedRecords = new ArrayList<>(); + for (SearchQueryLatencyRecord record : records) { + copiedRecords.add(roundTripRecord(record)); + } + assertTrue(QueryInsightsTestUtils.checkRecordsEquals(records, copiedRecords)); + + } + + /** + * Serialize and deserialize a SearchQueryLatencyRecord. + * @param record A SearchQueryLatencyRecord to serialize. + * @return The deserialized, "round-tripped" record. + */ + private static SearchQueryLatencyRecord roundTripRecord(SearchQueryLatencyRecord record) throws Exception { + try (BytesStreamOutput out = new BytesStreamOutput()) { + record.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + return new SearchQueryLatencyRecord(in); + } + } + } +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesActionTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesActionTests.java new file mode 100644 index 0000000000000..627d794731eed --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesActionTests.java @@ -0,0 +1,62 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.resthandler.top_queries; + +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesRequest; +import org.opensearch.rest.RestRequest; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.rest.FakeRestRequest; + +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; + +import static org.opensearch.plugin.insights.rules.resthandler.top_queries.RestTopQueriesAction.ALLOWED_METRICS; + +public class RestTopQueriesActionTests extends OpenSearchTestCase { + + public void testEmptyNodeIdsValidType() { + Map params = new HashMap<>(); + params.put("type", randomFrom(ALLOWED_METRICS)); + + RestRequest restRequest = buildRestRequest(params); + TopQueriesRequest actual = RestTopQueriesAction.prepareRequest(restRequest); + assertEquals(0, actual.nodesIds().length); + } + + public void testNodeIdsValid() { + Map params = new HashMap<>(); + params.put("type", randomFrom(ALLOWED_METRICS)); + String[] nodes = randomArray(1, 10, String[]::new, () -> randomAlphaOfLengthBetween(5, 10)); + params.put("nodeId", String.join(",", nodes)); + + RestRequest restRequest = buildRestRequest(params); + TopQueriesRequest actual = RestTopQueriesAction.prepareRequest(restRequest); + assertArrayEquals(nodes, actual.nodesIds()); + } + + public void testInValidType() { + Map params = new HashMap<>(); + params.put("type", randomAlphaOfLengthBetween(5, 10).toUpperCase(Locale.ROOT)); + + RestRequest restRequest = buildRestRequest(params); + Exception exception = assertThrows(IllegalArgumentException.class, () -> { RestTopQueriesAction.prepareRequest(restRequest); }); + assertEquals( + String.format(Locale.ROOT, "request [/_insights/top_queries] contains invalid metric type [%s]", params.get("type")), + exception.getMessage() + ); + } + + private FakeRestRequest buildRestRequest(Map params) { + return new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.GET) + .withPath("/_insights/top_queries") + .withParams(params) + .build(); + } +}