Skip to content

Commit

Permalink
refactor service for improving multithreading efficiency
Browse files Browse the repository at this point in the history
Signed-off-by: Chenyang Ji <[email protected]>
  • Loading branch information
ansjcy committed Feb 6, 2024
1 parent c4fb7ae commit 409fc92
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,15 @@ public void testGetTopQueriesWhenFeatureDisabled() {
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
Assert.assertNotEquals(0, response.failures().size());
Assert.assertEquals(
"Cannot get query data when query insight feature is not enabled for MetricType [latency].",
"Cannot get top n queries when [search.top_n_queries.latency.enabled] is not enabled.",
response.failures().get(0).getCause().getCause().getMessage()
);
}

/**
* Test update top query record when feature enabled
*/
public void testUpdateRecordWhenFeatureEnabled() throws ExecutionException, InterruptedException {
public void testUpdateRecordWhenFeatureDisabledThenEnabled() throws ExecutionException, InterruptedException {
Settings commonSettings = Settings.builder().put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "false").build();

logger.info("--> starting nodes for query insight testing");
Expand Down Expand Up @@ -121,7 +121,7 @@ public void testUpdateRecordWhenFeatureEnabled() throws ExecutionException, Inte
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
Assert.assertNotEquals(0, response.failures().size());
Assert.assertEquals(
"Cannot get query data when query insight feature is not enabled for MetricType [latency].",
"Cannot get top n queries when [search.top_n_queries.latency.enabled] is not enabled.",
response.failures().get(0).getCause().getCause().getMessage()
);

Expand All @@ -143,7 +143,7 @@ public void testUpdateRecordWhenFeatureEnabled() throws ExecutionException, Inte
/**
* Test get top queries when feature enabled
*/
public void testGetTopQueriesWhenFeatureEnabled() {
public void testGetTopQueriesWhenFeatureEnabled() throws InterruptedException {
Settings commonSettings = Settings.builder()
.put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true")
.put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "100")
Expand Down Expand Up @@ -174,7 +174,8 @@ public void testGetTopQueriesWhenFeatureEnabled() {
.get();
assertEquals(searchResponse.getFailedShards(), 0);
}

// Sleep to wait for queue drained to top queries store
Thread.sleep(6000);
TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY);
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
Assert.assertEquals(0, response.failures().size());
Expand All @@ -187,7 +188,7 @@ public void testGetTopQueriesWhenFeatureEnabled() {
/**
* Test get top queries with small top n size
*/
public void testGetTopQueriesWithSmallTopN() {
public void testGetTopQueriesWithSmallTopN() throws InterruptedException {
Settings commonSettings = Settings.builder()
.put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true")
.put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "1")
Expand Down Expand Up @@ -218,7 +219,7 @@ public void testGetTopQueriesWithSmallTopN() {
.get();
assertEquals(searchResponse.getFailedShards(), 0);
}

Thread.sleep(6000);
TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY);
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
Assert.assertEquals(0, response.failures().size());
Expand All @@ -231,7 +232,7 @@ public void testGetTopQueriesWithSmallTopN() {
/**
* Test get top queries with small window size
*/
public void testGetTopQueriesWithSmallWindowSize() {
public void testGetTopQueriesWithSmallWindowSize() throws InterruptedException {
Settings commonSettings = Settings.builder()
.put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true")
.put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "100")
Expand Down Expand Up @@ -267,7 +268,7 @@ public void testGetTopQueriesWithSmallWindowSize() {
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
Assert.assertEquals(0, response.failures().size());
Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size());

Thread.sleep(6000);
internalCluster().stopAllNodes();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.plugin.insights.core.service.QueryInsightsService;
import org.opensearch.plugin.insights.rules.model.Attribute;
import org.opensearch.plugin.insights.rules.model.Measurement;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;

Expand All @@ -34,7 +33,9 @@
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE;

/**
* The listener for top N queries by latency
* The listener for query insights services.
* It forwards query-related data to the appropriate query insights stores,
* either for each request or for each phase.
*
* @opensearch.internal
*/
Expand All @@ -52,47 +53,55 @@ public final class QueryInsightsListener extends SearchRequestOperationsListener
* @param queryInsightsService The topQueriesByLatencyService associated with this listener
*/
@Inject
public QueryInsightsListener(ClusterService clusterService, QueryInsightsService queryInsightsService) {
public QueryInsightsListener(final ClusterService clusterService, final QueryInsightsService queryInsightsService) {
this.queryInsightsService = queryInsightsService;
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_ENABLED, v -> this.setEnabled(MetricType.LATENCY, v));
.addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_ENABLED, v -> this.setEnableTopQueries(MetricType.LATENCY, v));
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
TOP_N_LATENCY_QUERIES_SIZE,
this.queryInsightsService::setTopNSize,
this.queryInsightsService::validateTopNSize
v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).setTopNSize(v),
v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).validateTopNSize(v)
);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
TOP_N_LATENCY_QUERIES_WINDOW_SIZE,
this.queryInsightsService::setWindowSize,
this.queryInsightsService::validateWindowSize
v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).setWindowSize(v),
v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).validateWindowSize(v)
);
this.setEnabled(MetricType.LATENCY, clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_ENABLED));
this.queryInsightsService.setTopNSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_SIZE));
this.queryInsightsService.setWindowSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_WINDOW_SIZE));
this.setEnableTopQueries(MetricType.LATENCY, clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_ENABLED));
this.queryInsightsService.getTopQueriesService(MetricType.LATENCY)
.setTopNSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_SIZE));
this.queryInsightsService.getTopQueriesService(MetricType.LATENCY)
.setWindowSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_WINDOW_SIZE));
}

