Skip to content

Commit

Permalink
Add more collectorResult test and work on the PR comments (#9129)
Browse files Browse the repository at this point in the history
Signed-off-by: Ticheng Lin <[email protected]>
  • Loading branch information
ticheng-aws committed Aug 14, 2023
1 parent e8352de commit 7536cfc
Show file tree
Hide file tree
Showing 10 changed files with 221 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.search.aggregations.metrics.Stats;
import org.opensearch.search.profile.ProfileResult;
import org.opensearch.search.profile.ProfileShardResult;
import org.opensearch.search.profile.query.CollectorResult;
import org.opensearch.search.profile.query.QueryProfileShardResult;
import org.opensearch.test.OpenSearchIntegTestCase;

Expand All @@ -67,8 +68,11 @@
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.containsString;

@OpenSearchIntegTestCase.SuiteScopeTestCase
public class AggregationProfilerIT extends OpenSearchIntegTestCase {
Expand Down Expand Up @@ -150,6 +154,8 @@ public class AggregationProfilerIT extends OpenSearchIntegTestCase {
private static final String TAG_FIELD = "tag";
private static final String STRING_FIELD = "string_field";
private final int numDocs = 5;
private static final String REASON_SEARCH_TOP_HITS = "search_top_hits";
private static final String REASON_AGGREGATION = "aggregation";

@Override
protected int numberOfShards() {
Expand Down Expand Up @@ -217,8 +223,14 @@ public void testSimpleProfile() {
if (histoAggResult.getMaxSliceTime() != null) {
// concurrent segment search enabled
assertThat(breakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 2);
}
} else {
assertThat(breakdown.keySet(), equalTo(BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResult(collectorResult, 2);
}
}
assertThat(breakdown.get(INITIALIZE), greaterThan(0L));
assertThat(breakdown.get(COLLECT), greaterThan(0L));
Expand Down Expand Up @@ -265,8 +277,14 @@ public void testMultiLevelProfile() {
if (histoAggResult.getMaxSliceTime() != null) {
// concurrent segment search enabled
assertThat(histoBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 2);
}
} else {
assertThat(histoBreakdown.keySet(), equalTo(BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResult(collectorResult, 2);
}
}
assertThat(histoBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(histoBreakdown.get(COLLECT), greaterThan(0L));
Expand Down Expand Up @@ -366,8 +384,14 @@ public void testMultiLevelProfileBreadthFirst() {
if (histoAggResult.getMaxSliceTime() != null) {
// concurrent segment search enabled
assertThat(histoBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 2);
}
} else {
assertThat(histoBreakdown.keySet(), equalTo(BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResult(collectorResult, 2);
}
}
assertThat(histoBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(histoBreakdown.get(COLLECT), greaterThan(0L));
Expand Down Expand Up @@ -452,8 +476,14 @@ public void testDiversifiedAggProfile() {
if (diversifyAggResult.getMaxSliceTime() != null) {
// concurrent segment search enabled
assertThat(diversifyBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 2);
}
} else {
assertThat(diversifyBreakdown.keySet(), equalTo(BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResult(collectorResult, 2);
}
}
assertThat(diversifyBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(diversifyBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
Expand Down Expand Up @@ -532,8 +562,14 @@ public void testComplexProfile() {
if (histoAggResult.getMaxSliceTime() != null) {
// concurrent segment search enabled
assertThat(histoBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 2);
}
} else {
assertThat(histoBreakdown.keySet(), equalTo(BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResult(collectorResult, 2);
}
}
assertThat(histoBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(histoBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
Expand Down Expand Up @@ -792,7 +828,6 @@ public void testGlobalAggWithStatsSubAggregatorProfile() {
.get();

assertSearchResponse(response);

Global global = response.getAggregations().get("global");
assertThat(global, IsNull.notNullValue());
assertThat(global.getName(), equalTo("global"));
Expand Down Expand Up @@ -843,13 +878,106 @@ public void testGlobalAggWithStatsSubAggregatorProfile() {
if (globalAggResult.getMaxSliceTime() != null) {
// concurrent segment search enabled
assertEquals(CONCURRENT_SEARCH_BREAKDOWN_KEYS, breakdown.keySet());
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 0);
}
} else {
assertEquals(BREAKDOWN_KEYS, breakdown.keySet());
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResult(collectorResult, 0);
}
}
assertThat(breakdown.get(INITIALIZE), greaterThan(0L));
assertThat(breakdown.get(COLLECT), greaterThan(0L));
assertThat(breakdown.get(BUILD_AGGREGATION).longValue(), greaterThan(0L));
assertEquals(0, breakdown.get(REDUCE).intValue());
}
}

public void testMultipleAggregationsProfile() {
SearchResponse response = client().prepareSearch("idx")
.setProfile(true)
.addAggregation(histogram("histo_1").field(NUMBER_FIELD).interval(1L))
.addAggregation(histogram("histo_2").field(NUMBER_FIELD).interval(1L))
.get();
assertSearchResponse(response);
Map<String, ProfileShardResult> profileResults = response.getProfileResults();
assertThat(profileResults, notNullValue());
assertThat(profileResults.size(), equalTo(getNumShards("idx").numPrimaries));
for (ProfileShardResult profileShardResult : profileResults.values()) {
assertThat(profileShardResult, notNullValue());
List<QueryProfileShardResult> queryProfilerResults = profileShardResult.getQueryProfileResults();
assertThat(queryProfilerResults, notNullValue());
for (QueryProfileShardResult queryProfilerResult : queryProfilerResults) {
CollectorResult collectorResult = queryProfilerResult.getCollectorResult();
String reason = collectorResult.getReason();
assertThat(reason, equalTo("search_multi"));
List<CollectorResult> children = collectorResult.getProfiledChildren();
assertThat(children.size(), equalTo(2));
assertThat(children.get(1).getName(), containsString("[histo_1, histo_2]"));
}
AggregationProfileShardResult aggProfileResults = profileShardResult.getAggregationProfileResults();
assertThat(aggProfileResults, notNullValue());
List<ProfileResult> aggProfileResultsList = aggProfileResults.getProfileResults();
assertThat(aggProfileResultsList, notNullValue());
assertThat(aggProfileResultsList.size(), equalTo(2));
for (ProfileResult histoAggResult : aggProfileResultsList) {
assertThat(histoAggResult, notNullValue());
assertThat(histoAggResult.getQueryName(), equalTo("NumericHistogramAggregator"));
assertThat(histoAggResult.getLuceneDescription(), containsString("histo_"));
assertThat(histoAggResult.getProfiledChildren().size(), equalTo(0));
assertThat(histoAggResult.getTime(), greaterThan(0L));
Map<String, Long> breakdown = histoAggResult.getTimeBreakdown();
assertThat(breakdown, notNullValue());
if (histoAggResult.getMaxSliceTime() != null) {
// concurrent segment search enabled
assertThat(breakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 2);
}
} else {
assertThat(breakdown.keySet(), equalTo(BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResult(collectorResult, 2);
}
}
assertThat(breakdown.get(INITIALIZE), greaterThan(0L));
assertThat(breakdown.get(COLLECT), greaterThan(0L));
assertThat(breakdown.get(BUILD_AGGREGATION).longValue(), greaterThan(0L));
assertThat(breakdown.get(REDUCE), equalTo(0L));
Map<String, Object> debug = histoAggResult.getDebugInfo();
assertThat(debug, notNullValue());
assertThat(debug.keySet(), equalTo(Set.of(TOTAL_BUCKETS)));
assertThat(((Number) debug.get(TOTAL_BUCKETS)).longValue(), greaterThan(0L));
}
}
}

private void assertCollectorResult(QueryProfileShardResult collectorResult, int expectedChildrenCount) {
long nodeTime = collectorResult.getCollectorResult().getTime();
assertThat(collectorResult.getCollectorResult().getMaxSliceTime(), equalTo(nodeTime));
assertThat(collectorResult.getCollectorResult().getMinSliceTime(), equalTo(nodeTime));
assertThat(collectorResult.getCollectorResult().getAvgSliceTime(), equalTo(nodeTime));
assertThat(collectorResult.getCollectorResult().getReduceTime(), equalTo(0L));
assertThat(collectorResult.getCollectorResult().getSliceCount(), equalTo(1));
assertThat(collectorResult.getCollectorResult().getProfiledChildren().size(), equalTo(expectedChildrenCount));
if (expectedChildrenCount == 2) {
assertThat(collectorResult.getCollectorResult().getProfiledChildren().get(0).getReason(), equalTo(REASON_SEARCH_TOP_HITS));
assertThat(collectorResult.getCollectorResult().getProfiledChildren().get(1).getReason(), equalTo(REASON_AGGREGATION));
}
}

private void assertCollectorResultWithConcurrentSearchEnabled(QueryProfileShardResult collectorResult, int expectedChildrenCount) {
long nodeTime = collectorResult.getCollectorResult().getTime();
assertThat(collectorResult.getCollectorResult().getMaxSliceTime(), lessThanOrEqualTo(nodeTime));
assertThat(collectorResult.getCollectorResult().getMinSliceTime(), lessThanOrEqualTo(nodeTime));
assertThat(collectorResult.getCollectorResult().getAvgSliceTime(), lessThanOrEqualTo(nodeTime));
assertThat(collectorResult.getCollectorResult().getReduceTime(), greaterThan(0L));
assertThat(collectorResult.getCollectorResult().getSliceCount(), greaterThanOrEqualTo(1));
assertThat(collectorResult.getCollectorResult().getProfiledChildren().size(), equalTo(expectedChildrenCount));
if (expectedChildrenCount == 2) {
assertThat(collectorResult.getCollectorResult().getProfiledChildren().get(0).getReason(), equalTo(REASON_SEARCH_TOP_HITS));
assertThat(collectorResult.getCollectorResult().getProfiledChildren().get(1).getReason(), equalTo(REASON_AGGREGATION));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/**
* Common {@link CollectorManager} used by both concurrent and non-concurrent aggregation path and also for global and non-global
* aggregation operators
*
* @opensearch.internal
*/
class AggregationCollectorManager implements CollectorManager<Collector, ReduceableSearchResult> {
public abstract class AggregationCollectorManager implements CollectorManager<Collector, ReduceableSearchResult> {
protected final SearchContext context;
private final CheckedFunction<SearchContext, List<Aggregator>, IOException> aggProvider;
private final String collectorReason;
Expand All @@ -51,6 +52,8 @@ public String getCollectorReason() {
return collectorReason;
}

public abstract String getCollectorName();

@Override
public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IOException {
final List<Aggregator> aggregators = context.bucketCollectorProcessor().toAggregators(collectors);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.apache.lucene.search.Query;
import org.opensearch.common.lucene.search.Queries;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.profile.query.CollectorResult;
import org.opensearch.search.profile.query.InternalProfileCollectorManager;
import org.opensearch.search.profile.query.InternalProfileComponent;
import org.opensearch.search.query.QueryPhaseExecutionException;
Expand Down Expand Up @@ -68,7 +67,7 @@ public void postProcess(SearchContext context) {
if (context.getProfilers() != null) {
globalCollectorManager = new InternalProfileCollectorManager(
globalCollectorManager,
CollectorResult.REASON_AGGREGATION_GLOBAL,
((AggregationCollectorManager) globalCollectorManager).getCollectorReason(),
Collections.emptyList()
);
context.getProfilers().addQueryProfiler().setCollector((InternalProfileComponent) globalCollectorManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
public class GlobalAggCollectorManager extends AggregationCollectorManager {

private Collector collector;
private final String collectorName;

public GlobalAggCollectorManager(SearchContext context) throws IOException {
super(context, context.aggregations().factories()::createTopLevelGlobalAggregators, CollectorResult.REASON_AGGREGATION_GLOBAL);
collector = Objects.requireNonNull(super.newCollector(), "collector instance is null");
collectorName = collector.toString();
}

@Override
Expand All @@ -48,4 +50,9 @@ protected AggregationReduceableSearchResult buildAggregationResult(InternalAggre
InternalAggregations.reduce(Collections.singletonList(internalAggregations), context.partialOnShard())
);
}

@Override
public String getCollectorName() {
return collectorName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@
public class GlobalAggCollectorManagerWithSingleCollector extends AggregationCollectorManager {

private final Collector collector;
private final String collectorName;

public GlobalAggCollectorManagerWithSingleCollector(SearchContext context) throws IOException {
super(context, context.aggregations().factories()::createTopLevelGlobalAggregators, CollectorResult.REASON_AGGREGATION_GLOBAL);
collector = Objects.requireNonNull(super.newCollector(), "collector instance is null");
collectorName = collector.toString();
}

@Override
Expand All @@ -42,4 +44,9 @@ public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IO
assert collectors.isEmpty() : "Reduce on GlobalAggregationCollectorManagerWithCollector called with non-empty collectors";
return super.reduce(List.of(collector));
}

@Override
public String getCollectorName() {
return collectorName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
public class NonGlobalAggCollectorManager extends AggregationCollectorManager {

private Collector collector;
private final String collectorName;

public NonGlobalAggCollectorManager(SearchContext context) throws IOException {
super(context, context.aggregations().factories()::createTopLevelNonGlobalAggregators, CollectorResult.REASON_AGGREGATION);
collector = Objects.requireNonNull(super.newCollector(), "collector instance is null");
collectorName = collector.toString();
}

@Override
Expand All @@ -48,4 +50,9 @@ protected AggregationReduceableSearchResult buildAggregationResult(InternalAggre
InternalAggregations.reduce(Collections.singletonList(internalAggregations), context.partialOnShard())
);
}

@Override
public String getCollectorName() {
return collectorName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@
public class NonGlobalAggCollectorManagerWithSingleCollector extends AggregationCollectorManager {

private final Collector collector;
private final String collectorName;

public NonGlobalAggCollectorManagerWithSingleCollector(SearchContext context) throws IOException {
super(context, context.aggregations().factories()::createTopLevelNonGlobalAggregators, CollectorResult.REASON_AGGREGATION);
collector = Objects.requireNonNull(super.newCollector(), "collector instance is null");
collectorName = collector.toString();
}

@Override
Expand All @@ -42,4 +44,9 @@ public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IO
assert collectors.isEmpty() : "Reduce on NonGlobalAggregationCollectorManagerWithCollector called with non-empty collectors";
return super.reduce(List.of(collector));
}

@Override
public String getCollectorName() {
return collectorName;
}
}
Loading

0 comments on commit 7536cfc

Please sign in to comment.