From bb64cd2c4503bf50dc4461e9cab96a9d836f788c Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Thu, 4 Jan 2024 18:05:05 -0800 Subject: [PATCH] refactor to remove the TimeProvider listener Signed-off-by: Chenyang Ji --- .../search/AbstractSearchAsyncAction.java | 4 +- .../action/search/SearchRequestContext.java | 19 ++++- .../SearchRequestOperationsListener.java | 11 ++- .../SearchRequestOperationsListeners.java | 6 +- .../action/search/SearchResponseMerger.java | 6 +- .../action/search/TransportSearchAction.java | 83 +++++-------------- ...SearchRequestOperationsListenersTests.java | 57 ++++++------- .../search/SearchResponseMergerTests.java | 25 +++--- .../search/SearchTimeProviderTests.java | 54 ------------ .../search/TransportSearchActionTests.java | 18 ++-- 10 files changed, 106 insertions(+), 177 deletions(-) delete mode 100644 server/src/test/java/org/opensearch/action/search/SearchTimeProviderTests.java diff --git a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java index f18bbb8a1cc13..5b41c2a13b596 100644 --- a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java @@ -214,7 +214,7 @@ public final void start() { 0, 0, buildTookInMillis(), - timeProvider.getPhaseTook(), + searchRequestContext.getPhaseTook(), ShardSearchFailure.EMPTY_ARRAY, clusters, null @@ -670,7 +670,7 @@ protected final SearchResponse buildSearchResponse( successfulOps.get(), skippedOps.get(), buildTookInMillis(), - timeProvider.getPhaseTook(), + searchRequestContext.getPhaseTook(), failures, clusters, searchContextId diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java b/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java index 674363600b251..7f2e71afec5ce 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java @@ -31,18 +31,25 @@ class SearchRequestContext { private TotalHits totalHits; private final EnumMap shardStats; + private final boolean phaseTookEnabled; + /** * This constructor is for testing only */ SearchRequestContext() { - this(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger())); + this(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), false); } - SearchRequestContext(SearchRequestOperationsListener searchRequestOperationsListener) { + SearchRequestContext(SearchRequestOperationsListener searchRequestOperationsListener, boolean phaseTookEnabled) { this.searchRequestOperationsListener = searchRequestOperationsListener; this.absoluteStartNanos = System.nanoTime(); this.phaseTookMap = new HashMap<>(); this.shardStats = new EnumMap<>(ShardStatsFieldNames.class); + this.phaseTookEnabled = phaseTookEnabled; + } + + SearchRequestContext(SearchRequestOperationsListener searchRequestOperationsListener) { + this(searchRequestOperationsListener, false); } SearchRequestOperationsListener getSearchRequestOperationsListener() { @@ -57,6 +64,14 @@ Map phaseTookMap() { return phaseTookMap; } + SearchResponse.PhaseTook getPhaseTook() { + if (phaseTookEnabled) { + return new SearchResponse.PhaseTook(phaseTookMap); + } else { + return null; + } + } + /** * Override absoluteStartNanos set in constructor. * For testing only diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java index ecebabcf2b8ae..99619d9f9c900 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java @@ -24,7 +24,7 @@ public abstract class SearchRequestOperationsListener { private volatile boolean enabled; protected SearchRequestOperationsListener() { - this.enabled = false; + this.enabled = true; } protected SearchRequestOperationsListener(boolean enabled) { @@ -41,11 +41,15 @@ void onRequestStart(SearchRequestContext searchRequestContext) {} void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} - boolean getEnabled() { + boolean isEnabled(SearchRequest searchRequest) { + return isEnabled(); + } + + boolean isEnabled() { return enabled; } - void setEnabled(boolean enabled) { + protected void setEnabled(boolean enabled) { this.enabled = enabled; } @@ -62,7 +66,6 @@ static final class CompositeListener extends SearchRequestOperationsListener { CompositeListener(List listeners, Logger logger) { this.listeners = listeners; this.logger = logger; - this.setEnabled(true); } @Override diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListeners.java b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListeners.java index f20c52b9d97d4..237d25169bdd2 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListeners.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListeners.java @@ -59,18 +59,22 @@ public List getListeners() { * Create the {@link SearchRequestOperationsListener.CompositeListener} * with the all listeners enabled at cluster-level and request-level. * + * @param searchRequest The SearchRequest object used to decide which request-level listeners to add based on states/flags * @param logger Logger to be attached to the {@link SearchRequestOperationsListener.CompositeListener} * @param perRequestListeners the per-request listeners that can be optionally added to the returned CompositeListener list. * @return SearchRequestOperationsListener.CompositeListener */ public SearchRequestOperationsListener.CompositeListener buildCompositeListener( + SearchRequest searchRequest, Logger logger, SearchRequestOperationsListener... perRequestListeners ) { final List searchListenersList = Stream.concat( searchRequestListenersList.stream(), Arrays.stream(perRequestListeners) - ).filter(SearchRequestOperationsListener::getEnabled).collect(Collectors.toList()); + ) + .filter((searchRequestOperationsListener -> searchRequestOperationsListener.isEnabled(searchRequest))) + .collect(Collectors.toList()); return new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger); } diff --git a/server/src/main/java/org/opensearch/action/search/SearchResponseMerger.java b/server/src/main/java/org/opensearch/action/search/SearchResponseMerger.java index 054bd578cc56c..538e7fd54e2c3 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchResponseMerger.java +++ b/server/src/main/java/org/opensearch/action/search/SearchResponseMerger.java @@ -110,7 +110,7 @@ final class SearchResponseMerger { /** * Add a search response to the list of responses to be merged together into one. * Merges currently happen at once when all responses are available and - * {@link #getMergedResponse(SearchResponse.Clusters)} )} is called. + * {@link #getMergedResponse(SearchResponse.Clusters, SearchRequestContext)} )} is called. * That may change in the future as it's possible to introduce incremental merges as responses come in if necessary. */ void add(SearchResponse searchResponse) { @@ -126,7 +126,7 @@ int numResponses() { * Returns the merged response. To be called once all responses have been added through {@link #add(SearchResponse)} * so that all responses are merged into a single one. */ - SearchResponse getMergedResponse(SearchResponse.Clusters clusters) { + SearchResponse getMergedResponse(SearchResponse.Clusters clusters, SearchRequestContext searchRequestContext) { // if the search is only across remote clusters, none of them are available, and all of them have skip_unavailable set to true, // we end up calling merge without anything to merge, we just return an empty search response if (searchResponses.size() == 0) { @@ -236,7 +236,7 @@ SearchResponse getMergedResponse(SearchResponse.Clusters clusters) { successfulShards, skippedShards, tookInMillis, - searchTimeProvider.getPhaseTook(), + searchRequestContext.getPhaseTook(), shardFailures, clusters, null diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index ca7853cd78031..0738ce0c5ca68 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -98,7 +98,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.EnumMap; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -269,8 +268,6 @@ private Map resolveIndexBoosts(SearchRequest searchRequest, Clust } /** - * Listener to track request-level tookTime and phase tookTimes from the coordinator. - * * Search operations need two clocks. One clock is to fulfill real clock needs (e.g., resolving * "now" to an index name). Another clock is needed for measuring how long a search operation * took. These two uses are at odds with each other. There are many issues with using a real @@ -280,8 +277,7 @@ private Map resolveIndexBoosts(SearchRequest searchRequest, Clust * * @opensearch.internal */ - static final class SearchTimeProvider extends SearchRequestOperationsListener { - + static final class SearchTimeProvider { private final long absoluteStartMillis; private final long relativeStartNanos; private final LongSupplier relativeCurrentNanosProvider; @@ -310,53 +306,6 @@ long getAbsoluteStartMillis() { long buildTookInMillis() { return TimeUnit.NANOSECONDS.toMillis(relativeCurrentNanosProvider.getAsLong() - relativeStartNanos); } - - SearchResponse.PhaseTook getPhaseTook() { - if (getEnabled()) { - Map phaseTookMap = new HashMap<>(); - // Convert Map to Map for SearchResponse() - for (SearchPhaseName searchPhaseName : phaseStatsMap.keySet()) { - phaseTookMap.put(searchPhaseName.getName(), phaseStatsMap.get(searchPhaseName)); - } - return new SearchResponse.PhaseTook(phaseTookMap); - } else { - return null; - } - } - - Map phaseStatsMap = new EnumMap<>(SearchPhaseName.class); - - /** - * Set if this listener is enabled based on the cluster level setting - * and per request enable flags. - * - * @param enabledAtClusterLevel if the SearchTimeProvider listener is enabled at cluster level - * @param searchRequest the original Search Request - * @opensearch.internal - */ - - void setEnabled(boolean enabledAtClusterLevel, SearchRequest searchRequest) { - // phase_took is enabled wi th request param and/or cluster setting - super.setEnabled(enabledAtClusterLevel || (searchRequest.isPhaseTook() != null && searchRequest.isPhaseTook())); - } - - @Override - void onPhaseStart(SearchPhaseContext context) {} - - @Override - void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { - phaseStatsMap.put( - context.getCurrentPhase().getSearchPhaseName(), - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - context.getCurrentPhase().getStartTimeInNanos()) - ); - } - - @Override - void onPhaseFailure(SearchPhaseContext context) {} - - public Long getPhaseTookTime(SearchPhaseName searchPhaseName) { - return phaseStatsMap.get(searchPhaseName); - } } @Override @@ -479,10 +428,15 @@ private void executeRequest( relativeStartNanos, System::nanoTime ); - timeProvider.setEnabled(clusterService.getClusterSettings().get(SEARCH_PHASE_TOOK_ENABLED), originalSearchRequest); + final boolean phaseTookEnabled; + if (originalSearchRequest.isPhaseTook() == null) { + phaseTookEnabled = clusterService.getClusterSettings().get(SEARCH_PHASE_TOOK_ENABLED); + } else { + phaseTookEnabled = originalSearchRequest.isPhaseTook(); + } SearchRequestOperationsListener.CompositeListener requestOperationsListeners = searchRequestOperationsListeners - .buildCompositeListener(logger, timeProvider); - SearchRequestContext searchRequestContext = new SearchRequestContext(requestOperationsListeners); + .buildCompositeListener(originalSearchRequest, logger); + SearchRequestContext searchRequestContext = new SearchRequestContext(requestOperationsListeners, phaseTookEnabled); searchRequestContext.getSearchRequestOperationsListener().onRequestStart(searchRequestContext); PipelinedRequest searchRequest; @@ -587,7 +541,8 @@ private ActionListener buildRewriteListener( searchContext, searchAsyncActionProvider, searchRequestContext - ) + ), + searchRequestContext ); } else { AtomicInteger skippedClusters = new AtomicInteger(0); @@ -675,7 +630,8 @@ static void ccsRemoteReduce( RemoteClusterService remoteClusterService, ThreadPool threadPool, ActionListener listener, - BiConsumer> localSearchConsumer + BiConsumer> localSearchConsumer, + SearchRequestContext searchRequestContext ) { if (localIndices == null && remoteIndices.size() == 1) { @@ -717,7 +673,7 @@ public void onResponse(SearchResponse searchResponse) { searchResponse.getSuccessfulShards(), searchResponse.getSkippedShards(), timeProvider.buildTookInMillis(), - timeProvider.getPhaseTook(), + searchRequestContext.getPhaseTook(), searchResponse.getShardFailures(), new SearchResponse.Clusters(1, 1, 0), searchResponse.pointInTimeId() @@ -763,7 +719,8 @@ public void onFailure(Exception e) { exceptions, searchResponseMerger, totalClusters, - listener + listener, + searchRequestContext ); Client remoteClusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias); remoteClusterClient.search(ccsSearchRequest, ccsListener); @@ -777,7 +734,8 @@ public void onFailure(Exception e) { exceptions, searchResponseMerger, totalClusters, - listener + listener, + searchRequestContext ); SearchRequest ccsLocalSearchRequest = SearchRequest.subSearchRequest( searchRequest, @@ -872,7 +830,8 @@ private static ActionListener createCCSListener( AtomicReference exceptions, SearchResponseMerger searchResponseMerger, int totalClusters, - ActionListener originalListener + ActionListener originalListener, + SearchRequestContext searchRequestContext ) { return new CCSActionListener( clusterAlias, @@ -894,7 +853,7 @@ SearchResponse createFinalResponse() { searchResponseMerger.numResponses(), skippedClusters.get() ); - return searchResponseMerger.getMergedResponse(clusters); + return searchResponseMerger.getMergedResponse(clusters, searchRequestContext); } }; } diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenersTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenersTests.java index d935431f935f9..e762e795d88d2 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenersTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenersTests.java @@ -25,9 +25,14 @@ public void testAddAndGetListeners() { public void testStandardListenersEnabled() { SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener(); SearchRequestOperationsListener testListener2 = createTestSearchRequestOperationsListener(); + testListener1.setEnabled(false); SearchRequestOperationsListeners requestListeners = new SearchRequestOperationsListeners(testListener1, testListener2); - testListener2.setEnabled(true); - SearchRequestOperationsListener.CompositeListener compositeListener = requestListeners.buildCompositeListener(logger); + SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); + SearchRequest searchRequest = new SearchRequest().source(source); + SearchRequestOperationsListener.CompositeListener compositeListener = requestListeners.buildCompositeListener( + searchRequest, + logger + ); List listeners = compositeListener.getListeners(); assertEquals(1, listeners.size()); assertEquals(testListener2, listeners.get(0)); @@ -36,72 +41,62 @@ public void testStandardListenersEnabled() { assertEquals(testListener2, requestListeners.getListeners().get(1)); } - public void testStandardListenersAndTimeProvider() { + public void testStandardListenersAndPerRequestListener() { SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener(); SearchRequestOperationsListeners requestListeners = new SearchRequestOperationsListeners(testListener1); - + SearchRequestOperationsListener testListener2 = createTestSearchRequestOperationsListener(); testListener1.setEnabled(true); - TransportSearchAction.SearchTimeProvider timeProviderListener = new TransportSearchAction.SearchTimeProvider( - 0, - System.nanoTime(), - System::nanoTime - ); + testListener2.setEnabled(true); SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); SearchRequest searchRequest = new SearchRequest().source(source); searchRequest.setPhaseTook(true); - timeProviderListener.setEnabled(false, searchRequest); SearchRequestOperationsListener.CompositeListener compositeListener = requestListeners.buildCompositeListener( + searchRequest, logger, - timeProviderListener + testListener2 ); List listeners = compositeListener.getListeners(); assertEquals(2, listeners.size()); assertEquals(testListener1, listeners.get(0)); - assertEquals(timeProviderListener, listeners.get(1)); + assertEquals(testListener2, listeners.get(1)); assertEquals(1, requestListeners.getListeners().size()); assertEquals(testListener1, requestListeners.getListeners().get(0)); } - public void testStandardListenersDisabledAndTimeProvider() { + public void testStandardListenersDisabledAndPerRequestListener() { SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener(); + testListener1.setEnabled(false); SearchRequestOperationsListeners requestListeners = new SearchRequestOperationsListeners(testListener1); - TransportSearchAction.SearchTimeProvider timeProviderListener = new TransportSearchAction.SearchTimeProvider( - 0, - System.nanoTime(), - System::nanoTime - ); + SearchRequestOperationsListener testListener2 = createTestSearchRequestOperationsListener(); SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); SearchRequest searchRequest = new SearchRequest().source(source); - searchRequest.setPhaseTook(true); - timeProviderListener.setEnabled(false, searchRequest); SearchRequestOperationsListener.CompositeListener compositeListener = requestListeners.buildCompositeListener( + searchRequest, logger, - timeProviderListener + testListener2 ); List listeners = compositeListener.getListeners(); assertEquals(1, listeners.size()); - assertEquals(timeProviderListener, listeners.get(0)); + assertEquals(testListener2, listeners.get(0)); assertEquals(1, requestListeners.getListeners().size()); assertEquals(testListener1, requestListeners.getListeners().get(0)); - assertFalse(requestListeners.getListeners().get(0).getEnabled()); + assertFalse(requestListeners.getListeners().get(0).isEnabled()); } - public void testStandardListenerAndTimeProviderDisabled() { + public void testStandardListenerAndPerRequestListenerDisabled() { SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener(); SearchRequestOperationsListeners requestListeners = new SearchRequestOperationsListeners(testListener1); - testListener1.setEnabled(true); - SearchRequestOperationsListener timeProviderListener = new TransportSearchAction.SearchTimeProvider( - 0, - System.nanoTime(), - System::nanoTime - ); + SearchRequestOperationsListener testListener2 = createTestSearchRequestOperationsListener(); + testListener2.setEnabled(false); + SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); SearchRequest searchRequest = new SearchRequest().source(source); searchRequest.setPhaseTook(false); SearchRequestOperationsListener.CompositeListener compositeListener = requestListeners.buildCompositeListener( + searchRequest, logger, - timeProviderListener + testListener2 ); List listeners = compositeListener.getListeners(); assertEquals(1, listeners.size()); diff --git a/server/src/test/java/org/opensearch/action/search/SearchResponseMergerTests.java b/server/src/test/java/org/opensearch/action/search/SearchResponseMergerTests.java index 1004965c0d50e..71e4236de8510 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchResponseMergerTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchResponseMergerTests.java @@ -132,7 +132,7 @@ public void testMergeTookInMillis() throws InterruptedException { addResponse(merger, searchResponse); } awaitResponsesAdded(); - SearchResponse searchResponse = merger.getMergedResponse(SearchResponse.Clusters.EMPTY); + SearchResponse searchResponse = merger.getMergedResponse(SearchResponse.Clusters.EMPTY, new SearchRequestContext()); assertEquals(TimeUnit.NANOSECONDS.toMillis(currentRelativeTime), searchResponse.getTook().millis()); } @@ -184,7 +184,7 @@ public void testMergeShardFailures() throws InterruptedException { awaitResponsesAdded(); assertEquals(numResponses, merger.numResponses()); SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); - SearchResponse mergedResponse = merger.getMergedResponse(clusters); + SearchResponse mergedResponse = merger.getMergedResponse(clusters, new SearchRequestContext()); assertSame(clusters, mergedResponse.getClusters()); assertEquals(numResponses, mergedResponse.getTotalShards()); assertEquals(numResponses, mergedResponse.getSuccessfulShards()); @@ -235,7 +235,7 @@ public void testMergeShardFailuresNullShardTarget() throws InterruptedException awaitResponsesAdded(); assertEquals(numResponses, merger.numResponses()); SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); - SearchResponse mergedResponse = merger.getMergedResponse(clusters); + SearchResponse mergedResponse = merger.getMergedResponse(clusters, new SearchRequestContext()); assertSame(clusters, mergedResponse.getClusters()); assertEquals(numResponses, mergedResponse.getTotalShards()); assertEquals(numResponses, mergedResponse.getSuccessfulShards()); @@ -281,7 +281,8 @@ public void testMergeShardFailuresNullShardId() throws InterruptedException { } awaitResponsesAdded(); assertEquals(numResponses, merger.numResponses()); - ShardSearchFailure[] shardFailures = merger.getMergedResponse(SearchResponse.Clusters.EMPTY).getShardFailures(); + ShardSearchFailure[] shardFailures = merger.getMergedResponse(SearchResponse.Clusters.EMPTY, new SearchRequestContext()) + .getShardFailures(); assertThat(Arrays.asList(shardFailures), containsInAnyOrder(expectedFailures.toArray(ShardSearchFailure.EMPTY_ARRAY))); } @@ -315,7 +316,7 @@ public void testMergeProfileResults() throws InterruptedException { awaitResponsesAdded(); assertEquals(numResponses, merger.numResponses()); SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); - SearchResponse mergedResponse = merger.getMergedResponse(clusters); + SearchResponse mergedResponse = merger.getMergedResponse(clusters, new SearchRequestContext()); assertSame(clusters, mergedResponse.getClusters()); assertEquals(numResponses, mergedResponse.getTotalShards()); assertEquals(numResponses, mergedResponse.getSuccessfulShards()); @@ -377,7 +378,7 @@ public void testMergeCompletionSuggestions() throws InterruptedException { awaitResponsesAdded(); assertEquals(numResponses, searchResponseMerger.numResponses()); SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); - SearchResponse mergedResponse = searchResponseMerger.getMergedResponse(clusters); + SearchResponse mergedResponse = searchResponseMerger.getMergedResponse(clusters, new SearchRequestContext()); assertSame(clusters, mergedResponse.getClusters()); assertEquals(numResponses, mergedResponse.getTotalShards()); assertEquals(numResponses, mergedResponse.getSuccessfulShards()); @@ -449,7 +450,7 @@ public void testMergeCompletionSuggestionsTieBreak() throws InterruptedException awaitResponsesAdded(); assertEquals(numResponses, searchResponseMerger.numResponses()); SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); - SearchResponse mergedResponse = searchResponseMerger.getMergedResponse(clusters); + SearchResponse mergedResponse = searchResponseMerger.getMergedResponse(clusters, new SearchRequestContext()); assertSame(clusters, mergedResponse.getClusters()); assertEquals(numResponses, mergedResponse.getTotalShards()); assertEquals(numResponses, mergedResponse.getSuccessfulShards()); @@ -523,7 +524,7 @@ public void testMergeAggs() throws InterruptedException { awaitResponsesAdded(); assertEquals(numResponses, searchResponseMerger.numResponses()); SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); - SearchResponse mergedResponse = searchResponseMerger.getMergedResponse(clusters); + SearchResponse mergedResponse = searchResponseMerger.getMergedResponse(clusters, new SearchRequestContext()); assertSame(clusters, mergedResponse.getClusters()); assertEquals(numResponses, mergedResponse.getTotalShards()); assertEquals(numResponses, mergedResponse.getSuccessfulShards()); @@ -680,7 +681,7 @@ public void testMergeSearchHits() throws InterruptedException { awaitResponsesAdded(); assertEquals(numResponses, searchResponseMerger.numResponses()); final SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); - SearchResponse searchResponse = searchResponseMerger.getMergedResponse(clusters); + SearchResponse searchResponse = searchResponseMerger.getMergedResponse(clusters, new SearchRequestContext()); assertEquals(TimeUnit.NANOSECONDS.toMillis(currentRelativeTime), searchResponse.getTook().millis()); assertEquals(expectedTotal, searchResponse.getTotalShards()); @@ -740,7 +741,7 @@ public void testMergeNoResponsesAdded() { SearchResponseMerger merger = new SearchResponseMerger(0, 10, Integer.MAX_VALUE, timeProvider, emptyReduceContextBuilder()); SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); assertEquals(0, merger.numResponses()); - SearchResponse response = merger.getMergedResponse(clusters); + SearchResponse response = merger.getMergedResponse(clusters, new SearchRequestContext()); assertSame(clusters, response.getClusters()); assertEquals(TimeUnit.NANOSECONDS.toMillis(currentRelativeTime), response.getTook().millis()); assertEquals(0, response.getTotalShards()); @@ -813,7 +814,7 @@ public void testMergeEmptySearchHitsWithNonEmpty() { merger.add(searchResponse); } assertEquals(2, merger.numResponses()); - SearchResponse mergedResponse = merger.getMergedResponse(clusters); + SearchResponse mergedResponse = merger.getMergedResponse(clusters, new SearchRequestContext()); assertEquals(10, mergedResponse.getHits().getTotalHits().value); assertEquals(10, mergedResponse.getHits().getHits().length); assertEquals(2, mergedResponse.getTotalShards()); @@ -855,7 +856,7 @@ public void testMergeOnlyEmptyHits() { ); merger.add(searchResponse); } - SearchResponse mergedResponse = merger.getMergedResponse(clusters); + SearchResponse mergedResponse = merger.getMergedResponse(clusters, new SearchRequestContext()); assertEquals(expectedTotalHits, mergedResponse.getHits().getTotalHits()); } diff --git a/server/src/test/java/org/opensearch/action/search/SearchTimeProviderTests.java b/server/src/test/java/org/opensearch/action/search/SearchTimeProviderTests.java deleted file mode 100644 index 4d8a44417a3ee..0000000000000 --- a/server/src/test/java/org/opensearch/action/search/SearchTimeProviderTests.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.action.search; - -import org.opensearch.test.OpenSearchTestCase; - -import java.util.concurrent.TimeUnit; - -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class SearchTimeProviderTests extends OpenSearchTestCase { - - public void testSearchTimeProviderPhaseFailure() { - TransportSearchAction.SearchTimeProvider testTimeProvider = new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0); - SearchPhaseContext ctx = mock(SearchPhaseContext.class); - SearchPhase mockSearchPhase = mock(SearchPhase.class); - when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase); - - for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { - when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); - testTimeProvider.onPhaseStart(ctx); - assertNull(testTimeProvider.getPhaseTookTime(searchPhaseName)); - testTimeProvider.onPhaseFailure(ctx); - assertNull(testTimeProvider.getPhaseTookTime(searchPhaseName)); - } - } - - public void testSearchTimeProviderPhaseEnd() { - TransportSearchAction.SearchTimeProvider testTimeProvider = new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0); - - SearchPhaseContext ctx = mock(SearchPhaseContext.class); - SearchPhase mockSearchPhase = mock(SearchPhase.class); - when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase); - - for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { - when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); - long tookTimeInMillis = randomIntBetween(1, 100); - testTimeProvider.onPhaseStart(ctx); - long startTime = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(tookTimeInMillis); - when(mockSearchPhase.getStartTimeInNanos()).thenReturn(startTime); - assertNull(testTimeProvider.getPhaseTookTime(searchPhaseName)); - testTimeProvider.onPhaseEnd(ctx, new SearchRequestContext()); - assertThat(testTimeProvider.getPhaseTookTime(searchPhaseName), greaterThanOrEqualTo(tookTimeInMillis)); - } - } -} diff --git a/server/src/test/java/org/opensearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/opensearch/action/search/TransportSearchActionTests.java index c4bf8a5d87172..bed9bd37c49b4 100644 --- a/server/src/test/java/org/opensearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/TransportSearchActionTests.java @@ -483,7 +483,8 @@ public void testCCSRemoteReduceMergeFails() throws Exception { remoteClusterService, threadPool, listener, - (r, l) -> setOnce.set(Tuple.tuple(r, l)) + (r, l) -> setOnce.set(Tuple.tuple(r, l)), + new SearchRequestContext() ); if (localIndices == null) { assertNull(setOnce.get()); @@ -541,7 +542,8 @@ public void testCCSRemoteReduce() throws Exception { remoteClusterService, threadPool, listener, - (r, l) -> setOnce.set(Tuple.tuple(r, l)) + (r, l) -> setOnce.set(Tuple.tuple(r, l)), + new SearchRequestContext() ); if (localIndices == null) { assertNull(setOnce.get()); @@ -578,7 +580,8 @@ public void testCCSRemoteReduce() throws Exception { remoteClusterService, threadPool, listener, - (r, l) -> setOnce.set(Tuple.tuple(r, l)) + (r, l) -> setOnce.set(Tuple.tuple(r, l)), + new SearchRequestContext() ); if (localIndices == null) { assertNull(setOnce.get()); @@ -636,7 +639,8 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti remoteClusterService, threadPool, listener, - (r, l) -> setOnce.set(Tuple.tuple(r, l)) + (r, l) -> setOnce.set(Tuple.tuple(r, l)), + new SearchRequestContext() ); if (localIndices == null) { assertNull(setOnce.get()); @@ -676,7 +680,8 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti remoteClusterService, threadPool, listener, - (r, l) -> setOnce.set(Tuple.tuple(r, l)) + (r, l) -> setOnce.set(Tuple.tuple(r, l)), + new SearchRequestContext() ); if (localIndices == null) { assertNull(setOnce.get()); @@ -727,7 +732,8 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti remoteClusterService, threadPool, listener, - (r, l) -> setOnce.set(Tuple.tuple(r, l)) + (r, l) -> setOnce.set(Tuple.tuple(r, l)), + new SearchRequestContext() ); if (localIndices == null) { assertNull(setOnce.get());