/**
* Enable or disable metric collection for {@link MetricType}
* Enable or disable top queries insights collection for {@link MetricType}
* This function will enable or disable the corresponding listeners
* and query insights services.
*
* @param metricType {@link MetricType}
* @param enabled boolean
*/
public void setEnabled(MetricType metricType, boolean enabled) {
public void setEnableTopQueries(final MetricType metricType, final boolean enabled) {
boolean isAllMetricsDisabled = !queryInsightsService.isEnabled();
this.queryInsightsService.enableCollection(metricType, enabled);

// disable QueryInsightsListener only if collection for all metrics are disabled.
if (!enabled) {
for (MetricType t : MetricType.allMetricTypes()) {
if (this.queryInsightsService.isCollectionEnabled(t)) {
return;
}
// disable QueryInsightsListener only if all metrics collections are disabled now.
if (!queryInsightsService.isEnabled()) {
super.setEnabled(false);
this.queryInsightsService.stop();
}
super.setEnabled(false);
} else {
super.setEnabled(true);
// restart QueryInsightsListener only if none of metrics collections is enabled before.
if (isAllMetricsDisabled) {
this.queryInsightsService.stop();
this.queryInsightsService.start();
}
}

}

@Override
Expand All @@ -113,17 +122,14 @@ public void onPhaseFailure(SearchPhaseContext context) {}
public void onRequestStart(SearchRequestContext searchRequestContext) {}

@Override
public void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
SearchRequest request = context.getRequest();
public void onRequestEnd(final SearchPhaseContext context, final SearchRequestContext searchRequestContext) {
final SearchRequest request = context.getRequest();
try {
Map<MetricType, Measurement<? extends Number>> measurements = new HashMap<>();
Map<MetricType, Number> measurements = new HashMap<>();
if (queryInsightsService.isCollectionEnabled(MetricType.LATENCY)) {
measurements.put(
MetricType.LATENCY,
new Measurement<>(
MetricType.LATENCY.name(),
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchRequestContext.getAbsoluteStartNanos())
)
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchRequestContext.getAbsoluteStartNanos())
);
}
Map<Attribute, Object> attributes = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,21 @@
import java.util.List;

/**
* Top Queries by resource usage / latency on a node
* <p>
* Holds all top queries records by resource usage or latency on a node
* Mainly used in the top N queries node response workflow.
*
* @opensearch.internal
*/
public class TopQueries extends BaseNodeResponse implements ToXContentObject {
/** The store to keep the top N queries records */
/** The store to keep the top queries records */
private final List<SearchQueryRecord> topQueriesRecords;

/**
* Create the TopQueries Object from StreamInput
* @param in A {@link StreamInput} object.
* @throws IOException IOException
*/
public TopQueries(StreamInput in) throws IOException {
public TopQueries(final StreamInput in) throws IOException {
super(in);
topQueriesRecords = in.readList(SearchQueryRecord::new);
}
Expand All @@ -45,13 +44,13 @@ public TopQueries(StreamInput in) throws IOException {
* @param node A node that is part of the cluster.
* @param searchQueryRecords A list of SearchQueryRecord associated in this TopQueries.
*/
public TopQueries(DiscoveryNode node, List<SearchQueryRecord> searchQueryRecords) {
public TopQueries(final DiscoveryNode node, final List<SearchQueryRecord> searchQueryRecords) {
super(node);
topQueriesRecords = searchQueryRecords;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
if (topQueriesRecords != null) {
for (SearchQueryRecord record : topQueriesRecords) {
record.toXContent(builder, params);
Expand All @@ -61,7 +60,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}

@Override
public void writeTo(StreamOutput out) throws IOException {
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
out.writeList(topQueriesRecords);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.opensearch.plugin.insights.rules.action.top_queries;

import org.opensearch.action.support.nodes.BaseNodesRequest;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.plugin.insights.rules.model.MetricType;
Expand All @@ -21,24 +20,19 @@
*
* @opensearch.internal
*/
@PublicApi(since = "1.0.0")
public class TopQueriesRequest extends BaseNodesRequest<TopQueriesRequest> {

MetricType metricType;
final MetricType metricType;

/**
* Constructor for TopQueriesRequest
*
* @param in A {@link StreamInput} object.
* @throws IOException if the stream cannot be deserialized.
*/
public TopQueriesRequest(StreamInput in) throws IOException {
public TopQueriesRequest(final StreamInput in) throws IOException {
super(in);
MetricType metricType = MetricType.readFromStream(in);
if (false == MetricType.allMetricTypes().contains(metricType)) {
throw new IllegalStateException("Invalid metric used in top queries request: " + metricType);
}
this.metricType = metricType;
this.metricType = MetricType.readFromStream(in);
}

/**
Expand All @@ -48,7 +42,7 @@ public TopQueriesRequest(StreamInput in) throws IOException {
* @param metricType {@link MetricType}
* @param nodesIds the nodeIds specified in the request
*/
public TopQueriesRequest(MetricType metricType, String... nodesIds) {
public TopQueriesRequest(final MetricType metricType, final String... nodesIds) {
super(nodesIds);
this.metricType = metricType;
}
Expand All @@ -61,7 +55,7 @@ public MetricType getMetricType() {
}

@Override
public void writeTo(StreamOutput out) throws IOException {
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(metricType.toString());
}
Expand Down
Loading

0 comments on commit 409fc92

Please sign in to comment.