From 978d14e867c41b224303b2e77745495336568f79 Mon Sep 17 00:00:00 2001 From: Jay Deng Date: Thu, 8 Aug 2024 15:10:03 -0700 Subject: [PATCH] Add slice level operation listener methods (#15153) Signed-off-by: Jay Deng --- CHANGELOG.md | 1 + .../index/shard/SearchOperationListener.java | 60 ++++++++++ .../search/internal/ContextIndexSearcher.java | 31 ++++-- .../shard/SearchOperationListenerTests.java | 105 ++++++++++++++++++ .../search/SearchCancellationTests.java | 4 + .../internal/ContextIndexSearcherTests.java | 4 + .../profile/query/QueryProfilerTests.java | 4 + .../search/query/QueryPhaseTests.java | 7 ++ .../search/query/QueryProfilePhaseTests.java | 7 ++ .../aggregations/AggregatorTestCase.java | 4 + 10 files changed, 215 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f44949bf38511..1964e456acda0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add `rangeQuery` and `regexpQuery` for `constant_keyword` field type ([#14711](https://github.com/opensearch-project/OpenSearch/pull/14711)) - Add took time to request nodes stats ([#15054](https://github.com/opensearch-project/OpenSearch/pull/15054)) - [Workload Management] QueryGroup resource tracking framework changes ([#13897](https://github.com/opensearch-project/OpenSearch/pull/13897)) +- Add slice execution listeners to SearchOperationListener interface ([#15153](https://github.com/opensearch-project/OpenSearch/pull/15153)) ### Dependencies - Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081)) diff --git a/server/src/main/java/org/opensearch/index/shard/SearchOperationListener.java b/server/src/main/java/org/opensearch/index/shard/SearchOperationListener.java index 94079db468f9c..5e63262df0d70 100644 --- a/server/src/main/java/org/opensearch/index/shard/SearchOperationListener.java +++ b/server/src/main/java/org/opensearch/index/shard/SearchOperationListener.java @@ -71,6 +71,33 @@ default void onFailedQueryPhase(SearchContext searchContext) {} */ default void onQueryPhase(SearchContext searchContext, long tookInNanos) {} + /** + * Executed before the slice execution in + * {@link org.opensearch.search.internal.ContextIndexSearcher#search(List, org.apache.lucene.search.Weight, org.apache.lucene.search.Collector)}. + * This will be called once per slice in concurrent search and only once in non-concurrent search. + * @param searchContext the current search context + */ + default void onPreSliceExecution(SearchContext searchContext) {} + + /** + * Executed if the slice execution in + * {@link org.opensearch.search.internal.ContextIndexSearcher#search(List, org.apache.lucene.search.Weight, org.apache.lucene.search.Collector)} failed. + * This will be called once per slice in concurrent search and only once in non-concurrent search. + * @param searchContext the current search context + */ + default void onFailedSliceExecution(SearchContext searchContext) {} + + /** + * Executed after the slice execution in + * {@link org.opensearch.search.internal.ContextIndexSearcher#search(List, org.apache.lucene.search.Weight, org.apache.lucene.search.Collector)} successfully finished. + * This will be called once per slice in concurrent search and only once in non-concurrent search. + * Note: this is not invoked if the slice execution failed.* + * @param searchContext the current search context + * + * @see #onFailedSliceExecution(org.opensearch.search.internal.SearchContext) + */ + default void onSliceExecution(SearchContext searchContext) {} + /** * Executed before the fetch phase is executed * @param searchContext the current search context @@ -195,6 +222,39 @@ public void onQueryPhase(SearchContext searchContext, long tookInNanos) { } } + @Override + public void onPreSliceExecution(SearchContext searchContext) { + for (SearchOperationListener listener : listeners) { + try { + listener.onPreSliceExecution(searchContext); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("onPreSliceExecution listener [{}] failed", listener), e); + } + } + } + + @Override + public void onFailedSliceExecution(SearchContext searchContext) { + for (SearchOperationListener listener : listeners) { + try { + listener.onFailedSliceExecution(searchContext); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("onFailedSliceExecution listener [{}] failed", listener), e); + } + } + } + + @Override + public void onSliceExecution(SearchContext searchContext) { + for (SearchOperationListener listener : listeners) { + try { + listener.onSliceExecution(searchContext); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("onSliceExecution listener [{}] failed", listener), e); + } + } + } + @Override public void onPreFetchPhase(SearchContext searchContext) { for (SearchOperationListener listener : listeners) { diff --git a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java index ec3ed2332d0b8..fa00ace378df1 100644 --- a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java +++ b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java @@ -270,20 +270,27 @@ public void search( @Override protected void search(List leaves, Weight weight, Collector collector) throws IOException { - // Time series based workload by default traverses segments in desc order i.e. latest to the oldest order. - // This is actually beneficial for search queries to start search on latest segments first for time series workload. - // That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf - // reader order here. - if (searchContext.shouldUseTimeSeriesDescSortOptimization()) { - for (int i = leaves.size() - 1; i >= 0; i--) { - searchLeaf(leaves.get(i), weight, collector); - } - } else { - for (int i = 0; i < leaves.size(); i++) { - searchLeaf(leaves.get(i), weight, collector); + searchContext.indexShard().getSearchOperationListener().onPreSliceExecution(searchContext); + try { + // Time series based workload by default traverses segments in desc order i.e. latest to the oldest order. + // This is actually beneficial for search queries to start search on latest segments first for time series workload. + // That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf + // reader order here. + if (searchContext.shouldUseTimeSeriesDescSortOptimization()) { + for (int i = leaves.size() - 1; i >= 0; i--) { + searchLeaf(leaves.get(i), weight, collector); + } + } else { + for (int i = 0; i < leaves.size(); i++) { + searchLeaf(leaves.get(i), weight, collector); + } } + searchContext.bucketCollectorProcessor().processPostCollection(collector); + } catch (Throwable t) { + searchContext.indexShard().getSearchOperationListener().onFailedSliceExecution(searchContext); + throw t; } - searchContext.bucketCollectorProcessor().processPostCollection(collector); + searchContext.indexShard().getSearchOperationListener().onSliceExecution(searchContext); } /** diff --git a/server/src/test/java/org/opensearch/index/shard/SearchOperationListenerTests.java b/server/src/test/java/org/opensearch/index/shard/SearchOperationListenerTests.java index c61c13eecf2c3..b00307920e875 100644 --- a/server/src/test/java/org/opensearch/index/shard/SearchOperationListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SearchOperationListenerTests.java @@ -56,6 +56,9 @@ public void testListenersAreExecuted() { AtomicInteger preQuery = new AtomicInteger(); AtomicInteger failedQuery = new AtomicInteger(); AtomicInteger onQuery = new AtomicInteger(); + AtomicInteger preSlice = new AtomicInteger(); + AtomicInteger failedSlice = new AtomicInteger(); + AtomicInteger onSlice = new AtomicInteger(); AtomicInteger onFetch = new AtomicInteger(); AtomicInteger preFetch = new AtomicInteger(); AtomicInteger failedFetch = new AtomicInteger(); @@ -86,6 +89,24 @@ public void onQueryPhase(SearchContext searchContext, long tookInNanos) { onQuery.incrementAndGet(); } + @Override + public void onPreSliceExecution(SearchContext searchContext) { + assertNotNull(searchContext); + preSlice.incrementAndGet(); + } + + @Override + public void onFailedSliceExecution(SearchContext searchContext) { + assertNotNull(searchContext); + failedSlice.incrementAndGet(); + } + + @Override + public void onSliceExecution(SearchContext searchContext) { + assertNotNull(searchContext); + onSlice.incrementAndGet(); + } + @Override public void onPreFetchPhase(SearchContext searchContext) { assertNotNull(searchContext); @@ -167,10 +188,30 @@ public void onSearchIdleReactivation() { compositeListener.onQueryPhase(ctx, timeInNanos.get()); assertEquals(0, preFetch.get()); assertEquals(0, preQuery.get()); + assertEquals(0, preSlice.get()); + assertEquals(0, failedFetch.get()); + assertEquals(0, failedQuery.get()); + assertEquals(0, failedSlice.get()); + assertEquals(2, onQuery.get()); + assertEquals(0, onFetch.get()); + assertEquals(0, onSlice.get()); + assertEquals(0, newContext.get()); + assertEquals(0, newScrollContext.get()); + assertEquals(0, freeContext.get()); + assertEquals(0, freeScrollContext.get()); + assertEquals(0, searchIdleReactivateCount.get()); + assertEquals(0, validateSearchContext.get()); + + compositeListener.onSliceExecution(ctx); + assertEquals(0, preFetch.get()); + assertEquals(0, preQuery.get()); + assertEquals(0, preSlice.get()); assertEquals(0, failedFetch.get()); assertEquals(0, failedQuery.get()); + assertEquals(0, failedSlice.get()); assertEquals(2, onQuery.get()); assertEquals(0, onFetch.get()); + assertEquals(2, onSlice.get()); assertEquals(0, newContext.get()); assertEquals(0, newScrollContext.get()); assertEquals(0, freeContext.get()); @@ -181,10 +222,13 @@ public void onSearchIdleReactivation() { compositeListener.onFetchPhase(ctx, timeInNanos.get()); assertEquals(0, preFetch.get()); assertEquals(0, preQuery.get()); + assertEquals(0, preSlice.get()); assertEquals(0, failedFetch.get()); assertEquals(0, failedQuery.get()); + assertEquals(0, failedSlice.get()); assertEquals(2, onQuery.get()); assertEquals(2, onFetch.get()); + assertEquals(2, onSlice.get()); assertEquals(0, newContext.get()); assertEquals(0, newScrollContext.get()); assertEquals(0, freeContext.get()); @@ -195,10 +239,30 @@ public void onSearchIdleReactivation() { compositeListener.onPreQueryPhase(ctx); assertEquals(0, preFetch.get()); assertEquals(2, preQuery.get()); + assertEquals(0, preSlice.get()); assertEquals(0, failedFetch.get()); assertEquals(0, failedQuery.get()); + assertEquals(0, failedSlice.get()); assertEquals(2, onQuery.get()); assertEquals(2, onFetch.get()); + assertEquals(2, onSlice.get()); + assertEquals(0, newContext.get()); + assertEquals(0, newScrollContext.get()); + assertEquals(0, freeContext.get()); + assertEquals(0, freeScrollContext.get()); + assertEquals(0, searchIdleReactivateCount.get()); + assertEquals(0, validateSearchContext.get()); + + compositeListener.onPreSliceExecution(ctx); + assertEquals(0, preFetch.get()); + assertEquals(2, preQuery.get()); + assertEquals(2, preSlice.get()); + assertEquals(0, failedFetch.get()); + assertEquals(0, failedQuery.get()); + assertEquals(0, failedSlice.get()); + assertEquals(2, onQuery.get()); + assertEquals(2, onFetch.get()); + assertEquals(2, onSlice.get()); assertEquals(0, newContext.get()); assertEquals(0, newScrollContext.get()); assertEquals(0, freeContext.get()); @@ -209,10 +273,13 @@ public void onSearchIdleReactivation() { compositeListener.onPreFetchPhase(ctx); assertEquals(2, preFetch.get()); assertEquals(2, preQuery.get()); + assertEquals(2, preSlice.get()); assertEquals(0, failedFetch.get()); assertEquals(0, failedQuery.get()); + assertEquals(0, failedSlice.get()); assertEquals(2, onQuery.get()); assertEquals(2, onFetch.get()); + assertEquals(2, onSlice.get()); assertEquals(0, newContext.get()); assertEquals(0, newScrollContext.get()); assertEquals(0, freeContext.get()); @@ -223,10 +290,13 @@ public void onSearchIdleReactivation() { compositeListener.onFailedFetchPhase(ctx); assertEquals(2, preFetch.get()); assertEquals(2, preQuery.get()); + assertEquals(2, preSlice.get()); assertEquals(2, failedFetch.get()); assertEquals(0, failedQuery.get()); + assertEquals(0, failedSlice.get()); assertEquals(2, onQuery.get()); assertEquals(2, onFetch.get()); + assertEquals(2, onSlice.get()); assertEquals(0, newContext.get()); assertEquals(0, newScrollContext.get()); assertEquals(0, freeContext.get()); @@ -237,10 +307,30 @@ public void onSearchIdleReactivation() { compositeListener.onFailedQueryPhase(ctx); assertEquals(2, preFetch.get()); assertEquals(2, preQuery.get()); + assertEquals(2, preSlice.get()); + assertEquals(2, failedFetch.get()); + assertEquals(2, failedQuery.get()); + assertEquals(0, failedSlice.get()); + assertEquals(2, onQuery.get()); + assertEquals(2, onFetch.get()); + assertEquals(2, onSlice.get()); + assertEquals(0, newContext.get()); + assertEquals(0, newScrollContext.get()); + assertEquals(0, freeContext.get()); + assertEquals(0, freeScrollContext.get()); + assertEquals(0, searchIdleReactivateCount.get()); + assertEquals(0, validateSearchContext.get()); + + compositeListener.onFailedSliceExecution(ctx); + assertEquals(2, preFetch.get()); + assertEquals(2, preQuery.get()); + assertEquals(2, preSlice.get()); assertEquals(2, failedFetch.get()); assertEquals(2, failedQuery.get()); + assertEquals(2, failedSlice.get()); assertEquals(2, onQuery.get()); assertEquals(2, onFetch.get()); + assertEquals(2, onSlice.get()); assertEquals(0, newContext.get()); assertEquals(0, newScrollContext.get()); assertEquals(0, freeContext.get()); @@ -251,10 +341,13 @@ public void onSearchIdleReactivation() { compositeListener.onNewReaderContext(mock(ReaderContext.class)); assertEquals(2, preFetch.get()); assertEquals(2, preQuery.get()); + assertEquals(2, preSlice.get()); assertEquals(2, failedFetch.get()); assertEquals(2, failedQuery.get()); + assertEquals(2, failedSlice.get()); assertEquals(2, onQuery.get()); assertEquals(2, onFetch.get()); + assertEquals(2, onSlice.get()); assertEquals(2, newContext.get()); assertEquals(0, newScrollContext.get()); assertEquals(0, freeContext.get()); @@ -265,10 +358,13 @@ public void onSearchIdleReactivation() { compositeListener.onNewScrollContext(mock(ReaderContext.class)); assertEquals(2, preFetch.get()); assertEquals(2, preQuery.get()); + assertEquals(2, preSlice.get()); assertEquals(2, failedFetch.get()); assertEquals(2, failedQuery.get()); + assertEquals(2, failedSlice.get()); assertEquals(2, onQuery.get()); assertEquals(2, onFetch.get()); + assertEquals(2, onSlice.get()); assertEquals(2, newContext.get()); assertEquals(2, newScrollContext.get()); assertEquals(0, freeContext.get()); @@ -279,10 +375,13 @@ public void onSearchIdleReactivation() { compositeListener.onFreeReaderContext(mock(ReaderContext.class)); assertEquals(2, preFetch.get()); assertEquals(2, preQuery.get()); + assertEquals(2, preSlice.get()); assertEquals(2, failedFetch.get()); assertEquals(2, failedQuery.get()); + assertEquals(2, failedSlice.get()); assertEquals(2, onQuery.get()); assertEquals(2, onFetch.get()); + assertEquals(2, onSlice.get()); assertEquals(2, newContext.get()); assertEquals(2, newScrollContext.get()); assertEquals(2, freeContext.get()); @@ -293,10 +392,13 @@ public void onSearchIdleReactivation() { compositeListener.onFreeScrollContext(mock(ReaderContext.class)); assertEquals(2, preFetch.get()); assertEquals(2, preQuery.get()); + assertEquals(2, preSlice.get()); assertEquals(2, failedFetch.get()); assertEquals(2, failedQuery.get()); + assertEquals(2, failedSlice.get()); assertEquals(2, onQuery.get()); assertEquals(2, onFetch.get()); + assertEquals(2, onSlice.get()); assertEquals(2, newContext.get()); assertEquals(2, newScrollContext.get()); assertEquals(2, freeContext.get()); @@ -307,10 +409,13 @@ public void onSearchIdleReactivation() { compositeListener.onSearchIdleReactivation(); assertEquals(2, preFetch.get()); assertEquals(2, preQuery.get()); + assertEquals(2, preSlice.get()); assertEquals(2, failedFetch.get()); assertEquals(2, failedQuery.get()); + assertEquals(2, failedSlice.get()); assertEquals(2, onQuery.get()); assertEquals(2, onFetch.get()); + assertEquals(2, onSlice.get()); assertEquals(2, newContext.get()); assertEquals(2, newScrollContext.get()); assertEquals(2, freeContext.get()); diff --git a/server/src/test/java/org/opensearch/search/SearchCancellationTests.java b/server/src/test/java/org/opensearch/search/SearchCancellationTests.java index fce58eecbafb1..266052444da46 100644 --- a/server/src/test/java/org/opensearch/search/SearchCancellationTests.java +++ b/server/src/test/java/org/opensearch/search/SearchCancellationTests.java @@ -51,6 +51,7 @@ import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.tasks.TaskCancelledException; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.SearchOperationListener; import org.opensearch.search.internal.ContextIndexSearcher; import org.opensearch.search.internal.SearchContext; import org.opensearch.test.OpenSearchTestCase; @@ -138,6 +139,9 @@ public void testCancellableCollector() throws IOException { Runnable cancellation = () -> { throw new TaskCancelledException("cancelled"); }; IndexShard indexShard = mock(IndexShard.class); when(searchContext.indexShard()).thenReturn(indexShard); + SearchOperationListener searchOperationListener = new SearchOperationListener() { + }; + when(indexShard.getSearchOperationListener()).thenReturn(searchOperationListener); ContextIndexSearcher searcher = new ContextIndexSearcher( reader, IndexSearcher.getDefaultSimilarity(), diff --git a/server/src/test/java/org/opensearch/search/internal/ContextIndexSearcherTests.java b/server/src/test/java/org/opensearch/search/internal/ContextIndexSearcherTests.java index a707c8b34e0a4..606c2512a3d58 100644 --- a/server/src/test/java/org/opensearch/search/internal/ContextIndexSearcherTests.java +++ b/server/src/test/java/org/opensearch/search/internal/ContextIndexSearcherTests.java @@ -81,6 +81,7 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.cache.bitset.BitsetFilterCache; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.SearchOperationListener; import org.opensearch.search.SearchService; import org.opensearch.search.aggregations.LeafBucketCollector; import org.opensearch.test.IndexSettingsModule; @@ -262,6 +263,9 @@ public void onRemoval(ShardId shardId, Accountable accountable) { SearchContext searchContext = mock(SearchContext.class); IndexShard indexShard = mock(IndexShard.class); when(searchContext.indexShard()).thenReturn(indexShard); + SearchOperationListener searchOperationListener = new SearchOperationListener() { + }; + when(indexShard.getSearchOperationListener()).thenReturn(searchOperationListener); when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR); ContextIndexSearcher searcher = new ContextIndexSearcher( filteredReader, diff --git a/server/src/test/java/org/opensearch/search/profile/query/QueryProfilerTests.java b/server/src/test/java/org/opensearch/search/profile/query/QueryProfilerTests.java index 481a224f2ff0e..3a7c711d324c4 100644 --- a/server/src/test/java/org/opensearch/search/profile/query/QueryProfilerTests.java +++ b/server/src/test/java/org/opensearch/search/profile/query/QueryProfilerTests.java @@ -62,6 +62,7 @@ import org.apache.lucene.tests.util.TestUtil; import org.opensearch.common.util.io.IOUtils; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.SearchOperationListener; import org.opensearch.search.internal.ContextIndexSearcher; import org.opensearch.search.internal.SearchContext; import org.opensearch.search.profile.ProfileResult; @@ -128,6 +129,9 @@ public void setUp() throws Exception { SearchContext searchContext = mock(SearchContext.class); IndexShard indexShard = mock(IndexShard.class); when(searchContext.indexShard()).thenReturn(indexShard); + SearchOperationListener searchOperationListener = new SearchOperationListener() { + }; + when(indexShard.getSearchOperationListener()).thenReturn(searchOperationListener); when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR); searcher = new ContextIndexSearcher( reader, diff --git a/server/src/test/java/org/opensearch/search/query/QueryPhaseTests.java b/server/src/test/java/org/opensearch/search/query/QueryPhaseTests.java index 4bd4d406e4391..84057ab1a1b15 100644 --- a/server/src/test/java/org/opensearch/search/query/QueryPhaseTests.java +++ b/server/src/test/java/org/opensearch/search/query/QueryPhaseTests.java @@ -101,6 +101,7 @@ import org.opensearch.index.search.OpenSearchToParentBlockJoinQuery; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; +import org.opensearch.index.shard.SearchOperationListener; import org.opensearch.lucene.queries.MinDocQuery; import org.opensearch.search.DocValueFormat; import org.opensearch.search.collapse.CollapseBuilder; @@ -1225,6 +1226,9 @@ private static ContextIndexSearcher newContextSearcher(IndexReader reader, Execu SearchContext searchContext = mock(SearchContext.class); IndexShard indexShard = mock(IndexShard.class); when(searchContext.indexShard()).thenReturn(indexShard); + SearchOperationListener searchOperationListener = new SearchOperationListener() { + }; + when(indexShard.getSearchOperationListener()).thenReturn(searchOperationListener); when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR); when(searchContext.shouldUseConcurrentSearch()).thenReturn(executor != null); if (executor != null) { @@ -1248,6 +1252,9 @@ private static ContextIndexSearcher newEarlyTerminationContextSearcher(IndexRead SearchContext searchContext = mock(SearchContext.class); IndexShard indexShard = mock(IndexShard.class); when(searchContext.indexShard()).thenReturn(indexShard); + SearchOperationListener searchOperationListener = new SearchOperationListener() { + }; + when(indexShard.getSearchOperationListener()).thenReturn(searchOperationListener); when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR); when(searchContext.shouldUseConcurrentSearch()).thenReturn(executor != null); if (executor != null) { diff --git a/server/src/test/java/org/opensearch/search/query/QueryProfilePhaseTests.java b/server/src/test/java/org/opensearch/search/query/QueryProfilePhaseTests.java index 1d545cea67207..a44b654e462f5 100644 --- a/server/src/test/java/org/opensearch/search/query/QueryProfilePhaseTests.java +++ b/server/src/test/java/org/opensearch/search/query/QueryProfilePhaseTests.java @@ -63,6 +63,7 @@ import org.opensearch.index.query.SourceFieldMatchQuery; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; +import org.opensearch.index.shard.SearchOperationListener; import org.opensearch.lucene.queries.MinDocQuery; import org.opensearch.search.DocValueFormat; import org.opensearch.search.collapse.CollapseBuilder; @@ -1676,6 +1677,9 @@ private static ContextIndexSearcher newContextSearcher(IndexReader reader, Execu SearchContext searchContext = mock(SearchContext.class); IndexShard indexShard = mock(IndexShard.class); when(searchContext.indexShard()).thenReturn(indexShard); + SearchOperationListener searchOperationListener = new SearchOperationListener() { + }; + when(indexShard.getSearchOperationListener()).thenReturn(searchOperationListener); when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR); return new ContextIndexSearcher( reader, @@ -1693,6 +1697,9 @@ private static ContextIndexSearcher newEarlyTerminationContextSearcher(IndexRead SearchContext searchContext = mock(SearchContext.class); IndexShard indexShard = mock(IndexShard.class); when(searchContext.indexShard()).thenReturn(indexShard); + SearchOperationListener searchOperationListener = new SearchOperationListener() { + }; + when(indexShard.getSearchOperationListener()).thenReturn(searchOperationListener); when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR); return new ContextIndexSearcher( reader, diff --git a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java index 544fb100a17bf..4abd7fbea9cff 100644 --- a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java @@ -119,6 +119,7 @@ import org.opensearch.index.mapper.TextFieldMapper; import org.opensearch.index.query.QueryShardContext; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.SearchOperationListener; import org.opensearch.indices.IndicesModule; import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; import org.opensearch.indices.mapper.MapperRegistry; @@ -380,6 +381,9 @@ public boolean shouldCache(Query query) { IndexShard indexShard = mock(IndexShard.class); when(indexShard.shardId()).thenReturn(new ShardId("test", "test", 0)); when(searchContext.indexShard()).thenReturn(indexShard); + SearchOperationListener searchOperationListener = new SearchOperationListener() { + }; + when(indexShard.getSearchOperationListener()).thenReturn(searchOperationListener); when(searchContext.aggregations()).thenReturn(new SearchContextAggregations(AggregatorFactories.EMPTY, bucketConsumer)); when(searchContext.query()).thenReturn(query); when(searchContext.bucketCollectorProcessor()).thenReturn(new BucketCollectorProcessor());