From 409fc92901b9af3f908136275e6b66c8e98b105a Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Sat, 3 Feb 2024 00:15:40 -0800 Subject: [PATCH] refactor service for improving multithreading efficiency Signed-off-by: Chenyang Ji --- .../QueryInsightsPluginTransportIT.java | 19 +++--- .../core/listener/QueryInsightsListener.java | 60 ++++++++++--------- .../rules/action/top_queries/TopQueries.java | 13 ++-- .../action/top_queries/TopQueriesRequest.java | 16 ++--- .../top_queries/TopQueriesResponse.java | 33 +++++----- .../top_queries/RestTopQueriesAction.java | 17 ++---- .../TransportTopQueriesAction.java | 36 ++++++----- .../listener/QueryInsightsListenerTests.java | 9 ++- 8 files changed, 102 insertions(+), 101 deletions(-) diff --git a/plugins/query-insights/src/internalClusterTest/java/org/opensearch/plugin/insights/QueryInsightsPluginTransportIT.java b/plugins/query-insights/src/internalClusterTest/java/org/opensearch/plugin/insights/QueryInsightsPluginTransportIT.java index c16a2edbdf3de..711faa9323520 100644 --- a/plugins/query-insights/src/internalClusterTest/java/org/opensearch/plugin/insights/QueryInsightsPluginTransportIT.java +++ b/plugins/query-insights/src/internalClusterTest/java/org/opensearch/plugin/insights/QueryInsightsPluginTransportIT.java @@ -81,7 +81,7 @@ public void testGetTopQueriesWhenFeatureDisabled() { TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet(); Assert.assertNotEquals(0, response.failures().size()); Assert.assertEquals( - "Cannot get query data when query insight feature is not enabled for MetricType [latency].", + "Cannot get top n queries when [search.top_n_queries.latency.enabled] is not enabled.", response.failures().get(0).getCause().getCause().getMessage() ); } @@ -89,7 +89,7 @@ public void testGetTopQueriesWhenFeatureDisabled() { /** * Test update top query record when feature enabled */ - public void testUpdateRecordWhenFeatureEnabled() throws ExecutionException, InterruptedException { + public void testUpdateRecordWhenFeatureDisabledThenEnabled() throws ExecutionException, InterruptedException { Settings commonSettings = Settings.builder().put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "false").build(); logger.info("--> starting nodes for query insight testing"); @@ -121,7 +121,7 @@ public void testUpdateRecordWhenFeatureEnabled() throws ExecutionException, Inte TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet(); Assert.assertNotEquals(0, response.failures().size()); Assert.assertEquals( - "Cannot get query data when query insight feature is not enabled for MetricType [latency].", + "Cannot get top n queries when [search.top_n_queries.latency.enabled] is not enabled.", response.failures().get(0).getCause().getCause().getMessage() ); @@ -143,7 +143,7 @@ public void testUpdateRecordWhenFeatureEnabled() throws ExecutionException, Inte /** * Test get top queries when feature enabled */ - public void testGetTopQueriesWhenFeatureEnabled() { + public void testGetTopQueriesWhenFeatureEnabled() throws InterruptedException { Settings commonSettings = Settings.builder() .put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true") .put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "100") @@ -174,7 +174,8 @@ public void testGetTopQueriesWhenFeatureEnabled() { .get(); assertEquals(searchResponse.getFailedShards(), 0); } - + // Sleep to wait for queue drained to top queries store + Thread.sleep(6000); TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY); TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet(); Assert.assertEquals(0, response.failures().size()); @@ -187,7 +188,7 @@ public void testGetTopQueriesWhenFeatureEnabled() { /** * Test get top queries with small top n size */ - public void testGetTopQueriesWithSmallTopN() { + public void testGetTopQueriesWithSmallTopN() throws InterruptedException { Settings commonSettings = Settings.builder() .put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true") .put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "1") @@ -218,7 +219,7 @@ public void testGetTopQueriesWithSmallTopN() { .get(); assertEquals(searchResponse.getFailedShards(), 0); } - + Thread.sleep(6000); TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY); TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet(); Assert.assertEquals(0, response.failures().size()); @@ -231,7 +232,7 @@ public void testGetTopQueriesWithSmallTopN() { /** * Test get top queries with small window size */ - public void testGetTopQueriesWithSmallWindowSize() { + public void testGetTopQueriesWithSmallWindowSize() throws InterruptedException { Settings commonSettings = Settings.builder() .put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true") .put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "100") @@ -267,7 +268,7 @@ public void testGetTopQueriesWithSmallWindowSize() { TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet(); Assert.assertEquals(0, response.failures().size()); Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size()); - + Thread.sleep(6000); internalCluster().stopAllNodes(); } } diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java index 3bc8215ec19e5..705273f52a567 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java @@ -19,7 +19,6 @@ import org.opensearch.core.xcontent.ToXContent; import org.opensearch.plugin.insights.core.service.QueryInsightsService; import org.opensearch.plugin.insights.rules.model.Attribute; -import org.opensearch.plugin.insights.rules.model.Measurement; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; @@ -34,7 +33,9 @@ import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE; /** - * The listener for top N queries by latency + * The listener for query insights services. + * It forwards query-related data to the appropriate query insights stores, + * either for each request or for each phase. * * @opensearch.internal */ @@ -52,47 +53,55 @@ public final class QueryInsightsListener extends SearchRequestOperationsListener * @param queryInsightsService The topQueriesByLatencyService associated with this listener */ @Inject - public QueryInsightsListener(ClusterService clusterService, QueryInsightsService queryInsightsService) { + public QueryInsightsListener(final ClusterService clusterService, final QueryInsightsService queryInsightsService) { this.queryInsightsService = queryInsightsService; clusterService.getClusterSettings() - .addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_ENABLED, v -> this.setEnabled(MetricType.LATENCY, v)); + .addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_ENABLED, v -> this.setEnableTopQueries(MetricType.LATENCY, v)); clusterService.getClusterSettings() .addSettingsUpdateConsumer( TOP_N_LATENCY_QUERIES_SIZE, - this.queryInsightsService::setTopNSize, - this.queryInsightsService::validateTopNSize + v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).setTopNSize(v), + v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).validateTopNSize(v) ); clusterService.getClusterSettings() .addSettingsUpdateConsumer( TOP_N_LATENCY_QUERIES_WINDOW_SIZE, - this.queryInsightsService::setWindowSize, - this.queryInsightsService::validateWindowSize + v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).setWindowSize(v), + v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).validateWindowSize(v) ); - this.setEnabled(MetricType.LATENCY, clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_ENABLED)); - this.queryInsightsService.setTopNSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_SIZE)); - this.queryInsightsService.setWindowSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_WINDOW_SIZE)); + this.setEnableTopQueries(MetricType.LATENCY, clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_ENABLED)); + this.queryInsightsService.getTopQueriesService(MetricType.LATENCY) + .setTopNSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_SIZE)); + this.queryInsightsService.getTopQueriesService(MetricType.LATENCY) + .setWindowSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_WINDOW_SIZE)); } /** - * Enable or disable metric collection for {@link MetricType} + * Enable or disable top queries insights collection for {@link MetricType} + * This function will enable or disable the corresponding listeners + * and query insights services. * * @param metricType {@link MetricType} * @param enabled boolean */ - public void setEnabled(MetricType metricType, boolean enabled) { + public void setEnableTopQueries(final MetricType metricType, final boolean enabled) { + boolean isAllMetricsDisabled = !queryInsightsService.isEnabled(); this.queryInsightsService.enableCollection(metricType, enabled); - - // disable QueryInsightsListener only if collection for all metrics are disabled. if (!enabled) { - for (MetricType t : MetricType.allMetricTypes()) { - if (this.queryInsightsService.isCollectionEnabled(t)) { - return; - } + // disable QueryInsightsListener only if all metrics collections are disabled now. + if (!queryInsightsService.isEnabled()) { + super.setEnabled(false); + this.queryInsightsService.stop(); } - super.setEnabled(false); } else { super.setEnabled(true); + // restart QueryInsightsListener only if none of metrics collections is enabled before. + if (isAllMetricsDisabled) { + this.queryInsightsService.stop(); + this.queryInsightsService.start(); + } } + } @Override @@ -113,17 +122,14 @@ public void onPhaseFailure(SearchPhaseContext context) {} public void onRequestStart(SearchRequestContext searchRequestContext) {} @Override - public void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { - SearchRequest request = context.getRequest(); + public void onRequestEnd(final SearchPhaseContext context, final SearchRequestContext searchRequestContext) { + final SearchRequest request = context.getRequest(); try { - Map> measurements = new HashMap<>(); + Map measurements = new HashMap<>(); if (queryInsightsService.isCollectionEnabled(MetricType.LATENCY)) { measurements.put( MetricType.LATENCY, - new Measurement<>( - MetricType.LATENCY.name(), - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchRequestContext.getAbsoluteStartNanos()) - ) + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchRequestContext.getAbsoluteStartNanos()) ); } Map attributes = new HashMap<>(); diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueries.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueries.java index 640a0b82260b5..26cff82aae52e 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueries.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueries.java @@ -20,14 +20,13 @@ import java.util.List; /** - * Top Queries by resource usage / latency on a node - *

+ * Holds all top queries records by resource usage or latency on a node * Mainly used in the top N queries node response workflow. * * @opensearch.internal */ public class TopQueries extends BaseNodeResponse implements ToXContentObject { - /** The store to keep the top N queries records */ + /** The store to keep the top queries records */ private final List topQueriesRecords; /** @@ -35,7 +34,7 @@ public class TopQueries extends BaseNodeResponse implements ToXContentObject { * @param in A {@link StreamInput} object. * @throws IOException IOException */ - public TopQueries(StreamInput in) throws IOException { + public TopQueries(final StreamInput in) throws IOException { super(in); topQueriesRecords = in.readList(SearchQueryRecord::new); } @@ -45,13 +44,13 @@ public TopQueries(StreamInput in) throws IOException { * @param node A node that is part of the cluster. * @param searchQueryRecords A list of SearchQueryRecord associated in this TopQueries. */ - public TopQueries(DiscoveryNode node, List searchQueryRecords) { + public TopQueries(final DiscoveryNode node, final List searchQueryRecords) { super(node); topQueriesRecords = searchQueryRecords; } @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { if (topQueriesRecords != null) { for (SearchQueryRecord record : topQueriesRecords) { record.toXContent(builder, params); @@ -61,7 +60,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } @Override - public void writeTo(StreamOutput out) throws IOException { + public void writeTo(final StreamOutput out) throws IOException { super.writeTo(out); out.writeList(topQueriesRecords); diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequest.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequest.java index 27177fef25bea..3bdff2c403161 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequest.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequest.java @@ -9,7 +9,6 @@ package org.opensearch.plugin.insights.rules.action.top_queries; import org.opensearch.action.support.nodes.BaseNodesRequest; -import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.plugin.insights.rules.model.MetricType; @@ -21,10 +20,9 @@ * * @opensearch.internal */ -@PublicApi(since = "1.0.0") public class TopQueriesRequest extends BaseNodesRequest { - MetricType metricType; + final MetricType metricType; /** * Constructor for TopQueriesRequest @@ -32,13 +30,9 @@ public class TopQueriesRequest extends BaseNodesRequest { * @param in A {@link StreamInput} object. * @throws IOException if the stream cannot be deserialized. */ - public TopQueriesRequest(StreamInput in) throws IOException { + public TopQueriesRequest(final StreamInput in) throws IOException { super(in); - MetricType metricType = MetricType.readFromStream(in); - if (false == MetricType.allMetricTypes().contains(metricType)) { - throw new IllegalStateException("Invalid metric used in top queries request: " + metricType); - } - this.metricType = metricType; + this.metricType = MetricType.readFromStream(in); } /** @@ -48,7 +42,7 @@ public TopQueriesRequest(StreamInput in) throws IOException { * @param metricType {@link MetricType} * @param nodesIds the nodeIds specified in the request */ - public TopQueriesRequest(MetricType metricType, String... nodesIds) { + public TopQueriesRequest(final MetricType metricType, final String... nodesIds) { super(nodesIds); this.metricType = metricType; } @@ -61,7 +55,7 @@ public MetricType getMetricType() { } @Override - public void writeTo(StreamOutput out) throws IOException { + public void writeTo(final StreamOutput out) throws IOException { super.writeTo(out); out.writeString(metricType.toString()); } diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponse.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponse.java index fe7644de5629f..2e66bb7f77baf 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponse.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponse.java @@ -11,7 +11,6 @@ import org.opensearch.action.FailedNodeException; import org.opensearch.action.support.nodes.BaseNodesResponse; import org.opensearch.cluster.ClusterName; -import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -31,7 +30,6 @@ * * @opensearch.internal */ -@PublicApi(since = "1.0.0") public class TopQueriesResponse extends BaseNodesResponse implements ToXContentFragment { private static final String CLUSTER_LEVEL_RESULTS_KEY = "top_queries"; @@ -44,7 +42,7 @@ public class TopQueriesResponse extends BaseNodesResponse implements * @param in A {@link StreamInput} object. * @throws IOException if the stream cannot be deserialized. */ - public TopQueriesResponse(StreamInput in) throws IOException { + public TopQueriesResponse(final StreamInput in) throws IOException { super(in); top_n_size = in.readInt(); metricType = in.readEnum(MetricType.class); @@ -60,11 +58,11 @@ public TopQueriesResponse(StreamInput in) throws IOException { * @param metricType the {@link MetricType} to be returned in this response */ public TopQueriesResponse( - ClusterName clusterName, - List nodes, - List failures, - int top_n_size, - MetricType metricType + final ClusterName clusterName, + final List nodes, + final List failures, + final int top_n_size, + final MetricType metricType ) { super(clusterName, nodes, failures); this.top_n_size = top_n_size; @@ -72,20 +70,20 @@ public TopQueriesResponse( } @Override - protected List readNodesFrom(StreamInput in) throws IOException { + protected List readNodesFrom(final StreamInput in) throws IOException { return in.readList(TopQueries::new); } @Override - protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + protected void writeNodesTo(final StreamOutput out, final List nodes) throws IOException { out.writeList(nodes); out.writeLong(top_n_size); out.writeEnum(metricType); } @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - List results = getNodes(); + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + final List results = getNodes(); postProcess(results); builder.startObject(); toClusterLevelResult(builder, params, results); @@ -95,7 +93,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws @Override public String toString() { try { - XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint(); + final XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint(); builder.startObject(); this.toXContent(builder, EMPTY_PARAMS); builder.endObject(); @@ -110,9 +108,9 @@ public String toString() { * * @param results the top queries results */ - private void postProcess(List results) { + private void postProcess(final List results) { for (TopQueries topQueries : results) { - String nodeId = topQueries.getNode().getId(); + final String nodeId = topQueries.getNode().getId(); for (SearchQueryRecord record : topQueries.getTopQueriesRecord()) { record.addAttribute(Attribute.NODE_ID, nodeId); } @@ -127,8 +125,9 @@ private void postProcess(List results) { * @param results top queries results from all nodes * @throws IOException if an error occurs */ - private void toClusterLevelResult(XContentBuilder builder, Params params, List results) throws IOException { - List all_records = results.stream() + private void toClusterLevelResult(final XContentBuilder builder, final Params params, final List results) + throws IOException { + final List all_records = results.stream() .map(TopQueries::getTopQueriesRecord) .flatMap(Collection::stream) .sorted((a, b) -> SearchQueryRecord.compare(a, b, metricType) * -1) diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesAction.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesAction.java index 654aab6eb1563..6aa511c626ab1 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesAction.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesAction.java @@ -33,7 +33,7 @@ import static org.opensearch.rest.RestRequest.Method.GET; /** - * Transport action to get Top N queries by certain metric type + * Rest action to get Top N queries by certain metric type * * @opensearch.api */ @@ -64,17 +64,12 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC final TopQueriesRequest topQueriesRequest = prepareRequest(request); topQueriesRequest.timeout(request.param("timeout")); - return channel -> client.execute( - TopQueriesAction.INSTANCE, - topQueriesRequest, - topQueriesResponse(channel) - - ); + return channel -> client.execute(TopQueriesAction.INSTANCE, topQueriesRequest, topQueriesResponse(channel)); } static TopQueriesRequest prepareRequest(final RestRequest request) { - String[] nodesIds = Strings.splitStringByCommaToArray(request.param("nodeId")); - String metricType = request.param("type", MetricType.LATENCY.toString()); + final String[] nodesIds = Strings.splitStringByCommaToArray(request.param("nodeId")); + final String metricType = request.param("type", MetricType.LATENCY.toString()); if (!ALLOWED_METRICS.contains(metricType)) { throw new IllegalArgumentException( String.format(Locale.ROOT, "request [%s] contains invalid metric type [%s]", request.path(), metricType) @@ -93,10 +88,10 @@ public boolean canTripCircuitBreaker() { return false; } - private RestResponseListener topQueriesResponse(RestChannel channel) { + private RestResponseListener topQueriesResponse(final RestChannel channel) { return new RestResponseListener<>(channel) { @Override - public RestResponse buildResponse(TopQueriesResponse response) throws Exception { + public RestResponse buildResponse(final TopQueriesResponse response) throws Exception { return new BytesRestResponse(RestStatus.OK, response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS)); } }; diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesAction.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesAction.java index a4ddaca0a6cdc..ddf614211bc41 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesAction.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesAction.java @@ -55,11 +55,11 @@ public class TransportTopQueriesAction extends TransportNodesAction< */ @Inject public TransportTopQueriesAction( - ThreadPool threadPool, - ClusterService clusterService, - TransportService transportService, - QueryInsightsService queryInsightsService, - ActionFilters actionFilters + final ThreadPool threadPool, + final ClusterService clusterService, + final TransportService transportService, + final QueryInsightsService queryInsightsService, + final ActionFilters actionFilters ) { super( TopQueriesAction.NAME, @@ -77,9 +77,9 @@ public TransportTopQueriesAction( @Override protected TopQueriesResponse newResponse( - TopQueriesRequest topQueriesRequest, - List responses, - List failures + final TopQueriesRequest topQueriesRequest, + final List responses, + final List failures ) { if (topQueriesRequest.getMetricType() == MetricType.LATENCY) { return new TopQueriesResponse( @@ -95,20 +95,23 @@ protected TopQueriesResponse newResponse( } @Override - protected NodeRequest newNodeRequest(TopQueriesRequest request) { + protected NodeRequest newNodeRequest(final TopQueriesRequest request) { return new NodeRequest(request); } @Override - protected TopQueries newNodeResponse(StreamInput in) throws IOException { + protected TopQueries newNodeResponse(final StreamInput in) throws IOException { return new TopQueries(in); } @Override - protected TopQueries nodeOperation(NodeRequest nodeRequest) { - TopQueriesRequest request = nodeRequest.request; + protected TopQueries nodeOperation(final NodeRequest nodeRequest) { + final TopQueriesRequest request = nodeRequest.request; if (request.getMetricType() == MetricType.LATENCY) { - return new TopQueries(clusterService.localNode(), queryInsightsService.getTopNRecords(MetricType.LATENCY, true)); + return new TopQueries( + clusterService.localNode(), + queryInsightsService.getTopQueriesService(MetricType.LATENCY).getTopQueriesRecords(true) + ); } else { throw new OpenSearchException(String.format(Locale.ROOT, "invalid metric type %s", request.getMetricType())); } @@ -122,10 +125,11 @@ protected TopQueries nodeOperation(NodeRequest nodeRequest) { */ public static class NodeRequest extends TransportRequest { - TopQueriesRequest request; + final TopQueriesRequest request; /** * Create the NodeResponse object from StreamInput + * * @param in the StreamInput to read the object * @throws IOException IOException */ @@ -138,12 +142,12 @@ public NodeRequest(StreamInput in) throws IOException { * Create the NodeResponse object from a TopQueriesRequest * @param request the TopQueriesRequest object */ - public NodeRequest(TopQueriesRequest request) { + public NodeRequest(final TopQueriesRequest request) { this.request = request; } @Override - public void writeTo(StreamOutput out) throws IOException { + public void writeTo(final StreamOutput out) throws IOException { super.writeTo(out); request.writeTo(out); } diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java index a6368bb0e180f..f340950017a5c 100644 --- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java @@ -16,6 +16,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.plugin.insights.core.service.QueryInsightsService; +import org.opensearch.plugin.insights.core.service.TopQueriesService; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.settings.QueryInsightsSettings; import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder; @@ -45,6 +46,7 @@ public class QueryInsightsListenerTests extends OpenSearchTestCase { private final SearchPhaseContext searchPhaseContext = mock(SearchPhaseContext.class); private final SearchRequest searchRequest = mock(SearchRequest.class); private final QueryInsightsService queryInsightsService = mock(QueryInsightsService.class); + private final TopQueriesService topQueriesService = mock(TopQueriesService.class); private ClusterService clusterService; @Before @@ -57,9 +59,10 @@ public void setup() { clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE); clusterService = new ClusterService(settings, clusterSettings, null); when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true); + when(queryInsightsService.getTopQueriesService(MetricType.LATENCY)).thenReturn(topQueriesService); } - public void testOnRequestEnd() { + public void testOnRequestEnd() throws InterruptedException { Long timestamp = System.currentTimeMillis() - 100L; SearchType searchType = SearchType.QUERY_THEN_FETCH; @@ -146,13 +149,13 @@ public void testConcurrentOnRequestEnd() throws InterruptedException { public void testSetEnabled() { when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true); QueryInsightsListener queryInsightsListener = new QueryInsightsListener(clusterService, queryInsightsService); - queryInsightsListener.setEnabled(MetricType.LATENCY, true); + queryInsightsListener.setEnableTopQueries(MetricType.LATENCY, true); assertTrue(queryInsightsListener.isEnabled()); when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(false); when(queryInsightsService.isCollectionEnabled(MetricType.CPU)).thenReturn(false); when(queryInsightsService.isCollectionEnabled(MetricType.JVM)).thenReturn(false); - queryInsightsListener.setEnabled(MetricType.LATENCY, false); + queryInsightsListener.setEnableTopQueries(MetricType.LATENCY, false); assertFalse(queryInsightsListener.isEnabled()); } }