Skip to content

Commit

Permalink
Refactor record and service to make them generic
Browse files Browse the repository at this point in the history
Signed-off-by: Chenyang Ji <[email protected]>
  • Loading branch information
ansjcy committed Feb 6, 2024
1 parent ae9e336 commit c4fb7ae
Show file tree
Hide file tree
Showing 18 changed files with 364 additions and 935 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Workaround for https://bugs.openjdk.org/browse/JDK-8323659 regression, introduced in JDK-21.0.2 ([#11968](https://github.com/opensearch-project/OpenSearch/pull/11968))
- Updates IpField to be searchable when only `doc_values` are enabled ([#11508](https://github.com/opensearch-project/OpenSearch/pull/11508))
- [Query Insights] Query Insights Framework which currently supports retrieving the most time-consuming queries within the last configured time window ([#11903](https://github.com/opensearch-project/OpenSearch/pull/11903))
- [Query Insights] Implement Top N Queries Feature in Query Insights Framework([#11904](https://github.com/opensearch-project/OpenSearch/pull/11904))
- [Query Insights] Implement Top N Queries feature to collect and gather information about high latency queries in a window ([#11904](https://github.com/opensearch-project/OpenSearch/pull/11904))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@
import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction;
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesRequest;
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesResponse;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.PluginInfo;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand All @@ -28,6 +30,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -74,15 +77,69 @@ public void testQueryInsightPluginInstalled() {
* Test get top queries when feature disabled
*/
public void testGetTopQueriesWhenFeatureDisabled() {
TopQueriesRequest request = new TopQueriesRequest();
TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY);
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
Assert.assertNotEquals(0, response.failures().size());
Assert.assertEquals(
"Cannot get query data when query insight feature is not enabled.",
"Cannot get query data when query insight feature is not enabled for MetricType [latency].",
response.failures().get(0).getCause().getCause().getMessage()
);
}

/**
* Test update top query record when feature enabled
*/
public void testUpdateRecordWhenFeatureEnabled() throws ExecutionException, InterruptedException {
Settings commonSettings = Settings.builder().put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "false").build();

logger.info("--> starting nodes for query insight testing");
List<String> nodes = internalCluster().startNodes(TOTAL_NUMBER_OF_NODES, Settings.builder().put(commonSettings).build());

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet();
assertFalse(health.isTimedOut());

assertAcked(
prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 2))
);
ensureStableCluster(2);
logger.info("--> creating indices for query insight testing");
for (int i = 0; i < 5; i++) {
IndexResponse response = client().prepareIndex("test_" + i).setId("" + i).setSource("field_" + i, "value_" + i).get();
assertEquals("CREATED", response.status().toString());
}
// making search requests to get top queries
for (int i = 0; i < TOTAL_SEARCH_REQUESTS; i++) {
SearchResponse searchResponse = internalCluster().client(randomFrom(nodes))
.prepareSearch()
.setQuery(QueryBuilders.matchAllQuery())
.get();
assertEquals(searchResponse.getFailedShards(), 0);
}

TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY);
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
Assert.assertNotEquals(0, response.failures().size());
Assert.assertEquals(
"Cannot get query data when query insight feature is not enabled for MetricType [latency].",
response.failures().get(0).getCause().getCause().getMessage()
);

ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().persistentSettings(
Settings.builder().put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true").build()
);
assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).get());
TopQueriesRequest request2 = new TopQueriesRequest(MetricType.LATENCY);
TopQueriesResponse response2 = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request2).actionGet();
Assert.assertEquals(0, response2.failures().size());
Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response2.getNodes().size());
for (int i = 0; i < TOTAL_NUMBER_OF_NODES; i++) {
Assert.assertEquals(0, response2.getNodes().get(i).getTopQueriesRecord().size());
}

internalCluster().stopAllNodes();
}

/**
* Test get top queries when feature enabled
*/
Expand All @@ -93,7 +150,7 @@ public void testGetTopQueriesWhenFeatureEnabled() {
.put(TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey(), "600s")
.build();

logger.info("--> starting 2 nodes for query insight testing");
logger.info("--> starting nodes for query insight testing");
List<String> nodes = internalCluster().startNodes(TOTAL_NUMBER_OF_NODES, Settings.builder().put(commonSettings).build());

logger.info("--> waiting for nodes to form a cluster");
Expand All @@ -118,11 +175,11 @@ public void testGetTopQueriesWhenFeatureEnabled() {
assertEquals(searchResponse.getFailedShards(), 0);
}

TopQueriesRequest request = new TopQueriesRequest();
TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY);
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
Assert.assertEquals(0, response.failures().size());
Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size());
Assert.assertEquals(TOTAL_SEARCH_REQUESTS, response.getNodes().stream().mapToInt(o -> o.getLatencyRecords().size()).sum());
Assert.assertEquals(TOTAL_SEARCH_REQUESTS, response.getNodes().stream().mapToInt(o -> o.getTopQueriesRecord().size()).sum());

