From 7536cfc4ce42581409dd368cb0fd12a24eb44ae8 Mon Sep 17 00:00:00 2001 From: Ticheng Lin Date: Tue, 8 Aug 2023 16:34:48 -0700 Subject: [PATCH] Add more collectorResult test and work on the PR comments (#9129) Signed-off-by: Ticheng Lin --- .../aggregation/AggregationProfilerIT.java | 130 +++++++++++++++++- .../AggregationCollectorManager.java | 7 +- .../ConcurrentAggregationProcessor.java | 3 +- .../GlobalAggCollectorManager.java | 7 + ...ggCollectorManagerWithSingleCollector.java | 7 + .../NonGlobalAggCollectorManager.java | 7 + ...ggCollectorManagerWithSingleCollector.java | 7 + .../InternalProfileCollectorManager.java | 24 +++- .../search/query/QueryCollectorContext.java | 41 ++++-- .../AggregationProcessorTests.java | 17 +-- 10 files changed, 221 insertions(+), 29 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/profile/aggregation/AggregationProfilerIT.java b/server/src/internalClusterTest/java/org/opensearch/search/profile/aggregation/AggregationProfilerIT.java index 9d0c30c5a488f..456aa7a2aa673 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/profile/aggregation/AggregationProfilerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/profile/aggregation/AggregationProfilerIT.java @@ -44,6 +44,7 @@ import org.opensearch.search.aggregations.metrics.Stats; import org.opensearch.search.profile.ProfileResult; import org.opensearch.search.profile.ProfileShardResult; +import org.opensearch.search.profile.query.CollectorResult; import org.opensearch.search.profile.query.QueryProfileShardResult; import org.opensearch.test.OpenSearchIntegTestCase; @@ -67,8 +68,11 @@ import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.containsString; @OpenSearchIntegTestCase.SuiteScopeTestCase public class AggregationProfilerIT extends OpenSearchIntegTestCase { @@ -150,6 +154,8 @@ public class AggregationProfilerIT extends OpenSearchIntegTestCase { private static final String TAG_FIELD = "tag"; private static final String STRING_FIELD = "string_field"; private final int numDocs = 5; + private static final String REASON_SEARCH_TOP_HITS = "search_top_hits"; + private static final String REASON_AGGREGATION = "aggregation"; @Override protected int numberOfShards() { @@ -217,8 +223,14 @@ public void testSimpleProfile() { if (histoAggResult.getMaxSliceTime() != null) { // concurrent segment search enabled assertThat(breakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); + for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) { + assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 2); + } } else { assertThat(breakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) { + assertCollectorResult(collectorResult, 2); + } } assertThat(breakdown.get(INITIALIZE), greaterThan(0L)); assertThat(breakdown.get(COLLECT), greaterThan(0L)); @@ -265,8 +277,14 @@ public void testMultiLevelProfile() { if (histoAggResult.getMaxSliceTime() != null) { // concurrent segment search enabled assertThat(histoBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); + for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) { + assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 2); + } } else { assertThat(histoBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) { + assertCollectorResult(collectorResult, 2); + } } assertThat(histoBreakdown.get(INITIALIZE), greaterThan(0L)); assertThat(histoBreakdown.get(COLLECT), greaterThan(0L)); @@ -366,8 +384,14 @@ public void testMultiLevelProfileBreadthFirst() { if (histoAggResult.getMaxSliceTime() != null) { // concurrent segment search enabled assertThat(histoBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); + for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) { + assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 2); + } } else { assertThat(histoBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) { + assertCollectorResult(collectorResult, 2); + } } assertThat(histoBreakdown.get(INITIALIZE), greaterThan(0L)); assertThat(histoBreakdown.get(COLLECT), greaterThan(0L)); @@ -452,8 +476,14 @@ public void testDiversifiedAggProfile() { if (diversifyAggResult.getMaxSliceTime() != null) { // concurrent segment search enabled assertThat(diversifyBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); + for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) { + assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 2); + } } else { assertThat(diversifyBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) { + assertCollectorResult(collectorResult, 2); + } } assertThat(diversifyBreakdown.get(INITIALIZE), greaterThan(0L)); assertThat(diversifyBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L)); @@ -532,8 +562,14 @@ public void testComplexProfile() { if (histoAggResult.getMaxSliceTime() != null) { // concurrent segment search enabled assertThat(histoBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); + for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) { + assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 2); + } } else { assertThat(histoBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) { + assertCollectorResult(collectorResult, 2); + } } assertThat(histoBreakdown.get(INITIALIZE), greaterThan(0L)); assertThat(histoBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L)); @@ -792,7 +828,6 @@ public void testGlobalAggWithStatsSubAggregatorProfile() { .get(); assertSearchResponse(response); - Global global = response.getAggregations().get("global"); assertThat(global, IsNull.notNullValue()); assertThat(global.getName(), equalTo("global")); @@ -843,8 +878,14 @@ public void testGlobalAggWithStatsSubAggregatorProfile() { if (globalAggResult.getMaxSliceTime() != null) { // concurrent segment search enabled assertEquals(CONCURRENT_SEARCH_BREAKDOWN_KEYS, breakdown.keySet()); + for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) { + assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 0); + } } else { assertEquals(BREAKDOWN_KEYS, breakdown.keySet()); + for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) { + assertCollectorResult(collectorResult, 0); + } } assertThat(breakdown.get(INITIALIZE), greaterThan(0L)); assertThat(breakdown.get(COLLECT), greaterThan(0L)); @@ -852,4 +893,91 @@ public void testGlobalAggWithStatsSubAggregatorProfile() { assertEquals(0, breakdown.get(REDUCE).intValue()); } } + + public void testMultipleAggregationsProfile() { + SearchResponse response = client().prepareSearch("idx") + .setProfile(true) + .addAggregation(histogram("histo_1").field(NUMBER_FIELD).interval(1L)) + .addAggregation(histogram("histo_2").field(NUMBER_FIELD).interval(1L)) + .get(); + assertSearchResponse(response); + Map profileResults = response.getProfileResults(); + assertThat(profileResults, notNullValue()); + assertThat(profileResults.size(), equalTo(getNumShards("idx").numPrimaries)); + for (ProfileShardResult profileShardResult : profileResults.values()) { + assertThat(profileShardResult, notNullValue()); + List queryProfilerResults = profileShardResult.getQueryProfileResults(); + assertThat(queryProfilerResults, notNullValue()); + for (QueryProfileShardResult queryProfilerResult : queryProfilerResults) { + CollectorResult collectorResult = queryProfilerResult.getCollectorResult(); + String reason = collectorResult.getReason(); + assertThat(reason, equalTo("search_multi")); + List children = collectorResult.getProfiledChildren(); + assertThat(children.size(), equalTo(2)); + assertThat(children.get(1).getName(), containsString("[histo_1, histo_2]")); + } + AggregationProfileShardResult aggProfileResults = profileShardResult.getAggregationProfileResults(); + assertThat(aggProfileResults, notNullValue()); + List aggProfileResultsList = aggProfileResults.getProfileResults(); + assertThat(aggProfileResultsList, notNullValue()); + assertThat(aggProfileResultsList.size(), equalTo(2)); + for (ProfileResult histoAggResult : aggProfileResultsList) { + assertThat(histoAggResult, notNullValue()); + assertThat(histoAggResult.getQueryName(), equalTo("NumericHistogramAggregator")); + assertThat(histoAggResult.getLuceneDescription(), containsString("histo_")); + assertThat(histoAggResult.getProfiledChildren().size(), equalTo(0)); + assertThat(histoAggResult.getTime(), greaterThan(0L)); + Map breakdown = histoAggResult.getTimeBreakdown(); + assertThat(breakdown, notNullValue()); + if (histoAggResult.getMaxSliceTime() != null) { + // concurrent segment search enabled + assertThat(breakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); + for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) { + assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 2); + } + } else { + assertThat(breakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) { + assertCollectorResult(collectorResult, 2); + } + } + assertThat(breakdown.get(INITIALIZE), greaterThan(0L)); + assertThat(breakdown.get(COLLECT), greaterThan(0L)); + assertThat(breakdown.get(BUILD_AGGREGATION).longValue(), greaterThan(0L)); + assertThat(breakdown.get(REDUCE), equalTo(0L)); + Map debug = histoAggResult.getDebugInfo(); + assertThat(debug, notNullValue()); + assertThat(debug.keySet(), equalTo(Set.of(TOTAL_BUCKETS))); + assertThat(((Number) debug.get(TOTAL_BUCKETS)).longValue(), greaterThan(0L)); + } + } + } + + private void assertCollectorResult(QueryProfileShardResult collectorResult, int expectedChildrenCount) { + long nodeTime = collectorResult.getCollectorResult().getTime(); + assertThat(collectorResult.getCollectorResult().getMaxSliceTime(), equalTo(nodeTime)); + assertThat(collectorResult.getCollectorResult().getMinSliceTime(), equalTo(nodeTime)); + assertThat(collectorResult.getCollectorResult().getAvgSliceTime(), equalTo(nodeTime)); + assertThat(collectorResult.getCollectorResult().getReduceTime(), equalTo(0L)); + assertThat(collectorResult.getCollectorResult().getSliceCount(), equalTo(1)); + assertThat(collectorResult.getCollectorResult().getProfiledChildren().size(), equalTo(expectedChildrenCount)); + if (expectedChildrenCount == 2) { + assertThat(collectorResult.getCollectorResult().getProfiledChildren().get(0).getReason(), equalTo(REASON_SEARCH_TOP_HITS)); + assertThat(collectorResult.getCollectorResult().getProfiledChildren().get(1).getReason(), equalTo(REASON_AGGREGATION)); + } + } + + private void assertCollectorResultWithConcurrentSearchEnabled(QueryProfileShardResult collectorResult, int expectedChildrenCount) { + long nodeTime = collectorResult.getCollectorResult().getTime(); + assertThat(collectorResult.getCollectorResult().getMaxSliceTime(), lessThanOrEqualTo(nodeTime)); + assertThat(collectorResult.getCollectorResult().getMinSliceTime(), lessThanOrEqualTo(nodeTime)); + assertThat(collectorResult.getCollectorResult().getAvgSliceTime(), lessThanOrEqualTo(nodeTime)); + assertThat(collectorResult.getCollectorResult().getReduceTime(), greaterThan(0L)); + assertThat(collectorResult.getCollectorResult().getSliceCount(), greaterThanOrEqualTo(1)); + assertThat(collectorResult.getCollectorResult().getProfiledChildren().size(), equalTo(expectedChildrenCount)); + if (expectedChildrenCount == 2) { + assertThat(collectorResult.getCollectorResult().getProfiledChildren().get(0).getReason(), equalTo(REASON_SEARCH_TOP_HITS)); + assertThat(collectorResult.getCollectorResult().getProfiledChildren().get(1).getReason(), equalTo(REASON_AGGREGATION)); + } + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java b/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java index e83551304e9be..0bb2d1d7ca933 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java +++ b/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java @@ -17,14 +17,15 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; /** * Common {@link CollectorManager} used by both concurrent and non-concurrent aggregation path and also for global and non-global * aggregation operators + * + * @opensearch.internal */ -class AggregationCollectorManager implements CollectorManager { +public abstract class AggregationCollectorManager implements CollectorManager { protected final SearchContext context; private final CheckedFunction, IOException> aggProvider; private final String collectorReason; @@ -51,6 +52,8 @@ public String getCollectorReason() { return collectorReason; } + public abstract String getCollectorName(); + @Override public ReduceableSearchResult reduce(Collection collectors) throws IOException { final List aggregators = context.bucketCollectorProcessor().toAggregators(collectors); diff --git a/server/src/main/java/org/opensearch/search/aggregations/ConcurrentAggregationProcessor.java b/server/src/main/java/org/opensearch/search/aggregations/ConcurrentAggregationProcessor.java index ed36b030a6785..fbeb583984ac5 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/ConcurrentAggregationProcessor.java +++ b/server/src/main/java/org/opensearch/search/aggregations/ConcurrentAggregationProcessor.java @@ -13,7 +13,6 @@ import org.apache.lucene.search.Query; import org.opensearch.common.lucene.search.Queries; import org.opensearch.search.internal.SearchContext; -import org.opensearch.search.profile.query.CollectorResult; import org.opensearch.search.profile.query.InternalProfileCollectorManager; import org.opensearch.search.profile.query.InternalProfileComponent; import org.opensearch.search.query.QueryPhaseExecutionException; @@ -68,7 +67,7 @@ public void postProcess(SearchContext context) { if (context.getProfilers() != null) { globalCollectorManager = new InternalProfileCollectorManager( globalCollectorManager, - CollectorResult.REASON_AGGREGATION_GLOBAL, + ((AggregationCollectorManager) globalCollectorManager).getCollectorReason(), Collections.emptyList() ); context.getProfilers().addQueryProfiler().setCollector((InternalProfileComponent) globalCollectorManager); diff --git a/server/src/main/java/org/opensearch/search/aggregations/GlobalAggCollectorManager.java b/server/src/main/java/org/opensearch/search/aggregations/GlobalAggCollectorManager.java index 41e8aba895480..8814cc3c435e1 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/GlobalAggCollectorManager.java +++ b/server/src/main/java/org/opensearch/search/aggregations/GlobalAggCollectorManager.java @@ -23,10 +23,12 @@ public class GlobalAggCollectorManager extends AggregationCollectorManager { private Collector collector; + private final String collectorName; public GlobalAggCollectorManager(SearchContext context) throws IOException { super(context, context.aggregations().factories()::createTopLevelGlobalAggregators, CollectorResult.REASON_AGGREGATION_GLOBAL); collector = Objects.requireNonNull(super.newCollector(), "collector instance is null"); + collectorName = collector.toString(); } @Override @@ -48,4 +50,9 @@ protected AggregationReduceableSearchResult buildAggregationResult(InternalAggre InternalAggregations.reduce(Collections.singletonList(internalAggregations), context.partialOnShard()) ); } + + @Override + public String getCollectorName() { + return collectorName; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/GlobalAggCollectorManagerWithSingleCollector.java b/server/src/main/java/org/opensearch/search/aggregations/GlobalAggCollectorManagerWithSingleCollector.java index f126f27c68855..973749c0d5189 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/GlobalAggCollectorManagerWithSingleCollector.java +++ b/server/src/main/java/org/opensearch/search/aggregations/GlobalAggCollectorManagerWithSingleCollector.java @@ -26,10 +26,12 @@ public class GlobalAggCollectorManagerWithSingleCollector extends AggregationCollectorManager { private final Collector collector; + private final String collectorName; public GlobalAggCollectorManagerWithSingleCollector(SearchContext context) throws IOException { super(context, context.aggregations().factories()::createTopLevelGlobalAggregators, CollectorResult.REASON_AGGREGATION_GLOBAL); collector = Objects.requireNonNull(super.newCollector(), "collector instance is null"); + collectorName = collector.toString(); } @Override @@ -42,4 +44,9 @@ public ReduceableSearchResult reduce(Collection collectors) throws IO assert collectors.isEmpty() : "Reduce on GlobalAggregationCollectorManagerWithCollector called with non-empty collectors"; return super.reduce(List.of(collector)); } + + @Override + public String getCollectorName() { + return collectorName; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManager.java b/server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManager.java index 984eefb9b52a4..8b0a1530b5505 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManager.java +++ b/server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManager.java @@ -23,10 +23,12 @@ public class NonGlobalAggCollectorManager extends AggregationCollectorManager { private Collector collector; + private final String collectorName; public NonGlobalAggCollectorManager(SearchContext context) throws IOException { super(context, context.aggregations().factories()::createTopLevelNonGlobalAggregators, CollectorResult.REASON_AGGREGATION); collector = Objects.requireNonNull(super.newCollector(), "collector instance is null"); + collectorName = collector.toString(); } @Override @@ -48,4 +50,9 @@ protected AggregationReduceableSearchResult buildAggregationResult(InternalAggre InternalAggregations.reduce(Collections.singletonList(internalAggregations), context.partialOnShard()) ); } + + @Override + public String getCollectorName() { + return collectorName; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManagerWithSingleCollector.java b/server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManagerWithSingleCollector.java index 433f6b6a05b22..a6eb00f2d70f7 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManagerWithSingleCollector.java +++ b/server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManagerWithSingleCollector.java @@ -26,10 +26,12 @@ public class NonGlobalAggCollectorManagerWithSingleCollector extends AggregationCollectorManager { private final Collector collector; + private final String collectorName; public NonGlobalAggCollectorManagerWithSingleCollector(SearchContext context) throws IOException { super(context, context.aggregations().factories()::createTopLevelNonGlobalAggregators, CollectorResult.REASON_AGGREGATION); collector = Objects.requireNonNull(super.newCollector(), "collector instance is null"); + collectorName = collector.toString(); } @Override @@ -42,4 +44,9 @@ public ReduceableSearchResult reduce(Collection collectors) throws IO assert collectors.isEmpty() : "Reduce on NonGlobalAggregationCollectorManagerWithCollector called with non-empty collectors"; return super.reduce(List.of(collector)); } + + @Override + public String getCollectorName() { + return collectorName; + } } diff --git a/server/src/main/java/org/opensearch/search/profile/query/InternalProfileCollectorManager.java b/server/src/main/java/org/opensearch/search/profile/query/InternalProfileCollectorManager.java index 074738d2491ec..4156e442c9254 100644 --- a/server/src/main/java/org/opensearch/search/profile/query/InternalProfileCollectorManager.java +++ b/server/src/main/java/org/opensearch/search/profile/query/InternalProfileCollectorManager.java @@ -10,6 +10,7 @@ import org.apache.lucene.search.Collector; import org.apache.lucene.search.CollectorManager; +import org.opensearch.search.aggregations.AggregationCollectorManager; import org.opensearch.search.query.EarlyTerminatingListener; import org.opensearch.search.query.ReduceableSearchResult; @@ -38,6 +39,7 @@ public class InternalProfileCollectorManager private long minSliceTime = Long.MAX_VALUE; private long avgSliceTime = 0; private int sliceCount = 0; + private String collectorManagerName; public InternalProfileCollectorManager( CollectorManager manager, @@ -46,9 +48,29 @@ public InternalProfileCollectorManager( ) { this.manager = manager; this.reason = reason; + this.collectorManagerName = deriveCollectorManagerName(manager); this.children = children; } + /** + * Creates a human-friendly representation of the CollectorManager name. + * + * @param manager The CollectorManager to derive a name from + * @return A (hopefully) prettier name + */ + private String deriveCollectorManagerName(CollectorManager manager) { + String name = manager.getClass().getSimpleName(); + if (name.equals("")) { + name = manager.getClass().getEnclosingClass().getSimpleName(); + } + + // Include the user-defined agg name + if (manager instanceof AggregationCollectorManager) { + name += ": [" + ((AggregationCollectorManager) manager).getCollectorName() + "]"; + } + return name; + } + @Override public InternalProfileCollector newCollector() throws IOException { return new InternalProfileCollector(manager.newCollector(), reason, children); @@ -117,7 +139,7 @@ public Collection children() { @Override public String getName() { - return manager.getClass().getSimpleName(); + return collectorManagerName; } @Override diff --git a/server/src/main/java/org/opensearch/search/query/QueryCollectorContext.java b/server/src/main/java/org/opensearch/search/query/QueryCollectorContext.java index cb01681a02e1a..10c070ea7dceb 100644 --- a/server/src/main/java/org/opensearch/search/query/QueryCollectorContext.java +++ b/server/src/main/java/org/opensearch/search/query/QueryCollectorContext.java @@ -42,6 +42,8 @@ import org.apache.lucene.search.Weight; import org.opensearch.common.lucene.MinimumScoreCollector; import org.opensearch.common.lucene.search.FilteredCollector; +import org.opensearch.search.aggregations.AggregationCollectorManager; +import org.opensearch.search.aggregations.BucketCollector; import org.opensearch.search.profile.query.CollectorResult; import org.opensearch.search.profile.query.InternalProfileCollector; import org.opensearch.search.profile.query.InternalProfileCollectorManager; @@ -100,15 +102,11 @@ protected InternalProfileCollector createWithProfiler(InternalProfileCollector i */ protected InternalProfileCollectorManager createWithProfiler(InternalProfileCollectorManager in) throws IOException { final CollectorManager manager = createManager(in); - if (manager instanceof InternalProfileCollectorManager) { - return (InternalProfileCollectorManager) manager; - } else { - return new InternalProfileCollectorManager( - manager, - profilerName, - in != null ? Collections.singletonList(in) : Collections.emptyList() - ); - } + return new InternalProfileCollectorManager( + manager, + profilerName, + in != null ? Collections.singletonList(in) : Collections.emptyList() + ); } /** @@ -203,7 +201,15 @@ protected InternalProfileCollector createWithProfiler(InternalProfileCollector i for (CollectorManager manager : subs) { final Collector collector = manager.newCollector(); - subCollectors.add(new InternalProfileCollector(collector, CollectorResult.REASON_AGGREGATION, Collections.emptyList())); + if (collector instanceof BucketCollector) { + subCollectors.add( + new InternalProfileCollector(collector, CollectorResult.REASON_AGGREGATION, Collections.emptyList()) + ); + } else { + subCollectors.add( + new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_MULTI, Collections.emptyList()) + ); + } } final Collector collector = MultiCollector.wrap(subCollectors); @@ -217,11 +223,16 @@ protected InternalProfileCollectorManager createWithProfiler(InternalProfileColl managers.add(in); children.add(in); for (CollectorManager manager : subs) { - InternalProfileCollectorManager subCollectorManager = new InternalProfileCollectorManager( - manager, - CollectorResult.REASON_AGGREGATION, - Collections.emptyList() - ); + final InternalProfileCollectorManager subCollectorManager; + if (manager instanceof AggregationCollectorManager) { + subCollectorManager = new InternalProfileCollectorManager( + manager, + ((AggregationCollectorManager) manager).getCollectorReason(), + Collections.emptyList() + ); + } else { + subCollectorManager = new InternalProfileCollectorManager(manager, REASON_SEARCH_MULTI, Collections.emptyList()); + } managers.add(subCollectorManager); children.add(subCollectorManager); } diff --git a/server/src/test/java/org/opensearch/search/aggregations/AggregationProcessorTests.java b/server/src/test/java/org/opensearch/search/aggregations/AggregationProcessorTests.java index 06973d986fb07..522d69b22c38a 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/AggregationProcessorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/AggregationProcessorTests.java @@ -15,6 +15,7 @@ import org.mockito.ArgumentMatchers; import org.opensearch.search.aggregations.bucket.global.GlobalAggregator; import org.opensearch.search.internal.ContextIndexSearcher; +import org.opensearch.search.profile.query.CollectorResult; import org.opensearch.search.query.ReduceableSearchResult; import org.opensearch.test.TestSearchContext; @@ -51,18 +52,10 @@ public void testPostProcessWithNonGlobalAggregatorsAndSingleSlice() throws Excep testPostProcessCommon(multipleNonGlobalAggs, 1, 0, 2, false); } - public void testPostProcessWithNonGlobalAggregatorsAndSingleSliceWithProfilers() throws Exception { - testPostProcessCommon(multipleNonGlobalAggs, 1, 0, 2, true); - } - public void testPostProcessWithNonGlobalAggregatorsAndMultipleSlices() throws Exception { testPostProcessCommon(multipleNonGlobalAggs, randomIntBetween(2, 5), 0, 2, false); } - public void testPostProcessWithNonGlobalAggregatorsAndMultipleSlicesWithProfilers() throws Exception { - testPostProcessCommon(multipleNonGlobalAggs, randomIntBetween(2, 5), 0, 2, true); - } - public void testPostProcessGlobalAndNonGlobalAggregators() throws Exception { testPostProcessCommon(globalNonGlobalAggs, randomIntBetween(2, 5), 1, 1, false); } @@ -182,6 +175,14 @@ private void testPostProcessCommon( // for global aggs verify that search.search is called with CollectionManager if (expectedGlobalAggs > 0) { verify(testSearcher, times(1)).search(nullable(Query.class), ArgumentMatchers.>any()); + if (withProfilers) { + // First profiler is from withProfilers() call, second one is from postProcess() call + assertEquals(2, context.getProfilers().getQueryProfilers().size()); + assertEquals( + CollectorResult.REASON_AGGREGATION_GLOBAL, + context.getProfilers().getQueryProfilers().get(1).getCollector().getReason() + ); + } } // after shard level reduce it should have only 1 InternalAggregation instance for each agg in request and internal aggregation // will be equal to sum of expected global and nonglobal aggs