Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for wrapping CollectorManager with profiling during concurrent execution #9129

Merged
merged 2 commits into from
Aug 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Change shard_size and shard_min_doc_count evaluation to happen in shard level reduce phase ([#9085](https://github.com/opensearch-project/OpenSearch/pull/9085))
- Add attributes to startSpan methods ([#9199](https://github.com/opensearch-project/OpenSearch/pull/9199))
- Add base class for parameterizing the search based tests ([#9083](https://github.com/opensearch-project/OpenSearch/pull/9083))
- Add support for wrapping CollectorManager with profiling during concurrent execution ([#9129](https://github.com/opensearch-project/OpenSearch/pull/9129))

### Deprecated

Expand All @@ -147,4 +148,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.10...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.10...2.x
ticheng-aws marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.search.aggregations.metrics.Stats;
import org.opensearch.search.profile.ProfileResult;
import org.opensearch.search.profile.ProfileShardResult;
import org.opensearch.search.profile.query.CollectorResult;
import org.opensearch.search.profile.query.QueryProfileShardResult;
import org.opensearch.test.OpenSearchIntegTestCase;

Expand All @@ -67,8 +68,11 @@
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.containsString;

@OpenSearchIntegTestCase.SuiteScopeTestCase
public class AggregationProfilerIT extends OpenSearchIntegTestCase {
Expand Down Expand Up @@ -150,6 +154,8 @@ public class AggregationProfilerIT extends OpenSearchIntegTestCase {
private static final String TAG_FIELD = "tag";
private static final String STRING_FIELD = "string_field";
private final int numDocs = 5;
private static final String REASON_SEARCH_TOP_HITS = "search_top_hits";
private static final String REASON_AGGREGATION = "aggregation";
ticheng-aws marked this conversation as resolved.
Show resolved Hide resolved

@Override
protected int numberOfShards() {
Expand Down Expand Up @@ -217,8 +223,14 @@ public void testSimpleProfile() {
if (histoAggResult.getMaxSliceTime() != null) {
// concurrent segment search enabled
assertThat(breakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 2);
}
} else {
assertThat(breakdown.keySet(), equalTo(BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResult(collectorResult, 2);
}
}
assertThat(breakdown.get(INITIALIZE), greaterThan(0L));
assertThat(breakdown.get(COLLECT), greaterThan(0L));
Expand Down Expand Up @@ -265,8 +277,14 @@ public void testMultiLevelProfile() {
if (histoAggResult.getMaxSliceTime() != null) {
// concurrent segment search enabled
assertThat(histoBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 2);
}
} else {
assertThat(histoBreakdown.keySet(), equalTo(BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResult(collectorResult, 2);
}
}
assertThat(histoBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(histoBreakdown.get(COLLECT), greaterThan(0L));
Expand Down Expand Up @@ -366,8 +384,14 @@ public void testMultiLevelProfileBreadthFirst() {
if (histoAggResult.getMaxSliceTime() != null) {
// concurrent segment search enabled
assertThat(histoBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 2);
}
} else {
assertThat(histoBreakdown.keySet(), equalTo(BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResult(collectorResult, 2);
}
}
assertThat(histoBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(histoBreakdown.get(COLLECT), greaterThan(0L));
Expand Down Expand Up @@ -452,8 +476,14 @@ public void testDiversifiedAggProfile() {
if (diversifyAggResult.getMaxSliceTime() != null) {
// concurrent segment search enabled
assertThat(diversifyBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 2);
}
} else {
assertThat(diversifyBreakdown.keySet(), equalTo(BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResult(collectorResult, 2);
}
}
assertThat(diversifyBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(diversifyBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
Expand Down Expand Up @@ -532,8 +562,14 @@ public void testComplexProfile() {
if (histoAggResult.getMaxSliceTime() != null) {
// concurrent segment search enabled
assertThat(histoBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 2);
}
} else {
assertThat(histoBreakdown.keySet(), equalTo(BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResult(collectorResult, 2);
}
}
assertThat(histoBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(histoBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
Expand Down Expand Up @@ -792,7 +828,6 @@ public void testGlobalAggWithStatsSubAggregatorProfile() {
.get();

assertSearchResponse(response);

Global global = response.getAggregations().get("global");
assertThat(global, IsNull.notNullValue());
assertThat(global.getName(), equalTo("global"));
Expand Down Expand Up @@ -843,13 +878,106 @@ public void testGlobalAggWithStatsSubAggregatorProfile() {
if (globalAggResult.getMaxSliceTime() != null) {
// concurrent segment search enabled
assertEquals(CONCURRENT_SEARCH_BREAKDOWN_KEYS, breakdown.keySet());
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 0);
}
} else {
assertEquals(BREAKDOWN_KEYS, breakdown.keySet());
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResult(collectorResult, 0);
}
}
assertThat(breakdown.get(INITIALIZE), greaterThan(0L));
assertThat(breakdown.get(COLLECT), greaterThan(0L));
assertThat(breakdown.get(BUILD_AGGREGATION).longValue(), greaterThan(0L));
assertEquals(0, breakdown.get(REDUCE).intValue());
}
}

/**
 * Profiles a search carrying two sibling histogram aggregations ("histo_1" and "histo_2") and checks,
 * for every shard, both the collector tree reported by the query profiler and the per-aggregation
 * profile entries (name, description, timings, breakdown keys and debug info).
 */
public void testMultipleAggregationsProfile() {
    final SearchResponse searchResponse = client().prepareSearch("idx")
        .setProfile(true)
        .addAggregation(histogram("histo_1").field(NUMBER_FIELD).interval(1L))
        .addAggregation(histogram("histo_2").field(NUMBER_FIELD).interval(1L))
        .get();
    assertSearchResponse(searchResponse);

    final Map<String, ProfileShardResult> shardProfiles = searchResponse.getProfileResults();
    assertThat(shardProfiles, notNullValue());
    // One profile entry is expected per primary shard of the index.
    assertThat(shardProfiles.size(), equalTo(getNumShards("idx").numPrimaries));

    for (ProfileShardResult shardProfile : shardProfiles.values()) {
        assertThat(shardProfile, notNullValue());

        // Collector side: the root collector must be a "search_multi" with two children, the second
        // of which names both aggregations.
        final List<QueryProfileShardResult> queryProfiles = shardProfile.getQueryProfileResults();
        assertThat(queryProfiles, notNullValue());
        for (QueryProfileShardResult queryProfile : queryProfiles) {
            final CollectorResult rootCollector = queryProfile.getCollectorResult();
            assertThat(rootCollector.getReason(), equalTo("search_multi"));
            final List<CollectorResult> childCollectors = rootCollector.getProfiledChildren();
            assertThat(childCollectors.size(), equalTo(2));
            assertThat(childCollectors.get(1).getName(), containsString("[histo_1, histo_2]"));
        }

        // Aggregation side: exactly two leaf histogram aggregators are profiled.
        final AggregationProfileShardResult aggProfile = shardProfile.getAggregationProfileResults();
        assertThat(aggProfile, notNullValue());
        final List<ProfileResult> aggEntries = aggProfile.getProfileResults();
        assertThat(aggEntries, notNullValue());
        assertThat(aggEntries.size(), equalTo(2));

        for (ProfileResult aggEntry : aggEntries) {
            assertThat(aggEntry, notNullValue());
            assertThat(aggEntry.getQueryName(), equalTo("NumericHistogramAggregator"));
            assertThat(aggEntry.getLuceneDescription(), containsString("histo_"));
            assertThat(aggEntry.getProfiledChildren().size(), equalTo(0));
            assertThat(aggEntry.getTime(), greaterThan(0L));

            final Map<String, Long> timings = aggEntry.getTimeBreakdown();
            assertThat(timings, notNullValue());
            if (aggEntry.getMaxSliceTime() == null) {
                // no slice statistics reported: non-concurrent search path
                assertThat(timings.keySet(), equalTo(BREAKDOWN_KEYS));
                for (QueryProfileShardResult queryProfile : queryProfiles) {
                    assertCollectorResult(queryProfile, 2);
                }
            } else {
                // concurrent segment search enabled
                assertThat(timings.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS));
                for (QueryProfileShardResult queryProfile : queryProfiles) {
                    assertCollectorResultWithConcurrentSearchEnabled(queryProfile, 2);
                }
            }
            assertThat(timings.get(INITIALIZE), greaterThan(0L));
            assertThat(timings.get(COLLECT), greaterThan(0L));
            assertThat(timings.get(BUILD_AGGREGATION).longValue(), greaterThan(0L));
            assertThat(timings.get(REDUCE), equalTo(0L));

            final Map<String, Object> debugInfo = aggEntry.getDebugInfo();
            assertThat(debugInfo, notNullValue());
            assertThat(debugInfo.keySet(), equalTo(Set.of(TOTAL_BUCKETS)));
            assertThat(((Number) debugInfo.get(TOTAL_BUCKETS)).longValue(), greaterThan(0L));
        }
    }
}

/**
 * Asserts the shape of a shard's collector profile on the branch where no slice statistics were
 * reported for the aggregation (the non-concurrent branch of the calling tests).
 * <p>
 * The max, min and average slice times must all equal the collector's total time, the reduce time
 * must be zero and the slice count must be one. When two children are expected, the first must
 * carry the top-hits reason and the second the aggregation reason.
 *
 * @param collectorResult       query profile of one shard whose root collector is inspected
 * @param expectedChildrenCount number of child collectors the root collector must report
 */
private void assertCollectorResult(QueryProfileShardResult collectorResult, int expectedChildrenCount) {
    final CollectorResult rootCollector = collectorResult.getCollectorResult();
    final long nodeTime = rootCollector.getTime();

    assertThat(rootCollector.getMaxSliceTime(), equalTo(nodeTime));
    assertThat(rootCollector.getMinSliceTime(), equalTo(nodeTime));
    assertThat(rootCollector.getAvgSliceTime(), equalTo(nodeTime));
    assertThat(rootCollector.getReduceTime(), equalTo(0L));
    assertThat(rootCollector.getSliceCount(), equalTo(1));

    final List<CollectorResult> children = rootCollector.getProfiledChildren();
    assertThat(children.size(), equalTo(expectedChildrenCount));
    if (expectedChildrenCount != 2) {
        // Child reasons are only pinned down for the two-child layout.
        return;
    }
    assertThat(children.get(0).getReason(), equalTo(REASON_SEARCH_TOP_HITS));
    assertThat(children.get(1).getReason(), equalTo(REASON_AGGREGATION));
}

private void assertCollectorResultWithConcurrentSearchEnabled(QueryProfileShardResult collectorResult, int expectedChildrenCount) {
long nodeTime = collectorResult.getCollectorResult().getTime();
assertThat(collectorResult.getCollectorResult().getMaxSliceTime(), lessThanOrEqualTo(nodeTime));
assertThat(collectorResult.getCollectorResult().getMinSliceTime(), lessThanOrEqualTo(nodeTime));
assertThat(collectorResult.getCollectorResult().getAvgSliceTime(), lessThanOrEqualTo(nodeTime));
assertThat(collectorResult.getCollectorResult().getReduceTime(), greaterThan(0L));
assertThat(collectorResult.getCollectorResult().getSliceCount(), greaterThanOrEqualTo(1));
assertThat(collectorResult.getCollectorResult().getProfiledChildren().size(), equalTo(expectedChildrenCount));
ticheng-aws marked this conversation as resolved.
Show resolved Hide resolved
if (expectedChildrenCount == 2) {
assertThat(collectorResult.getCollectorResult().getProfiledChildren().get(0).getReason(), equalTo(REASON_SEARCH_TOP_HITS));
assertThat(collectorResult.getCollectorResult().getProfiledChildren().get(1).getReason(), equalTo(REASON_AGGREGATION));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,20 @@
import org.apache.lucene.search.CollectorManager;
import org.opensearch.common.CheckedFunction;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.profile.query.InternalProfileCollector;
import org.opensearch.search.query.ReduceableSearchResult;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/**
* Common {@link CollectorManager} used by both concurrent and non-concurrent aggregation path and also for global and non-global
* aggregation operators
*
* @opensearch.internal
*/
ticheng-aws marked this conversation as resolved.
Show resolved Hide resolved
class AggregationCollectorManager implements CollectorManager<Collector, ReduceableSearchResult> {
public abstract class AggregationCollectorManager implements CollectorManager<Collector, ReduceableSearchResult> {
protected final SearchContext context;
private final CheckedFunction<SearchContext, List<Aggregator>, IOException> aggProvider;
private final String collectorReason;
Expand All @@ -42,12 +42,18 @@ class AggregationCollectorManager implements CollectorManager<Collector, Reducea

@Override
public Collector newCollector() throws IOException {
final Collector collector = createCollector(context, aggProvider.apply(context), collectorReason);
final Collector collector = createCollector(aggProvider.apply(context));
// For Aggregations we should not have a NO_OP_Collector
assert collector != BucketCollector.NO_OP_COLLECTOR;
return collector;
}

public String getCollectorReason() {
ticheng-aws marked this conversation as resolved.
Show resolved Hide resolved
return collectorReason;
}

public abstract String getCollectorName();

@Override
public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IOException {
final List<Aggregator> aggregators = context.bucketCollectorProcessor().toAggregators(collectors);
Expand All @@ -70,17 +76,9 @@ protected AggregationReduceableSearchResult buildAggregationResult(InternalAggre
return new AggregationReduceableSearchResult(internalAggregations);
}

static Collector createCollector(SearchContext context, List<Aggregator> collectors, String reason) throws IOException {
static Collector createCollector(List<Aggregator> collectors) throws IOException {
Collector collector = MultiBucketCollector.wrap(collectors);
((BucketCollector) collector).preCollection();
if (context.getProfilers() != null) {
collector = new InternalProfileCollector(
collector,
reason,
// TODO: report on child aggs as well
Collections.emptyList()
);
}
return collector;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.apache.lucene.search.Query;
import org.opensearch.common.lucene.search.Queries;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.profile.query.CollectorResult;
import org.opensearch.search.profile.query.InternalProfileCollectorManager;
import org.opensearch.search.profile.query.InternalProfileComponent;
import org.opensearch.search.query.QueryPhaseExecutionException;
Expand Down Expand Up @@ -65,12 +64,12 @@ public void postProcess(SearchContext context) {
try {
if (globalCollectorManager != null) {
Query query = context.buildFilteredQuery(Queries.newMatchAllQuery());
globalCollectorManager = new InternalProfileCollectorManager(
globalCollectorManager,
CollectorResult.REASON_AGGREGATION_GLOBAL,
Collections.emptyList()
);
if (context.getProfilers() != null) {
globalCollectorManager = new InternalProfileCollectorManager(
globalCollectorManager,
((AggregationCollectorManager) globalCollectorManager).getCollectorReason(),
Collections.emptyList()
);
context.getProfilers().addQueryProfiler().setCollector((InternalProfileComponent) globalCollectorManager);
}
final ReduceableSearchResult result = context.searcher().search(query, globalCollectorManager);
Expand Down
Loading