Skip to content

Commit

Permalink
Move query categorization changes to query insights plugin
Browse files Browse the repository at this point in the history
Signed-off-by: Siddhant Deshmukh <[email protected]>
  • Loading branch information
deshsidd committed Jun 24, 2024
1 parent 212efd7 commit 9e5a09a
Show file tree
Hide file tree
Showing 15 changed files with 112 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.plugin.insights;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.search.TransportSearchAction;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNodes;
Expand All @@ -33,10 +34,13 @@
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.TelemetryAwarePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
import org.opensearch.script.ScriptService;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.ScalingExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -49,7 +53,7 @@
/**
* Plugin class for Query Insights.
*/
public class QueryInsightsPlugin extends Plugin implements ActionPlugin {
public class QueryInsightsPlugin extends Plugin implements ActionPlugin, TelemetryAwarePlugin {
/**
* Default constructor
*/
Expand All @@ -67,10 +71,12 @@ public Collection<Object> createComponents(
final NodeEnvironment nodeEnvironment,
final NamedWriteableRegistry namedWriteableRegistry,
final IndexNameExpressionResolver indexNameExpressionResolver,
final Supplier<RepositoriesService> repositoriesServiceSupplier
final Supplier<RepositoriesService> repositoriesServiceSupplier,
final Tracer tracer,
final MetricsRegistry metricsRegistry
) {
// create top n queries service
final QueryInsightsService queryInsightsService = new QueryInsightsService(clusterService.getClusterSettings(), threadPool, client);
final QueryInsightsService queryInsightsService = new QueryInsightsService(clusterService.getClusterSettings(), threadPool, client, metricsRegistry);
return List.of(queryInsightsService, new QueryInsightsListener(clusterService, queryInsightsService));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
* compatible open source license.
*/

package org.opensearch.index.query;
package org.opensearch.plugin.insights.core.categorizer;

import org.apache.lucene.search.BooleanClause;
import org.opensearch.common.SetOnce;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilderVisitor;

import java.util.ArrayList;
import java.util.EnumMap;
Expand Down Expand Up @@ -55,7 +57,7 @@ public QueryBuilderVisitor getChildVisitor(BooleanClause.Occur occur) {
return childVisitorWrapper;
}

String toJson() {
public String toJson() {
StringBuilder outputBuilder = new StringBuilder("{\"type\":\"").append(queryType.get()).append("\"");
for (Map.Entry<BooleanClause.Occur, List<QueryShapeVisitor>> entry : childVisitors.entrySet()) {
outputBuilder.append(",\"").append(entry.getKey().name().toLowerCase(Locale.ROOT)).append("\"[");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* compatible open source license.
*/

package org.opensearch.action.search;
package org.opensearch.plugin.insights.core.categorizer;

import org.opensearch.search.aggregations.AggregationBuilder;
import org.opensearch.search.aggregations.PipelineAggregationBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@
* compatible open source license.
*/

package org.opensearch.action.search;
package org.opensearch.plugin.insights.core.categorizer;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilderVisitor;
import org.opensearch.index.query.QueryShapeVisitor;
import org.opensearch.plugin.insights.rules.model.Attribute;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.search.aggregations.AggregatorFactories;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.sort.SortBuilder;
Expand All @@ -26,11 +27,11 @@
* Class to categorize the search queries based on the type and increment the relevant counters.
* Class also logs the query shape.
*/
final class SearchQueryCategorizer {
public final class SearchQueryCategorizer {

private static final Logger log = LogManager.getLogger(SearchQueryCategorizer.class);

final SearchQueryCounters searchQueryCounters;
public final SearchQueryCounters searchQueryCounters;

final SearchQueryAggregationCategorizer searchQueryAggregationCategorizer;

Expand All @@ -39,6 +40,13 @@ public SearchQueryCategorizer(MetricsRegistry metricsRegistry) {
searchQueryAggregationCategorizer = new SearchQueryAggregationCategorizer(searchQueryCounters);
}

public void consumeRecords(List<SearchQueryRecord> records) {
for (SearchQueryRecord record : records) {
SearchSourceBuilder source = (SearchSourceBuilder) record.getAttributes().get(Attribute.SOURCE);
categorize(source);
}
}

public void categorize(SearchSourceBuilder source) {
QueryBuilder topLevelQueryBuilder = source.query();
logQueryShape(topLevelQueryBuilder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* compatible open source license.
*/

package org.opensearch.action.search;
package org.opensearch.plugin.insights.core.categorizer;

import org.apache.lucene.search.BooleanClause;
import org.opensearch.index.query.QueryBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* compatible open source license.
*/

package org.opensearch.action.search;
package org.opensearch.plugin.insights.core.categorizer;

import org.opensearch.index.query.QueryBuilder;
import org.opensearch.telemetry.metrics.Counter;
Expand All @@ -20,7 +20,7 @@
/**
* Class contains all the Counters related to search query types.
*/
final class SearchQueryCounters {
public final class SearchQueryCounters {
private static final String LEVEL_TAG = "level";
private static final String UNIT = "1";
private final MetricsRegistry metricsRegistry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ private void constructSearchQueryRecord(final SearchPhaseContext context, final
}
Map<Attribute, Object> attributes = new HashMap<>();
attributes.put(Attribute.SEARCH_TYPE, request.searchType().toString().toLowerCase(Locale.ROOT));
attributes.put(Attribute.SOURCE, request.source().toString(FORMAT_PARAMS));
attributes.put(Attribute.SOURCE, request.source());
attributes.put(Attribute.TOTAL_SHARDS, context.getNumShards());
attributes.put(Attribute.INDICES, request.indices());
attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,20 @@

package org.opensearch.plugin.insights.core.service;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.Client;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.plugin.insights.core.categorizer.SearchQueryCategorizer;
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

Expand All @@ -29,6 +33,7 @@
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import static org.opensearch.plugin.insights.settings.QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getExporterSettings;

/**
Expand All @@ -38,6 +43,9 @@
* @opensearch.internal
*/
public class QueryInsightsService extends AbstractLifecycleComponent {

private static final Logger logger = LogManager.getLogger(QueryInsightsService.class);

/**
* The internal OpenSearch thread pool that execute async processing and exporting tasks
*/
Expand Down Expand Up @@ -69,6 +77,12 @@ public class QueryInsightsService extends AbstractLifecycleComponent {
*/
final QueryInsightsExporterFactory queryInsightsExporterFactory;

private volatile boolean searchQueryMetricsEnabled;

private SearchQueryCategorizer searchQueryCategorizer;

private MetricsRegistry metricsRegistry;

/**
* Constructor of the QueryInsightsService
*
Expand All @@ -77,7 +91,7 @@ public class QueryInsightsService extends AbstractLifecycleComponent {
* @param client OS client
*/
@Inject
public QueryInsightsService(final ClusterSettings clusterSettings, final ThreadPool threadPool, final Client client) {
public QueryInsightsService(final ClusterSettings clusterSettings, final ThreadPool threadPool, final Client client, final MetricsRegistry metricsRegistry) {
enableCollect = new HashMap<>();
queryRecordsQueue = new LinkedBlockingQueue<>(QueryInsightsSettings.QUERY_RECORD_QUEUE_CAPACITY);
this.threadPool = threadPool;
Expand All @@ -95,6 +109,19 @@ public QueryInsightsService(final ClusterSettings clusterSettings, final ThreadP
(settings -> validateExporterConfig(type, settings))
);
}

this.searchQueryMetricsEnabled = clusterSettings.get(SEARCH_QUERY_METRICS_ENABLED_SETTING);
clusterSettings
.addSettingsUpdateConsumer(SEARCH_QUERY_METRICS_ENABLED_SETTING, this::setSearchQueryMetricsEnabled);

this.metricsRegistry = metricsRegistry;
}

private void setSearchQueryMetricsEnabled(boolean searchQueryMetricsEnabled) {
this.searchQueryMetricsEnabled = searchQueryMetricsEnabled;
if ((this.searchQueryMetricsEnabled == true) && this.searchQueryCategorizer == null) {
this.searchQueryCategorizer = new SearchQueryCategorizer(metricsRegistry);
}
}

/**
Expand Down Expand Up @@ -135,6 +162,15 @@ public void drainRecords() {
topQueriesServices.get(metricType).consumeRecords(records);
}
}

if (searchQueryMetricsEnabled) {
try {
searchQueryCategorizer.consumeRecords(records);
} catch (Exception e) {
logger.error("Error while trying to categorize the queries.", e);
}
}

}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.insights.settings;

import org.opensearch.common.settings.Setting;

public class QueryCategorizationSettings {
public static final Setting<Boolean> SEARCH_QUERY_METRICS_ENABLED_SETTING = Setting.boolSetting(
"search.query.metrics.enabled",
false,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.test.VersionUtils;

import java.io.IOException;
Expand Down Expand Up @@ -74,9 +75,13 @@ public static List<SearchQueryRecord> generateQueryInsightRecords(int lower, int
for (int j = 0; j < countOfPhases; ++j) {
phaseLatencyMap.put(randomAlphaOfLengthBetween(5, 10), randomLong());
}

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.size(20); // Set the size parameter as needed

Map<Attribute, Object> attributes = new HashMap<>();
attributes.put(Attribute.SEARCH_TYPE, SearchType.QUERY_THEN_FETCH.toString().toLowerCase(Locale.ROOT));
attributes.put(Attribute.SOURCE, "{\"size\":20}");
attributes.put(Attribute.SOURCE, searchSourceBuilder);
attributes.put(Attribute.TOTAL_SHARDS, randomIntBetween(1, 100));
attributes.put(Attribute.INDICES, randomArray(1, 3, Object[]::new, () -> randomAlphaOfLengthBetween(5, 10)));
attributes.put(Attribute.PHASE_LATENCY_MAP, phaseLatencyMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,18 @@
* compatible open source license.
*/

package org.opensearch.index.query;
package org.opensearch.plugin.insights.core.categorizor;

import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.ConstantScoreQueryBuilder;
import org.opensearch.index.query.MatchQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.RangeQueryBuilder;
import org.opensearch.index.query.RegexpQueryBuilder;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.index.query.TermsQueryBuilder;
import org.opensearch.test.OpenSearchTestCase;

import static org.junit.Assert.assertEquals;
import org.opensearch.plugin.insights.core.categorizer.QueryShapeVisitor;

public final class QueryShapeVisitorTests extends OpenSearchTestCase {
public void testQueryShapeVisitor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
* compatible open source license.
*/

package org.opensearch.action.search;
package org.opensearch.plugin.insights.core.categorizor;

import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.BoostingQueryBuilder;
import org.opensearch.index.query.MatchNoneQueryBuilder;
Expand All @@ -20,6 +22,7 @@
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.index.query.WildcardQueryBuilder;
import org.opensearch.index.query.functionscore.FunctionScoreQueryBuilder;
import org.opensearch.plugin.insights.core.categorizer.SearchQueryCategorizer;
import org.opensearch.search.aggregations.bucket.range.RangeAggregationBuilder;
import org.opensearch.search.aggregations.bucket.terms.MultiTermsAggregationBuilder;
import org.opensearch.search.aggregations.support.MultiTermsValuesSourceConfig;
Expand All @@ -30,12 +33,9 @@
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;

import java.util.Arrays;

import org.mockito.ArgumentCaptor;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void testOnRequestEnd() throws InterruptedException {
assertEquals(timestamp.longValue(), generatedRecord.getTimestamp());
assertEquals(numberOfShards, generatedRecord.getAttributes().get(Attribute.TOTAL_SHARDS));
assertEquals(searchType.toString().toLowerCase(Locale.ROOT), generatedRecord.getAttributes().get(Attribute.SEARCH_TYPE));
assertEquals(searchSourceBuilder.toString(), generatedRecord.getAttributes().get(Attribute.SOURCE));
assertEquals(searchSourceBuilder, generatedRecord.getAttributes().get(Attribute.SOURCE));
Map<String, String> labels = (Map<String, String>) generatedRecord.getAttributes().get(Attribute.LABELS);
assertEquals("userLabel", labels.get(Task.X_OPAQUE_ID));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.junit.Before;
Expand All @@ -35,7 +37,7 @@ public void setup() {
Settings settings = settingsBuilder.build();
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
QueryInsightsTestUtils.registerAllQueryInsightsSettings(clusterSettings);
queryInsightsService = new QueryInsightsService(clusterSettings, threadPool, client);
queryInsightsService = new QueryInsightsService(clusterSettings, threadPool, client, NoopMetricsRegistry.INSTANCE);
queryInsightsService.enableCollection(MetricType.LATENCY, true);
queryInsightsService.enableCollection(MetricType.CPU, true);
queryInsightsService.enableCollection(MetricType.MEMORY, true);
Expand Down
Loading

0 comments on commit 9e5a09a

Please sign in to comment.