Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move query categorization changes to query insights plugin #14528

Closed
Closed
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Make the class CommunityIdProcessor final ([#14448](https://github.com/opensearch-project/OpenSearch/pull/14448))
- Allow @InternalApi annotation on classes not meant to be constructed outside of the OpenSearch core ([#14575](https://github.com/opensearch-project/OpenSearch/pull/14575))
- Add @InternalApi annotation to japicmp exclusions ([#14597](https://github.com/opensearch-project/OpenSearch/pull/14597))
- Move query categorization changes to query insights plugin ([#14528](https://github.com/opensearch-project/OpenSearch/pull/14528))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,17 @@
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction;
import org.opensearch.plugin.insights.rules.resthandler.top_queries.RestTopQueriesAction;
import org.opensearch.plugin.insights.rules.transport.top_queries.TransportTopQueriesAction;
import org.opensearch.plugin.insights.settings.QueryCategorizationSettings;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.TelemetryAwarePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
import org.opensearch.script.ScriptService;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.ScalingExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -49,7 +53,7 @@
/**
* Plugin class for Query Insights.
*/
public class QueryInsightsPlugin extends Plugin implements ActionPlugin {
public class QueryInsightsPlugin extends Plugin implements ActionPlugin, TelemetryAwarePlugin {
/**
* Default constructor
*/
Expand All @@ -67,10 +71,17 @@ public Collection<Object> createComponents(
final NodeEnvironment nodeEnvironment,
final NamedWriteableRegistry namedWriteableRegistry,
final IndexNameExpressionResolver indexNameExpressionResolver,
final Supplier<RepositoriesService> repositoriesServiceSupplier
final Supplier<RepositoriesService> repositoriesServiceSupplier,
final Tracer tracer,
final MetricsRegistry metricsRegistry
) {
// create top n queries service
final QueryInsightsService queryInsightsService = new QueryInsightsService(clusterService.getClusterSettings(), threadPool, client);
final QueryInsightsService queryInsightsService = new QueryInsightsService(
clusterService.getClusterSettings(),
threadPool,
client,
metricsRegistry
);
return List.of(queryInsightsService, new QueryInsightsListener(clusterService, queryInsightsService));
}

Expand Down Expand Up @@ -119,7 +130,8 @@ public List<Setting<?>> getSettings() {
QueryInsightsSettings.TOP_N_MEMORY_QUERIES_ENABLED,
QueryInsightsSettings.TOP_N_MEMORY_QUERIES_SIZE,
QueryInsightsSettings.TOP_N_MEMORY_QUERIES_WINDOW_SIZE,
QueryInsightsSettings.TOP_N_MEMORY_EXPORTER_SETTINGS
QueryInsightsSettings.TOP_N_MEMORY_EXPORTER_SETTINGS,
QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,14 @@ public QueryInsightsListener(final ClusterService clusterService, final QueryIns
* @param enabled boolean
*/
public void setEnableTopQueries(final MetricType metricType, final boolean enabled) {
boolean isAllMetricsDisabled = !queryInsightsService.isEnabled();
boolean isAllMetricsDisabled = !queryInsightsService.isTopNEnabledForAnyMetric();
boolean isQueryMetricsDisabled = !queryInsightsService.isSearchQueryMetricsEnabled();
deshsidd marked this conversation as resolved.
Show resolved Hide resolved
this.queryInsightsService.enableCollection(metricType, enabled);

if (!enabled) {
// disable QueryInsightsListener only if all metrics collections are disabled now.
if (!queryInsightsService.isEnabled()) {
// disable QueryInsightsListener only if all metrics collections are disabled now
// and search query metrics is disabled.
if (!queryInsightsService.isTopNEnabledForAnyMetric() && isQueryMetricsDisabled) {
super.setEnabled(false);
this.queryInsightsService.stop();
}
deshsidd marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -178,7 +181,7 @@ private void constructSearchQueryRecord(final SearchPhaseContext context, final
}
Map<Attribute, Object> attributes = new HashMap<>();
attributes.put(Attribute.SEARCH_TYPE, request.searchType().toString().toLowerCase(Locale.ROOT));
attributes.put(Attribute.SOURCE, request.source().toString(FORMAT_PARAMS));
attributes.put(Attribute.SOURCE, request.source());
attributes.put(Attribute.TOTAL_SHARDS, context.getNumShards());
attributes.put(Attribute.INDICES, request.indices());
attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,20 @@

package org.opensearch.plugin.insights.core.service;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.Client;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory;
import org.opensearch.plugin.insights.core.service.categorizer.SearchQueryCategorizer;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

Expand All @@ -29,6 +33,7 @@
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import static org.opensearch.plugin.insights.settings.QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getExporterSettings;

/**
Expand All @@ -38,6 +43,9 @@
* @opensearch.internal
*/
public class QueryInsightsService extends AbstractLifecycleComponent {

private static final Logger logger = LogManager.getLogger(QueryInsightsService.class);

/**
* The internal OpenSearch thread pool that execute async processing and exporting tasks
*/
Expand Down Expand Up @@ -69,15 +77,27 @@ public class QueryInsightsService extends AbstractLifecycleComponent {
*/
final QueryInsightsExporterFactory queryInsightsExporterFactory;

private volatile boolean searchQueryMetricsEnabled;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need a volatile boolean? Does the eventual consistency not work here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If searchQueryMetricsEnabled is not declared as volatile, changes made by one thread to searchQueryMetricsEnabled may not be immediately visible to other threads and could cause some issues.


private SearchQueryCategorizer searchQueryCategorizer;

private MetricsRegistry metricsRegistry;

/**
* Constructor of the QueryInsightsService
*
* @param clusterSettings OpenSearch cluster level settings
* @param threadPool The OpenSearch thread pool to run async tasks
* @param client OS client
* @param metricsRegistry Opentelemetry Metrics registry
*/
@Inject
public QueryInsightsService(final ClusterSettings clusterSettings, final ThreadPool threadPool, final Client client) {
public QueryInsightsService(
final ClusterSettings clusterSettings,
final ThreadPool threadPool,
final Client client,
final MetricsRegistry metricsRegistry
) {
enableCollect = new HashMap<>();
queryRecordsQueue = new LinkedBlockingQueue<>(QueryInsightsSettings.QUERY_RECORD_QUEUE_CAPACITY);
this.threadPool = threadPool;
Expand All @@ -95,6 +115,11 @@ public QueryInsightsService(final ClusterSettings clusterSettings, final ThreadP
(settings -> validateExporterConfig(type, settings))
);
}

this.searchQueryMetricsEnabled = clusterSettings.get(SEARCH_QUERY_METRICS_ENABLED_SETTING);
clusterSettings.addSettingsUpdateConsumer(SEARCH_QUERY_METRICS_ENABLED_SETTING, this::setSearchQueryMetricsEnabled);

this.metricsRegistry = metricsRegistry;
}

/**
Expand Down Expand Up @@ -135,6 +160,14 @@ public void drainRecords() {
topQueriesServices.get(metricType).consumeRecords(records);
}
}

if (searchQueryMetricsEnabled) {
try {
searchQueryCategorizer.consumeRecords(records);
} catch (Exception e) {
deshsidd marked this conversation as resolved.
Show resolved Hide resolved
logger.error("Error while trying to categorize the queries.", e);
}
}
}

/**
Expand Down Expand Up @@ -168,11 +201,11 @@ public boolean isCollectionEnabled(final MetricType metricType) {
}

/**
* Check if query insights service is enabled
* Check if Top N feature is enabled for query insights service for any metric type
*
* @return if query insights service is enabled
*/
public boolean isEnabled() {
public boolean isTopNEnabledForAnyMetric() {
for (MetricType t : MetricType.allMetricTypes()) {
if (isCollectionEnabled(t)) {
return true;
Expand Down Expand Up @@ -241,6 +274,33 @@ public void setExporter(final MetricType type, final Settings settings) {
}
}

/**
* Set search query metrics enabled to enable collection of search query categorization metrics
* @param searchQueryMetricsEnabled boolean flag
*/
public void setSearchQueryMetricsEnabled(boolean searchQueryMetricsEnabled) {
this.searchQueryMetricsEnabled = searchQueryMetricsEnabled;
if ((this.searchQueryMetricsEnabled == true) && this.searchQueryCategorizer == null) {
this.searchQueryCategorizer = new SearchQueryCategorizer(metricsRegistry);
}
deshsidd marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Is search query metrics feature enabled.
* @return boolean flag
*/
public boolean isSearchQueryMetricsEnabled() {
return this.searchQueryMetricsEnabled;
}

/**
* Get search query categorizer object
* @return SearchQueryCategorizer object
*/
public SearchQueryCategorizer getSearchQueryCategorizer() {
return this.searchQueryCategorizer;
}

/**
* Validate the exporter config for a metricType
*
Expand All @@ -255,7 +315,7 @@ public void validateExporterConfig(final MetricType type, final Settings setting

@Override
protected void doStart() {
if (isEnabled()) {
if (isTopNEnabledForAnyMetric()) {
scheduledFuture = threadPool.scheduleWithFixedDelay(
this::drainRecords,
QueryInsightsSettings.QUERY_RECORD_QUEUE_DRAIN_INTERVAL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
* compatible open source license.
*/

package org.opensearch.index.query;
package org.opensearch.plugin.insights.core.service.categorizer;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Should we really have service in the namespace?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had added categorizer in core earlier but @ansjcy mentioned that it might make more sense to keep it under service. I am okay either way.


import org.apache.lucene.search.BooleanClause;
import org.opensearch.common.SetOnce;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilderVisitor;

import java.util.ArrayList;
import java.util.EnumMap;
Expand Down Expand Up @@ -55,7 +57,11 @@ public QueryBuilderVisitor getChildVisitor(BooleanClause.Occur occur) {
return childVisitorWrapper;
}

String toJson() {
/**
* Convert query builder tree to json
* @return json query builder tree as a string
*/
public String toJson() {
StringBuilder outputBuilder = new StringBuilder("{\"type\":\"").append(queryType.get()).append("\"");
for (Map.Entry<BooleanClause.Occur, List<QueryShapeVisitor>> entry : childVisitors.entrySet()) {
outputBuilder.append(",\"").append(entry.getKey().name().toLowerCase(Locale.ROOT)).append("\"[");
Expand All @@ -73,6 +79,11 @@ String toJson() {
return outputBuilder.toString();
}

/**
* Pretty print the query builder tree
* @param indent indent size
* @return Query builder tree as a pretty string
*/
public String prettyPrintTree(String indent) {
StringBuilder outputBuilder = new StringBuilder(indent).append(queryType.get()).append("\n");
for (Map.Entry<BooleanClause.Occur, List<QueryShapeVisitor>> entry : childVisitors.entrySet()) {
Expand All @@ -83,4 +94,9 @@ public String prettyPrintTree(String indent) {
}
return outputBuilder.toString();
}

/**
* Default constructor
*/
public QueryShapeVisitor() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* compatible open source license.
*/

package org.opensearch.action.search;
package org.opensearch.plugin.insights.core.service.categorizer;

import org.opensearch.search.aggregations.AggregationBuilder;
import org.opensearch.search.aggregations.PipelineAggregationBuilder;
Expand All @@ -22,10 +22,18 @@ public class SearchQueryAggregationCategorizer {
private static final String TYPE_TAG = "type";
private final SearchQueryCounters searchQueryCounters;

/**
* Constructor for SearchQueryAggregationCategorizer
* @param searchQueryCounters Object containing all query counters
*/
public SearchQueryAggregationCategorizer(SearchQueryCounters searchQueryCounters) {
this.searchQueryCounters = searchQueryCounters;
}

/**
* Increment aggregation related counters
* @param aggregatorFactories input aggregations
*/
public void incrementSearchQueryAggregationCounters(Collection<AggregationBuilder> aggregatorFactories) {
for (AggregationBuilder aggregationBuilder : aggregatorFactories) {
incrementCountersRecursively(aggregationBuilder);
Expand All @@ -35,7 +43,7 @@ public void incrementSearchQueryAggregationCounters(Collection<AggregationBuilde
private void incrementCountersRecursively(AggregationBuilder aggregationBuilder) {
// Increment counters for the current aggregation
String aggregationType = aggregationBuilder.getType();
searchQueryCounters.aggCounter.add(1, Tags.create().addTag(TYPE_TAG, aggregationType));
searchQueryCounters.incrementAggCounter(1, Tags.create().addTag(TYPE_TAG, aggregationType));

// Recursively process sub-aggregations if any
Collection<AggregationBuilder> subAggregations = aggregationBuilder.getSubAggregations();
Expand All @@ -49,7 +57,7 @@ private void incrementCountersRecursively(AggregationBuilder aggregationBuilder)
Collection<PipelineAggregationBuilder> pipelineAggregations = aggregationBuilder.getPipelineAggregations();
for (PipelineAggregationBuilder pipelineAggregation : pipelineAggregations) {
String pipelineAggregationType = pipelineAggregation.getType();
searchQueryCounters.aggCounter.add(1, Tags.create().addTag(TYPE_TAG, pipelineAggregationType));
searchQueryCounters.incrementAggCounter(1, Tags.create().addTag(TYPE_TAG, pipelineAggregationType));
}
}
}
Loading
Loading