forked from opensearch-project/OpenSearch
-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Query insights plugin implementation (opensearch-project#11903)
* Query insights plugin implementation Signed-off-by: Chenyang Ji <[email protected]> * Increase JavaDoc coverage and update PR based comments Signed-off-by: Chenyang Ji <[email protected]> * Refactor record and service to make them generic Signed-off-by: Chenyang Ji <[email protected]> * refactor service for improving multithreading efficiency Signed-off-by: Chenyang Ji <[email protected]> --------- Signed-off-by: Chenyang Ji <[email protected]>
- Loading branch information
Showing
24 changed files
with
1,622 additions
and
26 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
* | ||
* Modifications Copyright OpenSearch Contributors. See | ||
* GitHub history for details. | ||
*/ | ||
|
||
opensearchplugin { | ||
description 'OpenSearch Query Insights Plugin.' | ||
classname 'org.opensearch.plugin.insights.QueryInsightsPlugin' | ||
} | ||
|
||
dependencies { | ||
} |
112 changes: 112 additions & 0 deletions
112
plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.plugin.insights; | ||
|
||
import org.opensearch.action.ActionRequest; | ||
import org.opensearch.client.Client; | ||
import org.opensearch.cluster.metadata.IndexNameExpressionResolver; | ||
import org.opensearch.cluster.node.DiscoveryNodes; | ||
import org.opensearch.cluster.service.ClusterService; | ||
import org.opensearch.common.settings.ClusterSettings; | ||
import org.opensearch.common.settings.IndexScopedSettings; | ||
import org.opensearch.common.settings.Setting; | ||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.common.settings.SettingsFilter; | ||
import org.opensearch.common.unit.TimeValue; | ||
import org.opensearch.common.util.concurrent.OpenSearchExecutors; | ||
import org.opensearch.core.action.ActionResponse; | ||
import org.opensearch.core.common.io.stream.NamedWriteableRegistry; | ||
import org.opensearch.core.xcontent.NamedXContentRegistry; | ||
import org.opensearch.env.Environment; | ||
import org.opensearch.env.NodeEnvironment; | ||
import org.opensearch.plugin.insights.core.service.QueryInsightsService; | ||
import org.opensearch.plugin.insights.settings.QueryInsightsSettings; | ||
import org.opensearch.plugins.ActionPlugin; | ||
import org.opensearch.plugins.Plugin; | ||
import org.opensearch.repositories.RepositoriesService; | ||
import org.opensearch.rest.RestController; | ||
import org.opensearch.rest.RestHandler; | ||
import org.opensearch.script.ScriptService; | ||
import org.opensearch.threadpool.ExecutorBuilder; | ||
import org.opensearch.threadpool.ScalingExecutorBuilder; | ||
import org.opensearch.threadpool.ThreadPool; | ||
import org.opensearch.watcher.ResourceWatcherService; | ||
|
||
import java.util.Collection; | ||
import java.util.List; | ||
import java.util.function.Supplier; | ||
|
||
/** | ||
* Plugin class for Query Insights. | ||
*/ | ||
public class QueryInsightsPlugin extends Plugin implements ActionPlugin { | ||
/** | ||
* Default constructor | ||
*/ | ||
public QueryInsightsPlugin() {} | ||
|
||
@Override | ||
public Collection<Object> createComponents( | ||
final Client client, | ||
final ClusterService clusterService, | ||
final ThreadPool threadPool, | ||
final ResourceWatcherService resourceWatcherService, | ||
final ScriptService scriptService, | ||
final NamedXContentRegistry xContentRegistry, | ||
final Environment environment, | ||
final NodeEnvironment nodeEnvironment, | ||
final NamedWriteableRegistry namedWriteableRegistry, | ||
final IndexNameExpressionResolver indexNameExpressionResolver, | ||
final Supplier<RepositoriesService> repositoriesServiceSupplier | ||
) { | ||
// create top n queries service | ||
final QueryInsightsService queryInsightsService = new QueryInsightsService(threadPool); | ||
return List.of(queryInsightsService); | ||
} | ||
|
||
@Override | ||
public List<ExecutorBuilder<?>> getExecutorBuilders(final Settings settings) { | ||
return List.of( | ||
new ScalingExecutorBuilder( | ||
QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR, | ||
1, | ||
Math.min((OpenSearchExecutors.allocatedProcessors(settings) + 1) / 2, QueryInsightsSettings.MAX_THREAD_COUNT), | ||
TimeValue.timeValueMinutes(5) | ||
) | ||
); | ||
} | ||
|
||
@Override | ||
public List<RestHandler> getRestHandlers( | ||
final Settings settings, | ||
final RestController restController, | ||
final ClusterSettings clusterSettings, | ||
final IndexScopedSettings indexScopedSettings, | ||
final SettingsFilter settingsFilter, | ||
final IndexNameExpressionResolver indexNameExpressionResolver, | ||
final Supplier<DiscoveryNodes> nodesInCluster | ||
) { | ||
return List.of(); | ||
} | ||
|
||
@Override | ||
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() { | ||
return List.of(); | ||
} | ||
|
||
@Override | ||
public List<Setting<?>> getSettings() { | ||
return List.of( | ||
// Settings for top N queries | ||
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED, | ||
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE, | ||
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE | ||
); | ||
} | ||
} |
180 changes: 180 additions & 0 deletions
180
...ights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,180 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.plugin.insights.core.service; | ||
|
||
import org.opensearch.common.inject.Inject; | ||
import org.opensearch.common.lifecycle.AbstractLifecycleComponent; | ||
import org.opensearch.plugin.insights.rules.model.MetricType; | ||
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; | ||
import org.opensearch.plugin.insights.settings.QueryInsightsSettings; | ||
import org.opensearch.threadpool.Scheduler; | ||
import org.opensearch.threadpool.ThreadPool; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Comparator; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.LinkedBlockingQueue; | ||
|
||
/** | ||
* Service responsible for gathering, analyzing, storing and exporting | ||
* information related to search queries | ||
* | ||
* @opensearch.internal | ||
*/ | ||
public class QueryInsightsService extends AbstractLifecycleComponent { | ||
/** | ||
* The internal OpenSearch thread pool that execute async processing and exporting tasks | ||
*/ | ||
private final ThreadPool threadPool; | ||
|
||
/** | ||
* Services to capture top n queries for different metric types | ||
*/ | ||
private final Map<MetricType, TopQueriesService> topQueriesServices; | ||
|
||
/** | ||
* Flags for enabling insight data collection for different metric types | ||
*/ | ||
private final Map<MetricType, Boolean> enableCollect; | ||
|
||
/** | ||
* The internal thread-safe queue to ingest the search query data and subsequently forward to processors | ||
*/ | ||
private final LinkedBlockingQueue<SearchQueryRecord> queryRecordsQueue; | ||
|
||
/** | ||
* Holds a reference to delayed operation {@link Scheduler.Cancellable} so it can be cancelled when | ||
* the service closed concurrently. | ||
*/ | ||
protected volatile Scheduler.Cancellable scheduledFuture; | ||
|
||
/** | ||
* Constructor of the QueryInsightsService | ||
* | ||
* @param threadPool The OpenSearch thread pool to run async tasks | ||
*/ | ||
@Inject | ||
public QueryInsightsService(final ThreadPool threadPool) { | ||
enableCollect = new HashMap<>(); | ||
queryRecordsQueue = new LinkedBlockingQueue<>(QueryInsightsSettings.QUERY_RECORD_QUEUE_CAPACITY); | ||
topQueriesServices = new HashMap<>(); | ||
for (MetricType metricType : MetricType.allMetricTypes()) { | ||
enableCollect.put(metricType, false); | ||
topQueriesServices.put(metricType, new TopQueriesService(metricType)); | ||
} | ||
this.threadPool = threadPool; | ||
} | ||
|
||
/** | ||
* Ingest the query data into in-memory stores | ||
* | ||
* @param record the record to ingest | ||
*/ | ||
public boolean addRecord(final SearchQueryRecord record) { | ||
boolean shouldAdd = false; | ||
for (Map.Entry<MetricType, TopQueriesService> entry : topQueriesServices.entrySet()) { | ||
if (!enableCollect.get(entry.getKey())) { | ||
continue; | ||
} | ||
List<SearchQueryRecord> currentSnapshot = entry.getValue().getTopQueriesCurrentSnapshot(); | ||
// skip add to top N queries store if the incoming record is smaller than the Nth record | ||
if (currentSnapshot.size() < entry.getValue().getTopNSize() | ||
|| SearchQueryRecord.compare(record, currentSnapshot.get(0), entry.getKey()) > 0) { | ||
shouldAdd = true; | ||
break; | ||
} | ||
} | ||
if (shouldAdd) { | ||
return queryRecordsQueue.offer(record); | ||
} | ||
return false; | ||
} | ||
|
||
/** | ||
* Drain the queryRecordsQueue into internal stores and services | ||
*/ | ||
public void drainRecords() { | ||
final List<SearchQueryRecord> records = new ArrayList<>(); | ||
queryRecordsQueue.drainTo(records); | ||
records.sort(Comparator.comparingLong(SearchQueryRecord::getTimestamp)); | ||
for (MetricType metricType : MetricType.allMetricTypes()) { | ||
if (enableCollect.get(metricType)) { | ||
// ingest the records into topQueriesService | ||
topQueriesServices.get(metricType).consumeRecords(records); | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Get the top queries service based on metricType | ||
* @param metricType {@link MetricType} | ||
* @return {@link TopQueriesService} | ||
*/ | ||
public TopQueriesService getTopQueriesService(final MetricType metricType) { | ||
return topQueriesServices.get(metricType); | ||
} | ||
|
||
/** | ||
* Set flag to enable or disable Query Insights data collection | ||
* | ||
* @param metricType {@link MetricType} | ||
* @param enable Flag to enable or disable Query Insights data collection | ||
*/ | ||
public void enableCollection(final MetricType metricType, final boolean enable) { | ||
this.enableCollect.put(metricType, enable); | ||
this.topQueriesServices.get(metricType).setEnabled(enable); | ||
} | ||
|
||
/** | ||
* Get if the Query Insights data collection is enabled for a MetricType | ||
* | ||
* @param metricType {@link MetricType} | ||
* @return if the Query Insights data collection is enabled | ||
*/ | ||
public boolean isCollectionEnabled(final MetricType metricType) { | ||
return this.enableCollect.get(metricType); | ||
} | ||
|
||
/** | ||
* Check if query insights service is enabled | ||
* | ||
* @return if query insights service is enabled | ||
*/ | ||
public boolean isEnabled() { | ||
for (MetricType t : MetricType.allMetricTypes()) { | ||
if (isCollectionEnabled(t)) { | ||
return true; | ||
} | ||
} | ||
return false; | ||
} | ||
|
||
@Override | ||
protected void doStart() { | ||
if (isEnabled()) { | ||
scheduledFuture = threadPool.scheduleWithFixedDelay( | ||
this::drainRecords, | ||
QueryInsightsSettings.QUERY_RECORD_QUEUE_DRAIN_INTERVAL, | ||
QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR | ||
); | ||
} | ||
} | ||
|
||
@Override | ||
protected void doStop() { | ||
if (scheduledFuture != null) { | ||
scheduledFuture.cancel(); | ||
} | ||
} | ||
|
||
@Override | ||
protected void doClose() {} | ||
} |
Oops, something went wrong.