From 6aab36052055e24c2ef56454da9d3a1e4982ee6e Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Thu, 11 Jan 2024 12:45:43 -0800 Subject: [PATCH] Support dynamically adding SearchRequestOperationsListener (#11526) Along the way, also refactored TransportSearchAction.TimeProvider, so that it's no longer a (redundant) listener. --------- Signed-off-by: Chenyang Ji --- CHANGELOG.md | 1 + .../search/SearchWeightedRoutingIT.java | 2 +- .../search/stats/SearchStatsIT.java | 2 +- .../search/AbstractSearchAsyncAction.java | 4 +- .../action/search/SearchRequestContext.java | 20 +-- ...estOperationsCompositeListenerFactory.java | 81 +++++++++++ .../SearchRequestOperationsListener.java | 27 +++- .../action/search/SearchRequestSlowLog.java | 18 ++- .../action/search/SearchRequestStats.java | 14 +- .../action/search/SearchResponseMerger.java | 6 +- .../action/search/TransportSearchAction.java | 135 ++++-------------- .../common/settings/ClusterSettings.java | 5 +- .../main/java/org/opensearch/node/Node.java | 16 ++- .../cluster/node/stats/NodeStatsTests.java | 9 +- .../AbstractSearchAsyncActionTests.java | 24 +++- .../CanMatchPreFilterSearchPhaseTests.java | 31 +++- .../action/search/SearchAsyncActionTests.java | 12 +- .../SearchQueryThenFetchAsyncActionTests.java | 7 +- ...erationsCompositeListenerFactoryTests.java | 131 +++++++++++++++++ ...earchRequestOperationsListenerSupport.java | 12 +- .../search/SearchRequestSlowLogTests.java | 24 +++- .../search/SearchRequestStatsTests.java | 35 ++++- .../search/SearchResponseMergerTests.java | 97 +++++++++++-- .../search/SearchTimeProviderTests.java | 54 ------- .../search/TransportSearchActionTests.java | 37 ++++- .../index/search/stats/SearchStatsTests.java | 5 +- .../indices/NodeIndicesStatsTests.java | 5 +- .../snapshots/SnapshotResiliencyTests.java | 9 +- 28 files changed, 577 insertions(+), 246 deletions(-) create mode 100644 server/src/main/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactory.java create mode 100644 server/src/test/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactoryTests.java delete mode 100644 server/src/test/java/org/opensearch/action/search/SearchTimeProviderTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index b638b63dd52a0..1f64dcf82bd4d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -193,6 +193,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add deleted doc count in _cat/shards ([#11678](https://github.com/opensearch-project/OpenSearch/pull/11678)) - Capture information for additional query types and aggregation types ([#11582](https://github.com/opensearch-project/OpenSearch/pull/11582)) - Use slice_size == shard_size heuristic in terms aggs for concurrent segment search and properly calculate the doc_count_error ([#11732](https://github.com/opensearch-project/OpenSearch/pull/11732)) +- Added Support for dynamically adding SearchRequestOperationsListeners with SearchRequestOperationsCompositeListenerFactory ([#11526](https://github.com/opensearch-project/OpenSearch/pull/11526)) ### Deprecated diff --git a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java index aa1fe695ecc12..d1e66c19c28e2 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java @@ -57,7 +57,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.opensearch.action.search.TransportSearchAction.SEARCH_REQUEST_STATS_ENABLED_KEY; +import static org.opensearch.action.search.SearchRequestStats.SEARCH_REQUEST_STATS_ENABLED_KEY; import static org.opensearch.search.aggregations.AggregationBuilders.terms; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; diff --git a/server/src/internalClusterTest/java/org/opensearch/search/stats/SearchStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/search/stats/SearchStatsIT.java index 253a8b2b14824..8fb3c57dd7680 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/stats/SearchStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/stats/SearchStatsIT.java @@ -64,7 +64,7 @@ import java.util.Set; import java.util.function.Function; -import static org.opensearch.action.search.TransportSearchAction.SEARCH_REQUEST_STATS_ENABLED_KEY; +import static org.opensearch.action.search.SearchRequestStats.SEARCH_REQUEST_STATS_ENABLED_KEY; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; import static org.opensearch.index.query.QueryBuilders.matchAllQuery; diff --git a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java index f18bbb8a1cc13..5b41c2a13b596 100644 --- a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java @@ -214,7 +214,7 @@ public final void start() { 0, 0, buildTookInMillis(), - timeProvider.getPhaseTook(), + searchRequestContext.getPhaseTook(), ShardSearchFailure.EMPTY_ARRAY, clusters, null @@ -670,7 +670,7 @@ protected final SearchResponse buildSearchResponse( successfulOps.get(), skippedOps.get(), buildTookInMillis(), - timeProvider.getPhaseTook(), + searchRequestContext.getPhaseTook(), failures, clusters, searchContextId diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java b/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java index 674363600b251..eceac7204b196 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java @@ -8,13 +8,11 @@ package org.opensearch.action.search; -import org.apache.logging.log4j.LogManager; import org.apache.lucene.search.TotalHits; import org.opensearch.common.annotation.InternalApi; import java.util.EnumMap; import java.util.HashMap; -import java.util.List; import java.util.Locale; import java.util.Map; @@ -31,18 +29,14 @@ class SearchRequestContext { private TotalHits totalHits; private final EnumMap shardStats; - /** - * This constructor is for testing only - */ - SearchRequestContext() { - this(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger())); - } + private final SearchRequest searchRequest; - SearchRequestContext(SearchRequestOperationsListener searchRequestOperationsListener) { + SearchRequestContext(final SearchRequestOperationsListener searchRequestOperationsListener, final SearchRequest searchRequest) { this.searchRequestOperationsListener = searchRequestOperationsListener; this.absoluteStartNanos = System.nanoTime(); this.phaseTookMap = new HashMap<>(); this.shardStats = new EnumMap<>(ShardStatsFieldNames.class); + this.searchRequest = searchRequest; } SearchRequestOperationsListener getSearchRequestOperationsListener() { @@ -57,6 +51,14 @@ Map phaseTookMap() { return phaseTookMap; } + SearchResponse.PhaseTook getPhaseTook() { + if (searchRequest != null && searchRequest.isPhaseTook() != null && searchRequest.isPhaseTook()) { + return new SearchResponse.PhaseTook(phaseTookMap); + } else { + return null; + } + } + /** * Override absoluteStartNanos set in constructor. * For testing only diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactory.java b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactory.java new file mode 100644 index 0000000000000..db487bf945889 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactory.java @@ -0,0 +1,81 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * SearchRequestOperationsCompositeListenerFactory contains listeners registered to search requests, + * and is responsible for creating the {@link SearchRequestOperationsListener.CompositeListener} + * with the all listeners enabled at cluster-level and request-level. + * + * + * @opensearch.internal + */ +public final class SearchRequestOperationsCompositeListenerFactory { + private final List searchRequestListenersList; + + /** + * Create the SearchRequestOperationsCompositeListenerFactory and add multiple {@link SearchRequestOperationsListener} + * to the searchRequestListenersList. + * Those enabled listeners will be executed during each search request. + * + * @param listeners Multiple SearchRequestOperationsListener object to add. + * @throws IllegalArgumentException if any input listener is null. + */ + public SearchRequestOperationsCompositeListenerFactory(final SearchRequestOperationsListener... listeners) { + searchRequestListenersList = new ArrayList<>(); + for (SearchRequestOperationsListener listener : listeners) { + if (listener == null) { + throw new IllegalArgumentException("listener must not be null"); + } + searchRequestListenersList.add(listener); + } + } + + /** + * Get searchRequestListenersList, + * + * @return List of SearchRequestOperationsListener + */ + public List getListeners() { + return searchRequestListenersList; + } + + /** + * Create the {@link SearchRequestOperationsListener.CompositeListener} + * with the all listeners enabled at cluster-level and request-level. + * + * @param searchRequest The SearchRequest object used to decide which request-level listeners to add based on states/flags + * @param logger Logger to be attached to the {@link SearchRequestOperationsListener.CompositeListener} + * @param perRequestListeners the per-request listeners that can be optionally added to the returned CompositeListener list. + * @return SearchRequestOperationsListener.CompositeListener + */ + public SearchRequestOperationsListener.CompositeListener buildCompositeListener( + final SearchRequest searchRequest, + final Logger logger, + final SearchRequestOperationsListener... perRequestListeners + ) { + final List searchListenersList = Stream.concat( + searchRequestListenersList.stream(), + Arrays.stream(perRequestListeners) + ) + .filter((searchRequestOperationsListener -> searchRequestOperationsListener.isEnabled(searchRequest))) + .collect(Collectors.toList()); + + return new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger); + } + +} diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java index 19ce0beb3c493..2a09cc084f79f 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java @@ -20,7 +20,16 @@ * @opensearch.internal */ @InternalApi -abstract class SearchRequestOperationsListener { +public abstract class SearchRequestOperationsListener { + private volatile boolean enabled; + + protected SearchRequestOperationsListener() { + this.enabled = true; + } + + protected SearchRequestOperationsListener(final boolean enabled) { + this.enabled = enabled; + } abstract void onPhaseStart(SearchPhaseContext context); @@ -32,6 +41,18 @@ void onRequestStart(SearchRequestContext searchRequestContext) {} void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} + boolean isEnabled(SearchRequest searchRequest) { + return isEnabled(); + } + + boolean isEnabled() { + return enabled; + } + + protected void setEnabled(final boolean enabled) { + this.enabled = enabled; + } + /** * Holder of Composite Listeners * @@ -101,5 +122,9 @@ public void onRequestEnd(SearchPhaseContext context, SearchRequestContext search } } } + + public List getListeners() { + return listeners; + } } } diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java b/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java index a55cfd463a78f..7f25f9026f215 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java @@ -116,11 +116,11 @@ public SearchRequestSlowLog(ClusterService clusterService) { this.logger = logger; Loggers.setLevel(this.logger, SlowLogLevel.TRACE.name()); - this.warnThreshold = clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_WARN_SETTING).nanos(); - this.infoThreshold = clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_INFO_SETTING).nanos(); - this.debugThreshold = clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_DEBUG_SETTING).nanos(); - this.traceThreshold = clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_TRACE_SETTING).nanos(); - this.level = clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_LEVEL); + this.setWarnThreshold(clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_WARN_SETTING)); + this.setInfoThreshold(clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_INFO_SETTING)); + this.setDebugThreshold(clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_DEBUG_SETTING)); + this.setTraceThreshold(clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_TRACE_SETTING)); + this.setLevel(clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_LEVEL)); clusterService.getClusterSettings() .addSettingsUpdateConsumer(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_WARN_SETTING, this::setWarnThreshold); @@ -233,18 +233,22 @@ private static String escapeJson(String text) { void setWarnThreshold(TimeValue warnThreshold) { this.warnThreshold = warnThreshold.nanos(); + setEnabledIfThresholdExceed(); } void setInfoThreshold(TimeValue infoThreshold) { this.infoThreshold = infoThreshold.nanos(); + setEnabledIfThresholdExceed(); } void setDebugThreshold(TimeValue debugThreshold) { this.debugThreshold = debugThreshold.nanos(); + setEnabledIfThresholdExceed(); } void setTraceThreshold(TimeValue traceThreshold) { this.traceThreshold = traceThreshold.nanos(); + setEnabledIfThresholdExceed(); } void setLevel(SlowLogLevel level) { @@ -270,4 +274,8 @@ protected long getTraceThreshold() { SlowLogLevel getLevel() { return level; } + + private void setEnabledIfThresholdExceed() { + super.setEnabled(this.warnThreshold >= 0 || this.debugThreshold >= 0 || this.infoThreshold >= 0 || this.traceThreshold >= 0); + } } diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java b/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java index 262750849eaa9..88d599a0dcdaa 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java @@ -12,6 +12,8 @@ import org.opensearch.common.inject.Inject; import org.opensearch.common.metrics.CounterMetric; import org.opensearch.common.metrics.MeanMetric; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; import java.util.EnumMap; import java.util.Map; @@ -26,8 +28,18 @@ public final class SearchRequestStats extends SearchRequestOperationsListener { Map phaseStatsMap = new EnumMap<>(SearchPhaseName.class); + public static final String SEARCH_REQUEST_STATS_ENABLED_KEY = "search.request_stats_enabled"; + public static final Setting SEARCH_REQUEST_STATS_ENABLED = Setting.boolSetting( + SEARCH_REQUEST_STATS_ENABLED_KEY, + false, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + @Inject - public SearchRequestStats() { + public SearchRequestStats(ClusterSettings clusterSettings) { + this.setEnabled(clusterSettings.get(SEARCH_REQUEST_STATS_ENABLED)); + clusterSettings.addSettingsUpdateConsumer(SEARCH_REQUEST_STATS_ENABLED, this::setEnabled); for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { phaseStatsMap.put(searchPhaseName, new StatsHolder()); } diff --git a/server/src/main/java/org/opensearch/action/search/SearchResponseMerger.java b/server/src/main/java/org/opensearch/action/search/SearchResponseMerger.java index 054bd578cc56c..538e7fd54e2c3 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchResponseMerger.java +++ b/server/src/main/java/org/opensearch/action/search/SearchResponseMerger.java @@ -110,7 +110,7 @@ final class SearchResponseMerger { /** * Add a search response to the list of responses to be merged together into one. * Merges currently happen at once when all responses are available and - * {@link #getMergedResponse(SearchResponse.Clusters)} )} is called. + * {@link #getMergedResponse(SearchResponse.Clusters, SearchRequestContext)} )} is called. * That may change in the future as it's possible to introduce incremental merges as responses come in if necessary. */ void add(SearchResponse searchResponse) { @@ -126,7 +126,7 @@ int numResponses() { * Returns the merged response. To be called once all responses have been added through {@link #add(SearchResponse)} * so that all responses are merged into a single one. */ - SearchResponse getMergedResponse(SearchResponse.Clusters clusters) { + SearchResponse getMergedResponse(SearchResponse.Clusters clusters, SearchRequestContext searchRequestContext) { // if the search is only across remote clusters, none of them are available, and all of them have skip_unavailable set to true, // we end up calling merge without anything to merge, we just return an empty search response if (searchResponses.size() == 0) { @@ -236,7 +236,7 @@ SearchResponse getMergedResponse(SearchResponse.Clusters clusters) { successfulShards, skippedShards, tookInMillis, - searchTimeProvider.getPhaseTook(), + searchRequestContext.getPhaseTook(), shardFailures, clusters, null diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 05f4308df74fa..842c10b700d24 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -98,7 +98,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.EnumMap; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -154,20 +153,12 @@ public class TransportSearchAction extends HandledTransportAction SEARCH_REQUEST_STATS_ENABLED = Setting.boolSetting( - SEARCH_REQUEST_STATS_ENABLED_KEY, - false, - Property.Dynamic, - Property.NodeScope - ); - public static final String SEARCH_PHASE_TOOK_ENABLED_KEY = "search.phase_took_enabled"; public static final Setting SEARCH_PHASE_TOOK_ENABLED = Setting.boolSetting( SEARCH_PHASE_TOOK_ENABLED_KEY, false, - Property.Dynamic, - Property.NodeScope + Setting.Property.Dynamic, + Setting.Property.NodeScope ); private final NodeClient client; @@ -181,14 +172,10 @@ public class TransportSearchAction extends HandledTransportAction) SearchRequest::new); this.client = client; @@ -224,12 +210,9 @@ public TransportSearchAction( this.indexNameExpressionResolver = indexNameExpressionResolver; this.namedWriteableRegistry = namedWriteableRegistry; this.searchPipelineService = searchPipelineService; - this.isRequestStatsEnabled = clusterService.getClusterSettings().get(SEARCH_REQUEST_STATS_ENABLED); - clusterService.getClusterSettings().addSettingsUpdateConsumer(SEARCH_REQUEST_STATS_ENABLED, this::setIsRequestStatsEnabled); - this.searchRequestStats = searchRequestStats; - this.searchRequestSlowLog = searchRequestSlowLog; this.metricsRegistry = metricsRegistry; this.searchQueryMetricsEnabled = clusterService.getClusterSettings().get(SEARCH_QUERY_METRICS_ENABLED_SETTING); + this.searchRequestOperationsCompositeListenerFactory = searchRequestOperationsCompositeListenerFactory; clusterService.getClusterSettings() .addSettingsUpdateConsumer(SEARCH_QUERY_METRICS_ENABLED_SETTING, this::setSearchQueryMetricsEnabled); } @@ -241,10 +224,6 @@ private void setSearchQueryMetricsEnabled(boolean searchQueryMetricsEnabled) { } } - private void setIsRequestStatsEnabled(boolean isRequestStatsEnabled) { - this.isRequestStatsEnabled = isRequestStatsEnabled; - } - private Map buildPerIndexAliasFilter( SearchRequest request, ClusterState clusterState, @@ -289,8 +268,6 @@ private Map resolveIndexBoosts(SearchRequest searchRequest, Clust } /** - * Listener to track request-level tookTime and phase tookTimes from the coordinator. - * * Search operations need two clocks. One clock is to fulfill real clock needs (e.g., resolving * "now" to an index name). Another clock is needed for measuring how long a search operation * took. These two uses are at odds with each other. There are many issues with using a real @@ -300,12 +277,10 @@ private Map resolveIndexBoosts(SearchRequest searchRequest, Clust * * @opensearch.internal */ - static final class SearchTimeProvider extends SearchRequestOperationsListener { - + static final class SearchTimeProvider { private final long absoluteStartMillis; private final long relativeStartNanos; private final LongSupplier relativeCurrentNanosProvider; - private boolean phaseTook = false; /** * Instantiates a new search time provider. The absolute start time is the real clock time @@ -331,43 +306,6 @@ long getAbsoluteStartMillis() { long buildTookInMillis() { return TimeUnit.NANOSECONDS.toMillis(relativeCurrentNanosProvider.getAsLong() - relativeStartNanos); } - - public void setPhaseTook(boolean phaseTook) { - this.phaseTook = phaseTook; - } - - SearchResponse.PhaseTook getPhaseTook() { - if (phaseTook) { - Map phaseTookMap = new HashMap<>(); - // Convert Map to Map for SearchResponse() - for (SearchPhaseName searchPhaseName : phaseStatsMap.keySet()) { - phaseTookMap.put(searchPhaseName.getName(), phaseStatsMap.get(searchPhaseName)); - } - return new SearchResponse.PhaseTook(phaseTookMap); - } else { - return null; - } - } - - Map phaseStatsMap = new EnumMap<>(SearchPhaseName.class); - - @Override - void onPhaseStart(SearchPhaseContext context) {} - - @Override - void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { - phaseStatsMap.put( - context.getCurrentPhase().getSearchPhaseName(), - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - context.getCurrentPhase().getStartTimeInNanos()) - ); - } - - @Override - void onPhaseFailure(SearchPhaseContext context) {} - - public Long getPhaseTookTime(SearchPhaseName searchPhaseName) { - return phaseStatsMap.get(searchPhaseName); - } } @Override @@ -490,11 +428,12 @@ private void executeRequest( relativeStartNanos, System::nanoTime ); - - final List searchListenersList = createSearchListenerList(originalSearchRequest, timeProvider); - SearchRequestContext searchRequestContext = new SearchRequestContext( - new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger) - ); + if (originalSearchRequest.isPhaseTook() == null) { + originalSearchRequest.setPhaseTook(clusterService.getClusterSettings().get(SEARCH_PHASE_TOOK_ENABLED)); + } + SearchRequestOperationsListener.CompositeListener requestOperationsListeners = searchRequestOperationsCompositeListenerFactory + .buildCompositeListener(originalSearchRequest, logger); + SearchRequestContext searchRequestContext = new SearchRequestContext(requestOperationsListeners, originalSearchRequest); searchRequestContext.getSearchRequestOperationsListener().onRequestStart(searchRequestContext); PipelinedRequest searchRequest; @@ -599,7 +538,8 @@ private ActionListener buildRewriteListener( searchContext, searchAsyncActionProvider, searchRequestContext - ) + ), + searchRequestContext ); } else { AtomicInteger skippedClusters = new AtomicInteger(0); @@ -687,7 +627,8 @@ static void ccsRemoteReduce( RemoteClusterService remoteClusterService, ThreadPool threadPool, ActionListener listener, - BiConsumer> localSearchConsumer + BiConsumer> localSearchConsumer, + SearchRequestContext searchRequestContext ) { if (localIndices == null && remoteIndices.size() == 1) { @@ -729,7 +670,7 @@ public void onResponse(SearchResponse searchResponse) { searchResponse.getSuccessfulShards(), searchResponse.getSkippedShards(), timeProvider.buildTookInMillis(), - timeProvider.getPhaseTook(), + searchRequestContext.getPhaseTook(), searchResponse.getShardFailures(), new SearchResponse.Clusters(1, 1, 0), searchResponse.pointInTimeId() @@ -775,7 +716,8 @@ public void onFailure(Exception e) { exceptions, searchResponseMerger, totalClusters, - listener + listener, + searchRequestContext ); Client remoteClusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias); remoteClusterClient.search(ccsSearchRequest, ccsListener); @@ -789,7 +731,8 @@ public void onFailure(Exception e) { exceptions, searchResponseMerger, totalClusters, - listener + listener, + searchRequestContext ); SearchRequest ccsLocalSearchRequest = SearchRequest.subSearchRequest( searchRequest, @@ -884,7 +827,8 @@ private static ActionListener createCCSListener( AtomicReference exceptions, SearchResponseMerger searchResponseMerger, int totalClusters, - ActionListener originalListener + ActionListener originalListener, + SearchRequestContext searchRequestContext ) { return new CCSActionListener( clusterAlias, @@ -906,7 +850,7 @@ SearchResponse createFinalResponse() { searchResponseMerger.numResponses(), skippedClusters.get() ); - return searchResponseMerger.getMergedResponse(clusters); + return searchResponseMerger.getMergedResponse(clusters, searchRequestContext); } }; } @@ -1244,35 +1188,6 @@ AbstractSearchAsyncAction asyncSearchAction( ); } - private List createSearchListenerList(SearchRequest searchRequest, SearchTimeProvider timeProvider) { - final List searchListenersList = new ArrayList<>(); - - if (isRequestStatsEnabled) { - searchListenersList.add(searchRequestStats); - } - - // phase_took is enabled with request param and/or cluster setting - Boolean phaseTookRequestParam = searchRequest.isPhaseTook(); - if (phaseTookRequestParam == null) { // check cluster setting only when request param is undefined - if (clusterService.getClusterSettings().get(TransportSearchAction.SEARCH_PHASE_TOOK_ENABLED)) { - timeProvider.setPhaseTook(true); - searchListenersList.add(timeProvider); - } - } else if (phaseTookRequestParam == true) { - timeProvider.setPhaseTook(true); - searchListenersList.add(timeProvider); - } - - if (searchRequestSlowLog.getWarnThreshold() >= 0 - || searchRequestSlowLog.getInfoThreshold() >= 0 - || searchRequestSlowLog.getDebugThreshold() >= 0 - || searchRequestSlowLog.getTraceThreshold() >= 0) { - searchListenersList.add(searchRequestSlowLog); - } - - return searchListenersList; - } - private AbstractSearchAsyncAction searchAsyncAction( SearchTask task, SearchRequest searchRequest, diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index fa4b0f475edc5..277286ae1ff1b 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -36,6 +36,7 @@ import org.opensearch.action.admin.indices.close.TransportCloseIndexAction; import org.opensearch.action.search.CreatePitController; import org.opensearch.action.search.SearchRequestSlowLog; +import org.opensearch.action.search.SearchRequestStats; import org.opensearch.action.search.TransportSearchAction; import org.opensearch.action.support.AutoCreateIndex; import org.opensearch.action.support.DestructiveOperations; @@ -380,9 +381,9 @@ public void apply(Settings value, Settings current, Settings previous) { SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS, TransportSearchAction.SHARD_COUNT_LIMIT_SETTING, TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING, - TransportSearchAction.SEARCH_REQUEST_STATS_ENABLED, - TransportSearchAction.SEARCH_PHASE_TOOK_ENABLED, TransportSearchAction.SEARCH_QUERY_METRICS_ENABLED_SETTING, + TransportSearchAction.SEARCH_PHASE_TOOK_ENABLED, + SearchRequestStats.SEARCH_REQUEST_STATS_ENABLED, RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE, SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER, RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING, diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 4cbf8dc191a9d..8510122c39fcb 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -46,6 +46,8 @@ import org.opensearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus; import org.opensearch.action.search.SearchExecutionStatsCollector; import org.opensearch.action.search.SearchPhaseController; +import org.opensearch.action.search.SearchRequestOperationsCompositeListenerFactory; +import org.opensearch.action.search.SearchRequestOperationsListener; import org.opensearch.action.search.SearchRequestSlowLog; import org.opensearch.action.search.SearchRequestStats; import org.opensearch.action.search.SearchTransportService; @@ -783,7 +785,7 @@ protected Node( threadPool ); - final SearchRequestStats searchRequestStats = new SearchRequestStats(); + final SearchRequestStats searchRequestStats = new SearchRequestStats(clusterService.getClusterSettings()); final SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService); remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, settings); @@ -879,6 +881,17 @@ protected Node( ) .collect(Collectors.toList()); + // register all standard SearchRequestOperationsCompositeListenerFactory to the SearchRequestOperationsCompositeListenerFactory + final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory = + new SearchRequestOperationsCompositeListenerFactory( + Stream.concat( + Stream.of(searchRequestStats, searchRequestSlowLog), + pluginComponents.stream() + .filter(p -> p instanceof SearchRequestOperationsListener) + .map(p -> (SearchRequestOperationsListener) p) + ).toArray(SearchRequestOperationsListener[]::new) + ); + ActionModule actionModule = new ActionModule( settings, clusterModule.getIndexNameExpressionResolver(), @@ -1275,6 +1288,7 @@ protected Node( b.bind(RemoteClusterStateService.class).toProvider(() -> remoteClusterStateService); b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry); b.bind(SegmentReplicationStatsTracker.class).toInstance(segmentReplicationStatsTracker); + b.bind(SearchRequestOperationsCompositeListenerFactory.class).toInstance(searchRequestOperationsCompositeListenerFactory); }); injector = modules.createInjector(); diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index b8ab5c935fa34..a5ca08f141560 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -44,6 +44,8 @@ import org.opensearch.cluster.service.ClusterStateStats; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.metrics.OperationStats; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.indices.breaker.AllCircuitBreakerStats; @@ -961,7 +963,12 @@ public void apply(String action, AdmissionControlActionType admissionControlActi private static NodeIndicesStats getNodeIndicesStats(boolean remoteStoreStats) { NodeIndicesStats indicesStats = null; if (remoteStoreStats) { - indicesStats = new NodeIndicesStats(new CommonStats(CommonStatsFlags.ALL), new HashMap<>(), new SearchRequestStats()); + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + indicesStats = new NodeIndicesStats( + new CommonStats(CommonStatsFlags.ALL), + new HashMap<>(), + new SearchRequestStats(clusterSettings) + ); RemoteSegmentStats remoteSegmentStats = indicesStats.getSegments().getRemoteSegmentStats(); remoteSegmentStats.addUploadBytesStarted(10L); remoteSegmentStats.addUploadBytesSucceeded(10L); diff --git a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java index e17fbab32a12e..76129341fc9a2 100644 --- a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java @@ -32,12 +32,15 @@ package org.opensearch.action.search; +import org.apache.logging.log4j.LogManager; import org.opensearch.action.OriginalIndices; import org.opensearch.action.support.IndicesOptions; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.routing.GroupShardsIterator; import org.opensearch.common.UUIDs; import org.opensearch.common.collect.Tuple; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.AtomicArray; import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.util.set.Sets; @@ -175,7 +178,7 @@ private AbstractSearchAsyncAction createAction( results, request.getMaxConcurrentShardRequests(), SearchResponse.Clusters.EMPTY, - new SearchRequestContext() + new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), request) ) { @Override protected SearchPhase getNextPhase(final SearchPhaseResults results, SearchPhaseContext context) { @@ -328,7 +331,8 @@ public void testSendSearchResponseDisallowPartialFailures() { } public void testOnPhaseFailureAndVerifyListeners() { - SearchRequestStats testListener = new SearchRequestStats(); + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + SearchRequestStats testListener = new SearchRequestStats(clusterSettings); final List requestOperationListeners = new ArrayList<>(List.of(testListener)); SearchQueryThenFetchAsyncAction action = createSearchQueryThenFetchAsyncAction(requestOperationListeners); @@ -591,7 +595,8 @@ public void onFailure(Exception e) { } public void testOnPhaseListenersWithQueryAndThenFetchType() throws InterruptedException { - SearchRequestStats testListener = new SearchRequestStats(); + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + SearchRequestStats testListener = new SearchRequestStats(clusterSettings); final List requestOperationListeners = new ArrayList<>(List.of(testListener)); long delay = (randomIntBetween(1, 5)); @@ -640,7 +645,8 @@ public void testOnPhaseListenersWithQueryAndThenFetchType() throws InterruptedEx } public void testOnPhaseListenersWithDfsType() throws InterruptedException { - SearchRequestStats testListener = new SearchRequestStats(); + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + SearchRequestStats testListener = new SearchRequestStats(clusterSettings); final List requestOperationListeners = new ArrayList<>(List.of(testListener)); SearchDfsQueryThenFetchAsyncAction searchDfsQueryThenFetchAsyncAction = createSearchDfsQueryThenFetchAsyncAction( @@ -710,7 +716,10 @@ private SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAct null, task, SearchResponse.Clusters.EMPTY, - new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger)) + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger), + searchRequest + ) ); } @@ -760,7 +769,10 @@ private SearchQueryThenFetchAsyncAction createSearchQueryThenFetchAsyncAction( null, task, SearchResponse.Clusters.EMPTY, - new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger)) + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger), + searchRequest + ) ) { @Override ShardSearchFailure[] buildShardFailures() { diff --git a/server/src/test/java/org/opensearch/action/search/CanMatchPreFilterSearchPhaseTests.java b/server/src/test/java/org/opensearch/action/search/CanMatchPreFilterSearchPhaseTests.java index 4ed4797efe604..56dcf66d5607d 100644 --- a/server/src/test/java/org/opensearch/action/search/CanMatchPreFilterSearchPhaseTests.java +++ b/server/src/test/java/org/opensearch/action/search/CanMatchPreFilterSearchPhaseTests.java @@ -31,6 +31,7 @@ package org.opensearch.action.search; +import org.apache.logging.log4j.LogManager; import org.apache.lucene.util.BytesRef; import org.opensearch.Version; import org.opensearch.action.OriginalIndices; @@ -137,7 +138,10 @@ public void run() throws IOException { } }, SearchResponse.Clusters.EMPTY, - new SearchRequestContext() + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + searchRequest + ) ); canMatchPhase.start(); @@ -229,7 +233,10 @@ public void run() throws IOException { } }, SearchResponse.Clusters.EMPTY, - new SearchRequestContext() + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + searchRequest + ) ); canMatchPhase.start(); @@ -320,7 +327,10 @@ public void sendCanMatch( new ArraySearchPhaseResults<>(iter.size()), randomIntBetween(1, 32), SearchResponse.Clusters.EMPTY, - new SearchRequestContext() + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + searchRequest + ) ) { @Override @@ -348,7 +358,10 @@ protected void executePhaseOnShard( } }, SearchResponse.Clusters.EMPTY, - new SearchRequestContext() + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + searchRequest + ) ); canMatchPhase.start(); @@ -433,7 +446,10 @@ public void run() { } }, SearchResponse.Clusters.EMPTY, - new SearchRequestContext() + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + searchRequest + ) ); canMatchPhase.start(); @@ -533,7 +549,10 @@ public void run() { } }, SearchResponse.Clusters.EMPTY, - new SearchRequestContext() + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + searchRequest + ) ); canMatchPhase.start(); diff --git a/server/src/test/java/org/opensearch/action/search/SearchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/SearchAsyncActionTests.java index 7b4fa1d8387df..af7adc4e58fb8 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchAsyncActionTests.java @@ -31,6 +31,7 @@ package org.opensearch.action.search; +import org.apache.logging.log4j.LogManager; import org.opensearch.Version; import org.opensearch.action.OriginalIndices; import org.opensearch.cluster.ClusterState; @@ -61,6 +62,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -136,7 +138,7 @@ public void testSkipSearchShards() throws InterruptedException { new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), SearchResponse.Clusters.EMPTY, - new SearchRequestContext() + new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), request) ) { @Override @@ -255,7 +257,7 @@ public void testLimitConcurrentShardRequests() throws InterruptedException { new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), SearchResponse.Clusters.EMPTY, - new SearchRequestContext() + new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), request) ) { @Override @@ -373,7 +375,7 @@ public void sendFreeContext(Transport.Connection connection, ShardSearchContextI new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), SearchResponse.Clusters.EMPTY, - new SearchRequestContext() + new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), request) ) { TestSearchResponse response = new TestSearchResponse(); @@ -496,7 +498,7 @@ public void sendFreeContext(Transport.Connection connection, ShardSearchContextI new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), SearchResponse.Clusters.EMPTY, - new SearchRequestContext() + new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), request) ) { TestSearchResponse response = new TestSearchResponse(); @@ -610,7 +612,7 @@ public void testAllowPartialResults() throws InterruptedException { new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), SearchResponse.Clusters.EMPTY, - new SearchRequestContext() + new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), request) ) { @Override protected void executePhaseOnShard( diff --git a/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java index a8c0c43ac5080..faf6f86c69c27 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java @@ -32,6 +32,7 @@ package org.opensearch.action.search; +import org.apache.logging.log4j.LogManager; import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.SortField; import org.apache.lucene.search.TopFieldDocs; @@ -63,6 +64,7 @@ import org.opensearch.transport.Transport; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -215,7 +217,10 @@ public void sendExecuteQuery( null, task, SearchResponse.Clusters.EMPTY, - new SearchRequestContext() + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + searchRequest + ) ) { @Override protected SearchPhase getNextPhase(SearchPhaseResults results, SearchPhaseContext context) { diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactoryTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactoryTests.java new file mode 100644 index 0000000000000..78c5ba4412c68 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactoryTests.java @@ -0,0 +1,131 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.List; + +public class SearchRequestOperationsCompositeListenerFactoryTests extends OpenSearchTestCase { + public void testAddAndGetListeners() { + SearchRequestOperationsListener testListener = createTestSearchRequestOperationsListener(); + SearchRequestOperationsCompositeListenerFactory requestListeners = new SearchRequestOperationsCompositeListenerFactory( + testListener + ); + assertEquals(1, requestListeners.getListeners().size()); + assertEquals(testListener, requestListeners.getListeners().get(0)); + } + + public void testStandardListenersEnabled() { + SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener(); + SearchRequestOperationsListener testListener2 = createTestSearchRequestOperationsListener(); + testListener1.setEnabled(false); + SearchRequestOperationsCompositeListenerFactory requestListeners = new SearchRequestOperationsCompositeListenerFactory( + testListener1, + testListener2 + ); + SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); + SearchRequest searchRequest = new SearchRequest().source(source); + SearchRequestOperationsListener.CompositeListener compositeListener = requestListeners.buildCompositeListener( + searchRequest, + logger + ); + List listeners = compositeListener.getListeners(); + assertEquals(1, listeners.size()); + assertEquals(testListener2, listeners.get(0)); + assertEquals(2, requestListeners.getListeners().size()); + assertEquals(testListener1, requestListeners.getListeners().get(0)); + assertEquals(testListener2, requestListeners.getListeners().get(1)); + } + + public void testStandardListenersAndPerRequestListener() { + SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener(); + SearchRequestOperationsCompositeListenerFactory requestListeners = new SearchRequestOperationsCompositeListenerFactory( + testListener1 + ); + SearchRequestOperationsListener testListener2 = createTestSearchRequestOperationsListener(); + testListener1.setEnabled(true); + testListener2.setEnabled(true); + SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); + SearchRequest searchRequest = new SearchRequest().source(source); + searchRequest.setPhaseTook(true); + SearchRequestOperationsListener.CompositeListener compositeListener = requestListeners.buildCompositeListener( + searchRequest, + logger, + testListener2 + ); + List listeners = compositeListener.getListeners(); + assertEquals(2, listeners.size()); + assertEquals(testListener1, listeners.get(0)); + assertEquals(testListener2, listeners.get(1)); + assertEquals(1, requestListeners.getListeners().size()); + assertEquals(testListener1, requestListeners.getListeners().get(0)); + } + + public void testStandardListenersDisabledAndPerRequestListener() { + SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener(); + testListener1.setEnabled(false); + SearchRequestOperationsCompositeListenerFactory requestListeners = new SearchRequestOperationsCompositeListenerFactory( + testListener1 + ); + SearchRequestOperationsListener testListener2 = createTestSearchRequestOperationsListener(); + SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); + SearchRequest searchRequest = new SearchRequest().source(source); + SearchRequestOperationsListener.CompositeListener compositeListener = requestListeners.buildCompositeListener( + searchRequest, + logger, + testListener2 + ); + List listeners = compositeListener.getListeners(); + assertEquals(1, listeners.size()); + assertEquals(testListener2, listeners.get(0)); + assertEquals(1, requestListeners.getListeners().size()); + assertEquals(testListener1, requestListeners.getListeners().get(0)); + assertFalse(requestListeners.getListeners().get(0).isEnabled()); + } + + public void testStandardListenerAndPerRequestListenerDisabled() { + SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener(); + SearchRequestOperationsCompositeListenerFactory requestListeners = new SearchRequestOperationsCompositeListenerFactory( + testListener1 + ); + testListener1.setEnabled(true); + SearchRequestOperationsListener testListener2 = createTestSearchRequestOperationsListener(); + testListener2.setEnabled(false); + + SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); + SearchRequest searchRequest = new SearchRequest().source(source); + searchRequest.setPhaseTook(false); + SearchRequestOperationsListener.CompositeListener compositeListener = requestListeners.buildCompositeListener( + searchRequest, + logger, + testListener2 + ); + List listeners = compositeListener.getListeners(); + assertEquals(1, listeners.size()); + assertEquals(testListener1, listeners.get(0)); + assertEquals(1, requestListeners.getListeners().size()); + assertEquals(testListener1, requestListeners.getListeners().get(0)); + } + + public SearchRequestOperationsListener createTestSearchRequestOperationsListener() { + return new SearchRequestOperationsListener() { + @Override + void onPhaseStart(SearchPhaseContext context) {} + + @Override + void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} + + @Override + void onPhaseFailure(SearchPhaseContext context) {} + }; + } +} diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerSupport.java b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerSupport.java index 58a4c4a4e555d..0f737e00478cb 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerSupport.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerSupport.java @@ -8,6 +8,10 @@ package org.opensearch.action.search; +import org.apache.logging.log4j.LogManager; + +import java.util.List; + /** * Helper interface to access package protected {@link SearchRequestOperationsListener} from test cases. */ @@ -17,6 +21,12 @@ default void onPhaseStart(SearchRequestOperationsListener listener, SearchPhaseC } default void onPhaseEnd(SearchRequestOperationsListener listener, SearchPhaseContext context) { - listener.onPhaseEnd(context, new SearchRequestContext()); + listener.onPhaseEnd( + context, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); } } diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestSlowLogTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestSlowLogTests.java index e23f08c9415eb..f009988ffae17 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestSlowLogTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestSlowLogTests.java @@ -104,6 +104,7 @@ public void testMultipleSlowLoggersUseSingleLog4jLogger() { new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null ); + SearchRequestOperationsCompositeListenerFactory searchRequestListeners = new SearchRequestOperationsCompositeListenerFactory(); SearchRequestSlowLog searchRequestSlowLog2 = new SearchRequestSlowLog(clusterService2); int numberOfLoggersAfter = context.getLoggers().size(); @@ -175,7 +176,8 @@ public void testConcurrentOnRequestEnd() throws InterruptedException { ArrayList searchRequestContexts = new ArrayList<>(); for (int i = 0; i < numRequests; i++) { SearchRequestContext searchRequestContext = new SearchRequestContext( - new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger) + new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger), + searchRequest ); searchRequestContext.setAbsoluteStartNanos((i < numRequestsLogged) ? 0 : System.nanoTime()); searchRequestContexts.add(searchRequestContext); @@ -204,7 +206,10 @@ public void testSearchRequestSlowLogHasJsonFields_EmptySearchRequestContext() th SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); SearchRequest searchRequest = new SearchRequest().source(source); SearchPhaseContext searchPhaseContext = new MockSearchPhaseContext(1, searchRequest); - SearchRequestContext searchRequestContext = new SearchRequestContext(); + SearchRequestContext searchRequestContext = new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + searchRequest + ); SearchRequestSlowLog.SearchRequestSlowLogMessage p = new SearchRequestSlowLog.SearchRequestSlowLogMessage( searchPhaseContext, 10, @@ -225,7 +230,10 @@ public void testSearchRequestSlowLogHasJsonFields_NotEmptySearchRequestContext() SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); SearchRequest searchRequest = new SearchRequest().source(source); SearchPhaseContext searchPhaseContext = new MockSearchPhaseContext(1, searchRequest); - SearchRequestContext searchRequestContext = new SearchRequestContext(); + SearchRequestContext searchRequestContext = new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + searchRequest + ); searchRequestContext.updatePhaseTookMap(SearchPhaseName.FETCH.getName(), 10L); searchRequestContext.updatePhaseTookMap(SearchPhaseName.QUERY.getName(), 50L); searchRequestContext.updatePhaseTookMap(SearchPhaseName.EXPAND.getName(), 5L); @@ -251,7 +259,10 @@ public void testSearchRequestSlowLogHasJsonFields_PartialContext() throws IOExce SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); SearchRequest searchRequest = new SearchRequest().source(source); SearchPhaseContext searchPhaseContext = new MockSearchPhaseContext(1, searchRequest); - SearchRequestContext searchRequestContext = new SearchRequestContext(); + SearchRequestContext searchRequestContext = new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + searchRequest + ); searchRequestContext.updatePhaseTookMap(SearchPhaseName.FETCH.getName(), 10L); searchRequestContext.updatePhaseTookMap(SearchPhaseName.QUERY.getName(), 50L); searchRequestContext.updatePhaseTookMap(SearchPhaseName.EXPAND.getName(), 5L); @@ -277,7 +288,10 @@ public void testSearchRequestSlowLogSearchContextPrinterToLog() throws IOExcepti SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); SearchRequest searchRequest = new SearchRequest().source(source); SearchPhaseContext searchPhaseContext = new MockSearchPhaseContext(1, searchRequest); - SearchRequestContext searchRequestContext = new SearchRequestContext(); + SearchRequestContext searchRequestContext = new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + searchRequest + ); searchRequestContext.updatePhaseTookMap(SearchPhaseName.FETCH.getName(), 10L); searchRequestContext.updatePhaseTookMap(SearchPhaseName.QUERY.getName(), 50L); searchRequestContext.updatePhaseTookMap(SearchPhaseName.EXPAND.getName(), 5L); diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java index 93cf77933fdd5..377ccebbfd418 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java @@ -8,9 +8,13 @@ package org.opensearch.action.search; +import org.apache.logging.log4j.LogManager; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; import org.opensearch.test.OpenSearchTestCase; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Phaser; @@ -22,7 +26,8 @@ public class SearchRequestStatsTests extends OpenSearchTestCase { public void testSearchRequestPhaseFailure() { - SearchRequestStats testRequestStats = new SearchRequestStats(); + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + SearchRequestStats testRequestStats = new SearchRequestStats(clusterSettings); SearchPhaseContext ctx = mock(SearchPhaseContext.class); SearchPhase mockSearchPhase = mock(SearchPhase.class); when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase); @@ -37,7 +42,8 @@ public void testSearchRequestPhaseFailure() { } public void testSearchRequestStats() { - SearchRequestStats testRequestStats = new SearchRequestStats(); + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + SearchRequestStats testRequestStats = new SearchRequestStats(clusterSettings); SearchPhaseContext ctx = mock(SearchPhaseContext.class); SearchPhase mockSearchPhase = mock(SearchPhase.class); @@ -50,7 +56,13 @@ public void testSearchRequestStats() { long startTime = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(tookTimeInMillis); when(mockSearchPhase.getStartTimeInNanos()).thenReturn(startTime); assertEquals(1, testRequestStats.getPhaseCurrent(searchPhaseName)); - testRequestStats.onPhaseEnd(ctx, new SearchRequestContext()); + testRequestStats.onPhaseEnd( + ctx, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); assertEquals(0, testRequestStats.getPhaseCurrent(searchPhaseName)); assertEquals(1, testRequestStats.getPhaseTotal(searchPhaseName)); assertThat(testRequestStats.getPhaseMetric(searchPhaseName), greaterThanOrEqualTo(tookTimeInMillis)); @@ -58,7 +70,8 @@ public void testSearchRequestStats() { } public void testSearchRequestStatsOnPhaseStartConcurrently() throws InterruptedException { - SearchRequestStats testRequestStats = new SearchRequestStats(); + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + SearchRequestStats testRequestStats = new SearchRequestStats(clusterSettings); int numTasks = randomIntBetween(5, 50); Thread[] threads = new Thread[numTasks * SearchPhaseName.values().length]; Phaser phaser = new Phaser(numTasks * SearchPhaseName.values().length + 1); @@ -85,7 +98,8 @@ public void testSearchRequestStatsOnPhaseStartConcurrently() throws InterruptedE } public void testSearchRequestStatsOnPhaseEndConcurrently() throws InterruptedException { - SearchRequestStats testRequestStats = new SearchRequestStats(); + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + SearchRequestStats testRequestStats = new SearchRequestStats(clusterSettings); int numTasks = randomIntBetween(5, 50); Thread[] threads = new Thread[numTasks * SearchPhaseName.values().length]; Phaser phaser = new Phaser(numTasks * SearchPhaseName.values().length + 1); @@ -102,7 +116,13 @@ public void testSearchRequestStatsOnPhaseEndConcurrently() throws InterruptedExc for (int i = 0; i < numTasks; i++) { threads[i] = new Thread(() -> { phaser.arriveAndAwaitAdvance(); - testRequestStats.onPhaseEnd(ctx, new SearchRequestContext()); + testRequestStats.onPhaseEnd( + ctx, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); countDownLatch.countDown(); }); threads[i].start(); @@ -121,7 +141,8 @@ public void testSearchRequestStatsOnPhaseEndConcurrently() throws InterruptedExc } public void testSearchRequestStatsOnPhaseFailureConcurrently() throws InterruptedException { - SearchRequestStats testRequestStats = new SearchRequestStats(); + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + SearchRequestStats testRequestStats = new SearchRequestStats(clusterSettings); int numTasks = randomIntBetween(5, 50); Thread[] threads = new Thread[numTasks * SearchPhaseName.values().length]; Phaser phaser = new Phaser(numTasks * SearchPhaseName.values().length + 1); diff --git a/server/src/test/java/org/opensearch/action/search/SearchResponseMergerTests.java b/server/src/test/java/org/opensearch/action/search/SearchResponseMergerTests.java index 1004965c0d50e..ce4d5ca4f7091 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchResponseMergerTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchResponseMergerTests.java @@ -32,6 +32,7 @@ package org.opensearch.action.search; +import org.apache.logging.log4j.LogManager; import org.apache.lucene.search.SortField; import org.apache.lucene.search.TotalHits; import org.opensearch.OpenSearchException; @@ -132,7 +133,13 @@ public void testMergeTookInMillis() throws InterruptedException { addResponse(merger, searchResponse); } awaitResponsesAdded(); - SearchResponse searchResponse = merger.getMergedResponse(SearchResponse.Clusters.EMPTY); + SearchResponse searchResponse = merger.getMergedResponse( + SearchResponse.Clusters.EMPTY, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); assertEquals(TimeUnit.NANOSECONDS.toMillis(currentRelativeTime), searchResponse.getTook().millis()); } @@ -184,7 +191,13 @@ public void testMergeShardFailures() throws InterruptedException { awaitResponsesAdded(); assertEquals(numResponses, merger.numResponses()); SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); - SearchResponse mergedResponse = merger.getMergedResponse(clusters); + SearchResponse mergedResponse = merger.getMergedResponse( + clusters, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); assertSame(clusters, mergedResponse.getClusters()); assertEquals(numResponses, mergedResponse.getTotalShards()); assertEquals(numResponses, mergedResponse.getSuccessfulShards()); @@ -235,7 +248,13 @@ public void testMergeShardFailuresNullShardTarget() throws InterruptedException awaitResponsesAdded(); assertEquals(numResponses, merger.numResponses()); SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); - SearchResponse mergedResponse = merger.getMergedResponse(clusters); + SearchResponse mergedResponse = merger.getMergedResponse( + clusters, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); assertSame(clusters, mergedResponse.getClusters()); assertEquals(numResponses, mergedResponse.getTotalShards()); assertEquals(numResponses, mergedResponse.getSuccessfulShards()); @@ -281,7 +300,13 @@ public void testMergeShardFailuresNullShardId() throws InterruptedException { } awaitResponsesAdded(); assertEquals(numResponses, merger.numResponses()); - ShardSearchFailure[] shardFailures = merger.getMergedResponse(SearchResponse.Clusters.EMPTY).getShardFailures(); + ShardSearchFailure[] shardFailures = merger.getMergedResponse( + SearchResponse.Clusters.EMPTY, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ).getShardFailures(); assertThat(Arrays.asList(shardFailures), containsInAnyOrder(expectedFailures.toArray(ShardSearchFailure.EMPTY_ARRAY))); } @@ -315,7 +340,13 @@ public void testMergeProfileResults() throws InterruptedException { awaitResponsesAdded(); assertEquals(numResponses, merger.numResponses()); SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); - SearchResponse mergedResponse = merger.getMergedResponse(clusters); + SearchResponse mergedResponse = merger.getMergedResponse( + clusters, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); assertSame(clusters, mergedResponse.getClusters()); assertEquals(numResponses, mergedResponse.getTotalShards()); assertEquals(numResponses, mergedResponse.getSuccessfulShards()); @@ -377,7 +408,13 @@ public void testMergeCompletionSuggestions() throws InterruptedException { awaitResponsesAdded(); assertEquals(numResponses, searchResponseMerger.numResponses()); SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); - SearchResponse mergedResponse = searchResponseMerger.getMergedResponse(clusters); + SearchResponse mergedResponse = searchResponseMerger.getMergedResponse( + clusters, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); assertSame(clusters, mergedResponse.getClusters()); assertEquals(numResponses, mergedResponse.getTotalShards()); assertEquals(numResponses, mergedResponse.getSuccessfulShards()); @@ -449,7 +486,13 @@ public void testMergeCompletionSuggestionsTieBreak() throws InterruptedException awaitResponsesAdded(); assertEquals(numResponses, searchResponseMerger.numResponses()); SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); - SearchResponse mergedResponse = searchResponseMerger.getMergedResponse(clusters); + SearchResponse mergedResponse = searchResponseMerger.getMergedResponse( + clusters, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); assertSame(clusters, mergedResponse.getClusters()); assertEquals(numResponses, mergedResponse.getTotalShards()); assertEquals(numResponses, mergedResponse.getSuccessfulShards()); @@ -523,7 +566,13 @@ public void testMergeAggs() throws InterruptedException { awaitResponsesAdded(); assertEquals(numResponses, searchResponseMerger.numResponses()); SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); - SearchResponse mergedResponse = searchResponseMerger.getMergedResponse(clusters); + SearchResponse mergedResponse = searchResponseMerger.getMergedResponse( + clusters, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); assertSame(clusters, mergedResponse.getClusters()); assertEquals(numResponses, mergedResponse.getTotalShards()); assertEquals(numResponses, mergedResponse.getSuccessfulShards()); @@ -680,7 +729,13 @@ public void testMergeSearchHits() throws InterruptedException { awaitResponsesAdded(); assertEquals(numResponses, searchResponseMerger.numResponses()); final SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); - SearchResponse searchResponse = searchResponseMerger.getMergedResponse(clusters); + SearchResponse searchResponse = searchResponseMerger.getMergedResponse( + clusters, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); assertEquals(TimeUnit.NANOSECONDS.toMillis(currentRelativeTime), searchResponse.getTook().millis()); assertEquals(expectedTotal, searchResponse.getTotalShards()); @@ -740,7 +795,13 @@ public void testMergeNoResponsesAdded() { SearchResponseMerger merger = new SearchResponseMerger(0, 10, Integer.MAX_VALUE, timeProvider, emptyReduceContextBuilder()); SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); assertEquals(0, merger.numResponses()); - SearchResponse response = merger.getMergedResponse(clusters); + SearchResponse response = merger.getMergedResponse( + clusters, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); assertSame(clusters, response.getClusters()); assertEquals(TimeUnit.NANOSECONDS.toMillis(currentRelativeTime), response.getTook().millis()); assertEquals(0, response.getTotalShards()); @@ -813,7 +874,13 @@ public void testMergeEmptySearchHitsWithNonEmpty() { merger.add(searchResponse); } assertEquals(2, merger.numResponses()); - SearchResponse mergedResponse = merger.getMergedResponse(clusters); + SearchResponse mergedResponse = merger.getMergedResponse( + clusters, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); assertEquals(10, mergedResponse.getHits().getTotalHits().value); assertEquals(10, mergedResponse.getHits().getHits().length); assertEquals(2, mergedResponse.getTotalShards()); @@ -855,7 +922,13 @@ public void testMergeOnlyEmptyHits() { ); merger.add(searchResponse); } - SearchResponse mergedResponse = merger.getMergedResponse(clusters); + SearchResponse mergedResponse = merger.getMergedResponse( + clusters, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); assertEquals(expectedTotalHits, mergedResponse.getHits().getTotalHits()); } diff --git a/server/src/test/java/org/opensearch/action/search/SearchTimeProviderTests.java b/server/src/test/java/org/opensearch/action/search/SearchTimeProviderTests.java deleted file mode 100644 index 4d8a44417a3ee..0000000000000 --- a/server/src/test/java/org/opensearch/action/search/SearchTimeProviderTests.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.action.search; - -import org.opensearch.test.OpenSearchTestCase; - -import java.util.concurrent.TimeUnit; - -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class SearchTimeProviderTests extends OpenSearchTestCase { - - public void testSearchTimeProviderPhaseFailure() { - TransportSearchAction.SearchTimeProvider testTimeProvider = new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0); - SearchPhaseContext ctx = mock(SearchPhaseContext.class); - SearchPhase mockSearchPhase = mock(SearchPhase.class); - when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase); - - for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { - when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); - testTimeProvider.onPhaseStart(ctx); - assertNull(testTimeProvider.getPhaseTookTime(searchPhaseName)); - testTimeProvider.onPhaseFailure(ctx); - assertNull(testTimeProvider.getPhaseTookTime(searchPhaseName)); - } - } - - public void testSearchTimeProviderPhaseEnd() { - TransportSearchAction.SearchTimeProvider testTimeProvider = new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0); - - SearchPhaseContext ctx = mock(SearchPhaseContext.class); - SearchPhase mockSearchPhase = mock(SearchPhase.class); - when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase); - - for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { - when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); - long tookTimeInMillis = randomIntBetween(1, 100); - testTimeProvider.onPhaseStart(ctx); - long startTime = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(tookTimeInMillis); - when(mockSearchPhase.getStartTimeInNanos()).thenReturn(startTime); - assertNull(testTimeProvider.getPhaseTookTime(searchPhaseName)); - testTimeProvider.onPhaseEnd(ctx, new SearchRequestContext()); - assertThat(testTimeProvider.getPhaseTookTime(searchPhaseName), greaterThanOrEqualTo(tookTimeInMillis)); - } - } -} diff --git a/server/src/test/java/org/opensearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/opensearch/action/search/TransportSearchActionTests.java index c4bf8a5d87172..da19c839f3826 100644 --- a/server/src/test/java/org/opensearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/TransportSearchActionTests.java @@ -32,6 +32,7 @@ package org.opensearch.action.search; +import org.apache.logging.log4j.LogManager; import org.apache.lucene.search.TotalHits; import org.opensearch.Version; import org.opensearch.action.LatchedActionListener; @@ -483,7 +484,11 @@ public void testCCSRemoteReduceMergeFails() throws Exception { remoteClusterService, threadPool, listener, - (r, l) -> setOnce.set(Tuple.tuple(r, l)) + (r, l) -> setOnce.set(Tuple.tuple(r, l)), + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + searchRequest + ) ); if (localIndices == null) { assertNull(setOnce.get()); @@ -541,7 +546,11 @@ public void testCCSRemoteReduce() throws Exception { remoteClusterService, threadPool, listener, - (r, l) -> setOnce.set(Tuple.tuple(r, l)) + (r, l) -> setOnce.set(Tuple.tuple(r, l)), + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + searchRequest + ) ); if (localIndices == null) { assertNull(setOnce.get()); @@ -578,7 +587,11 @@ public void testCCSRemoteReduce() throws Exception { remoteClusterService, threadPool, listener, - (r, l) -> setOnce.set(Tuple.tuple(r, l)) + (r, l) -> setOnce.set(Tuple.tuple(r, l)), + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + searchRequest + ) ); if (localIndices == null) { assertNull(setOnce.get()); @@ -636,7 +649,11 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti remoteClusterService, threadPool, listener, - (r, l) -> setOnce.set(Tuple.tuple(r, l)) + (r, l) -> setOnce.set(Tuple.tuple(r, l)), + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + searchRequest + ) ); if (localIndices == null) { assertNull(setOnce.get()); @@ -676,7 +693,11 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti remoteClusterService, threadPool, listener, - (r, l) -> setOnce.set(Tuple.tuple(r, l)) + (r, l) -> setOnce.set(Tuple.tuple(r, l)), + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + searchRequest + ) ); if (localIndices == null) { assertNull(setOnce.get()); @@ -727,7 +748,11 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti remoteClusterService, threadPool, listener, - (r, l) -> setOnce.set(Tuple.tuple(r, l)) + (r, l) -> setOnce.set(Tuple.tuple(r, l)), + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + searchRequest + ) ); if (localIndices == null) { assertNull(setOnce.get()); diff --git a/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java b/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java index 52b272094cd86..5656b77445772 100644 --- a/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java +++ b/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java @@ -37,6 +37,8 @@ import org.opensearch.action.search.SearchPhaseName; import org.opensearch.action.search.SearchRequestOperationsListenerSupport; import org.opensearch.action.search.SearchRequestStats; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; import org.opensearch.index.search.stats.SearchStats.Stats; import org.opensearch.test.OpenSearchTestCase; @@ -77,7 +79,8 @@ public void testShardLevelSearchGroupStats() throws Exception { long paramValue = randomIntBetween(2, 50); // Testing for request stats - SearchRequestStats testRequestStats = new SearchRequestStats(); + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + SearchRequestStats testRequestStats = new SearchRequestStats(clusterSettings); SearchPhaseContext ctx = mock(SearchPhaseContext.class); for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { SearchPhase mockSearchPhase = mock(SearchPhase.class); diff --git a/server/src/test/java/org/opensearch/indices/NodeIndicesStatsTests.java b/server/src/test/java/org/opensearch/indices/NodeIndicesStatsTests.java index 6f36d22b7e17b..2424e38636466 100644 --- a/server/src/test/java/org/opensearch/indices/NodeIndicesStatsTests.java +++ b/server/src/test/java/org/opensearch/indices/NodeIndicesStatsTests.java @@ -34,6 +34,8 @@ import org.opensearch.action.admin.indices.stats.CommonStats; import org.opensearch.action.search.SearchRequestStats; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.test.OpenSearchTestCase; @@ -46,7 +48,8 @@ public class NodeIndicesStatsTests extends OpenSearchTestCase { public void testInvalidLevel() { CommonStats oldStats = new CommonStats(); - SearchRequestStats requestStats = new SearchRequestStats(); + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + SearchRequestStats requestStats = new SearchRequestStats(clusterSettings); final NodeIndicesStats stats = new NodeIndicesStats(oldStats, Collections.emptyMap(), requestStats); final String level = randomAlphaOfLength(16); final ToXContent.Params params = new ToXContent.MapParams(Collections.singletonMap("level", level)); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 9fe1f8294fc74..9bb1f51c51cf6 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -90,7 +90,7 @@ import org.opensearch.action.search.SearchExecutionStatsCollector; import org.opensearch.action.search.SearchPhaseController; import org.opensearch.action.search.SearchRequest; -import org.opensearch.action.search.SearchRequestSlowLog; +import org.opensearch.action.search.SearchRequestOperationsCompositeListenerFactory; import org.opensearch.action.search.SearchResponse; import org.opensearch.action.search.SearchTransportService; import org.opensearch.action.search.TransportSearchAction; @@ -2285,6 +2285,8 @@ public void onFailure(final Exception e) { writableRegistry(), searchService::aggReduceContextBuilder ); + SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory = + new SearchRequestOperationsCompositeListenerFactory(); actions.put( SearchAction.INSTANCE, new TransportSearchAction( @@ -2310,9 +2312,8 @@ public void onFailure(final Exception e) { List.of(), client ), - null, - new SearchRequestSlowLog(clusterService), - NoopMetricsRegistry.INSTANCE + NoopMetricsRegistry.INSTANCE, + searchRequestOperationsCompositeListenerFactory ) ); actions.put(