internalCluster().stopAllNodes();
}
Expand All @@ -137,7 +194,7 @@ public void testGetTopQueriesWithSmallTopN() {
.put(TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey(), "600s")
.build();

logger.info("--> starting 2 nodes for query insight testing");
logger.info("--> starting nodes for query insight testing");
List<String> nodes = internalCluster().startNodes(TOTAL_NUMBER_OF_NODES, Settings.builder().put(commonSettings).build());

logger.info("--> waiting for nodes to form a cluster");
Expand All @@ -162,12 +219,11 @@ public void testGetTopQueriesWithSmallTopN() {
assertEquals(searchResponse.getFailedShards(), 0);
}

TopQueriesRequest request = new TopQueriesRequest();
TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY);
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
Assert.assertEquals(0, response.failures().size());
Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size());
// TODO: this should be 1 after changing to cluster level top N.
Assert.assertEquals(2, response.getNodes().stream().mapToInt(o -> o.getLatencyRecords().size()).sum());
Assert.assertEquals(2, response.getNodes().stream().mapToInt(o -> o.getTopQueriesRecord().size()).sum());

internalCluster().stopAllNodes();
}
Expand All @@ -179,10 +235,10 @@ public void testGetTopQueriesWithSmallWindowSize() {
Settings commonSettings = Settings.builder()
.put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true")
.put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "100")
.put(TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey(), "0ms")
.put(TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey(), "1m")
.build();

logger.info("--> starting 2 nodes for query insight testing");
logger.info("--> starting nodes for query insight testing");
List<String> nodes = internalCluster().startNodes(TOTAL_NUMBER_OF_NODES, Settings.builder().put(commonSettings).build());

logger.info("--> waiting for nodes to form a cluster");
Expand All @@ -207,11 +263,10 @@ public void testGetTopQueriesWithSmallWindowSize() {
assertEquals(searchResponse.getFailedShards(), 0);
}

TopQueriesRequest request = new TopQueriesRequest();
TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY);
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
Assert.assertEquals(0, response.failures().size());
Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size());
Assert.assertEquals(0, response.getNodes().stream().mapToInt(o -> o.getLatencyRecords().size()).sum());

internalCluster().stopAllNodes();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.insights.core.listener;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.search.SearchPhaseContext;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchRequestContext;
import org.opensearch.action.search.SearchRequestOperationsListener;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.plugin.insights.core.service.QueryInsightsService;
import org.opensearch.plugin.insights.rules.model.Attribute;
import org.opensearch.plugin.insights.rules.model.Measurement;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;

import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE;

/**
* The listener for top N queries by latency
*
* @opensearch.internal
*/
public final class QueryInsightsListener extends SearchRequestOperationsListener {
private static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false"));

private static final Logger log = LogManager.getLogger(QueryInsightsListener.class);

private final QueryInsightsService queryInsightsService;

/**
* Constructor for QueryInsightsListener
*
* @param clusterService The Node's cluster service.
* @param queryInsightsService The topQueriesByLatencyService associated with this listener
*/
@Inject
public QueryInsightsListener(ClusterService clusterService, QueryInsightsService queryInsightsService) {
this.queryInsightsService = queryInsightsService;
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_ENABLED, v -> this.setEnabled(MetricType.LATENCY, v));
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
TOP_N_LATENCY_QUERIES_SIZE,
this.queryInsightsService::setTopNSize,
this.queryInsightsService::validateTopNSize
);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
TOP_N_LATENCY_QUERIES_WINDOW_SIZE,
this.queryInsightsService::setWindowSize,
this.queryInsightsService::validateWindowSize
);
this.setEnabled(MetricType.LATENCY, clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_ENABLED));
this.queryInsightsService.setTopNSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_SIZE));
this.queryInsightsService.setWindowSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_WINDOW_SIZE));
}

/**
* Enable or disable metric collection for {@link MetricType}
*
* @param metricType {@link MetricType}
* @param enabled boolean
*/
public void setEnabled(MetricType metricType, boolean enabled) {
this.queryInsightsService.enableCollection(metricType, enabled);

// disable QueryInsightsListener only if collection for all metrics are disabled.
if (!enabled) {
for (MetricType t : MetricType.allMetricTypes()) {
if (this.queryInsightsService.isCollectionEnabled(t)) {
return;
}
}
super.setEnabled(false);
} else {
super.setEnabled(true);
}
}

@Override
public boolean isEnabled() {
return super.isEnabled();
}

@Override
public void onPhaseStart(SearchPhaseContext context) {}

@Override
public void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

@Override
public void onPhaseFailure(SearchPhaseContext context) {}

@Override
public void onRequestStart(SearchRequestContext searchRequestContext) {}

@Override
public void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
SearchRequest request = context.getRequest();
try {
Map<MetricType, Measurement<? extends Number>> measurements = new HashMap<>();
if (queryInsightsService.isCollectionEnabled(MetricType.LATENCY)) {
measurements.put(
MetricType.LATENCY,
new Measurement<>(
MetricType.LATENCY.name(),
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchRequestContext.getAbsoluteStartNanos())
)
);
}
Map<Attribute, Object> attributes = new HashMap<>();
attributes.put(Attribute.SEARCH_TYPE, request.searchType().toString().toLowerCase(Locale.ROOT));
attributes.put(Attribute.SOURCE, request.source().toString(FORMAT_PARAMS));
attributes.put(Attribute.TOTAL_SHARDS, context.getNumShards());
attributes.put(Attribute.INDICES, request.indices());
attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap());
SearchQueryRecord record = new SearchQueryRecord(request.getOrCreateAbsoluteStartMillis(), measurements, attributes);
queryInsightsService.addRecord(record);
} catch (Exception e) {
log.error(String.format(Locale.ROOT, "fail to ingest query insight data, error: %s", e));
}
}
}
Loading

0 comments on commit c4fb7ae

Please sign in to comment.