Skip to content

Commit

Permalink
Add support for wrapping CollectorManager with profiling during concu…
Browse files Browse the repository at this point in the history
…rrent execution (opensearch-project#9129)

* Add support for wrapping CollectorManager with profiling during concurrent execution (opensearch-project#9129)

Signed-off-by: Ticheng Lin <[email protected]>

* Add more collectorResult test and work on the PR comments (opensearch-project#9129)

Signed-off-by: Ticheng Lin <[email protected]>

---------

Signed-off-by: Ticheng Lin <[email protected]>
  • Loading branch information
ticheng-aws committed Aug 15, 2023
1 parent c687116 commit c6582cb
Show file tree
Hide file tree
Showing 13 changed files with 279 additions and 33 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Change shard_size and shard_min_doc_count evaluation to happen in shard level reduce phase ([#9085](https://github.com/opensearch-project/OpenSearch/pull/9085))
- Add attributes to startSpan methods ([#9199](https://github.com/opensearch-project/OpenSearch/pull/9199))
- [Refactor] Task foundation classes to core library - pt 1 ([#9082](https://github.com/opensearch-project/OpenSearch/pull/9082))
- Add support for wrapping CollectorManager with profiling during concurrent execution ([#9129](https://github.com/opensearch-project/OpenSearch/pull/9129))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.search.aggregations.metrics.Stats;
import org.opensearch.search.profile.ProfileResult;
import org.opensearch.search.profile.ProfileShardResult;
import org.opensearch.search.profile.query.CollectorResult;
import org.opensearch.search.profile.query.QueryProfileShardResult;
import org.opensearch.test.OpenSearchIntegTestCase;

Expand All @@ -67,8 +68,11 @@
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.containsString;

@OpenSearchIntegTestCase.SuiteScopeTestCase
public class AggregationProfilerIT extends OpenSearchIntegTestCase {
Expand Down Expand Up @@ -150,6 +154,8 @@ public class AggregationProfilerIT extends OpenSearchIntegTestCase {
private static final String TAG_FIELD = "tag";
private static final String STRING_FIELD = "string_field";
private final int numDocs = 5;
private static final String REASON_SEARCH_TOP_HITS = "search_top_hits";
private static final String REASON_AGGREGATION = "aggregation";

@Override
protected int numberOfShards() {
Expand Down Expand Up @@ -217,8 +223,14 @@ public void testSimpleProfile() {
if (histoAggResult.getMaxSliceTime() != null) {
// concurrent segment search enabled
assertThat(breakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 2);
}
} else {
assertThat(breakdown.keySet(), equalTo(BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResult(collectorResult, 2);
}
}
assertThat(breakdown.get(INITIALIZE), greaterThan(0L));
assertThat(breakdown.get(COLLECT), greaterThan(0L));
Expand Down Expand Up @@ -265,8 +277,14 @@ public void testMultiLevelProfile() {
if (histoAggResult.getMaxSliceTime() != null) {
// concurrent segment search enabled
assertThat(histoBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 2);
}
} else {
assertThat(histoBreakdown.keySet(), equalTo(BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResult(collectorResult, 2);
}
}
assertThat(histoBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(histoBreakdown.get(COLLECT), greaterThan(0L));
Expand Down Expand Up @@ -366,8 +384,14 @@ public void testMultiLevelProfileBreadthFirst() {
if (histoAggResult.getMaxSliceTime() != null) {
// concurrent segment search enabled
assertThat(histoBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 2);
}
} else {
assertThat(histoBreakdown.keySet(), equalTo(BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResult(collectorResult, 2);
}
}
assertThat(histoBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(histoBreakdown.get(COLLECT), greaterThan(0L));
Expand Down Expand Up @@ -452,8 +476,14 @@ public void testDiversifiedAggProfile() {
if (diversifyAggResult.getMaxSliceTime() != null) {
// concurrent segment search enabled
assertThat(diversifyBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 2);
}
} else {
assertThat(diversifyBreakdown.keySet(), equalTo(BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResult(collectorResult, 2);
}
}
assertThat(diversifyBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(diversifyBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
Expand Down Expand Up @@ -532,8 +562,14 @@ public void testComplexProfile() {
if (histoAggResult.getMaxSliceTime() != null) {
// concurrent segment search enabled
assertThat(histoBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 2);
}
} else {
assertThat(histoBreakdown.keySet(), equalTo(BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResult(collectorResult, 2);
}
}
assertThat(histoBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(histoBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
Expand Down Expand Up @@ -792,7 +828,6 @@ public void testGlobalAggWithStatsSubAggregatorProfile() {
.get();

assertSearchResponse(response);

Global global = response.getAggregations().get("global");
assertThat(global, IsNull.notNullValue());
assertThat(global.getName(), equalTo("global"));
Expand Down Expand Up @@ -843,13 +878,106 @@ public void testGlobalAggWithStatsSubAggregatorProfile() {
if (globalAggResult.getMaxSliceTime() != null) {
// concurrent segment search enabled
assertEquals(CONCURRENT_SEARCH_BREAKDOWN_KEYS, breakdown.keySet());
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 0);
}
} else {
assertEquals(BREAKDOWN_KEYS, breakdown.keySet());
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResult(collectorResult, 0);
}
}
assertThat(breakdown.get(INITIALIZE), greaterThan(0L));
assertThat(breakdown.get(COLLECT), greaterThan(0L));
assertThat(breakdown.get(BUILD_AGGREGATION).longValue(), greaterThan(0L));
assertEquals(0, breakdown.get(REDUCE).intValue());
}
}

public void testMultipleAggregationsProfile() {
SearchResponse response = client().prepareSearch("idx")
.setProfile(true)
.addAggregation(histogram("histo_1").field(NUMBER_FIELD).interval(1L))
.addAggregation(histogram("histo_2").field(NUMBER_FIELD).interval(1L))
.get();
assertSearchResponse(response);
Map<String, ProfileShardResult> profileResults = response.getProfileResults();
assertThat(profileResults, notNullValue());
assertThat(profileResults.size(), equalTo(getNumShards("idx").numPrimaries));
for (ProfileShardResult profileShardResult : profileResults.values()) {
assertThat(profileShardResult, notNullValue());
List<QueryProfileShardResult> queryProfilerResults = profileShardResult.getQueryProfileResults();
assertThat(queryProfilerResults, notNullValue());
for (QueryProfileShardResult queryProfilerResult : queryProfilerResults) {
CollectorResult collectorResult = queryProfilerResult.getCollectorResult();
String reason = collectorResult.getReason();
assertThat(reason, equalTo("search_multi"));
List<CollectorResult> children = collectorResult.getProfiledChildren();
assertThat(children.size(), equalTo(2));
assertThat(children.get(1).getName(), containsString("[histo_1, histo_2]"));
}
AggregationProfileShardResult aggProfileResults = profileShardResult.getAggregationProfileResults();
assertThat(aggProfileResults, notNullValue());
List<ProfileResult> aggProfileResultsList = aggProfileResults.getProfileResults();
assertThat(aggProfileResultsList, notNullValue());
assertThat(aggProfileResultsList.size(), equalTo(2));
for (ProfileResult histoAggResult : aggProfileResultsList) {
assertThat(histoAggResult, notNullValue());
assertThat(histoAggResult.getQueryName(), equalTo("NumericHistogramAggregator"));
assertThat(histoAggResult.getLuceneDescription(), containsString("histo_"));
assertThat(histoAggResult.getProfiledChildren().size(), equalTo(0));
assertThat(histoAggResult.getTime(), greaterThan(0L));
Map<String, Long> breakdown = histoAggResult.getTimeBreakdown();
assertThat(breakdown, notNullValue());
if (histoAggResult.getMaxSliceTime() != null) {
// concurrent segment search enabled
assertThat(breakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 2);
}
} else {
assertThat(breakdown.keySet(), equalTo(BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResult(collectorResult, 2);
}
}
assertThat(breakdown.get(INITIALIZE), greaterThan(0L));
assertThat(breakdown.get(COLLECT), greaterThan(0L));
assertThat(breakdown.get(BUILD_AGGREGATION).longValue(), greaterThan(0L));
assertThat(breakdown.get(REDUCE), equalTo(0L));
Map<String, Object> debug = histoAggResult.getDebugInfo();
assertThat(debug, notNullValue());
assertThat(debug.keySet(), equalTo(Set.of(TOTAL_BUCKETS)));
assertThat(((Number) debug.get(TOTAL_BUCKETS)).longValue(), greaterThan(0L));
}
}
}

private void assertCollectorResult(QueryProfileShardResult collectorResult, int expectedChildrenCount) {
long nodeTime = collectorResult.getCollectorResult().getTime();
assertThat(collectorResult.getCollectorResult().getMaxSliceTime(), equalTo(nodeTime));
assertThat(collectorResult.getCollectorResult().getMinSliceTime(), equalTo(nodeTime));
assertThat(collectorResult.getCollectorResult().getAvgSliceTime(), equalTo(nodeTime));
assertThat(collectorResult.getCollectorResult().getReduceTime(), equalTo(0L));
assertThat(collectorResult.getCollectorResult().getSliceCount(), equalTo(1));
assertThat(collectorResult.getCollectorResult().getProfiledChildren().size(), equalTo(expectedChildrenCount));
if (expectedChildrenCount == 2) {
assertThat(collectorResult.getCollectorResult().getProfiledChildren().get(0).getReason(), equalTo(REASON_SEARCH_TOP_HITS));
assertThat(collectorResult.getCollectorResult().getProfiledChildren().get(1).getReason(), equalTo(REASON_AGGREGATION));
}
}

private void assertCollectorResultWithConcurrentSearchEnabled(QueryProfileShardResult collectorResult, int expectedChildrenCount) {
long nodeTime = collectorResult.getCollectorResult().getTime();
assertThat(collectorResult.getCollectorResult().getMaxSliceTime(), lessThanOrEqualTo(nodeTime));
assertThat(collectorResult.getCollectorResult().getMinSliceTime(), lessThanOrEqualTo(nodeTime));
assertThat(collectorResult.getCollectorResult().getAvgSliceTime(), lessThanOrEqualTo(nodeTime));
assertThat(collectorResult.getCollectorResult().getReduceTime(), greaterThan(0L));
assertThat(collectorResult.getCollectorResult().getSliceCount(), greaterThanOrEqualTo(1));
assertThat(collectorResult.getCollectorResult().getProfiledChildren().size(), equalTo(expectedChildrenCount));
if (expectedChildrenCount == 2) {
assertThat(collectorResult.getCollectorResult().getProfiledChildren().get(0).getReason(), equalTo(REASON_SEARCH_TOP_HITS));
assertThat(collectorResult.getCollectorResult().getProfiledChildren().get(1).getReason(), equalTo(REASON_AGGREGATION));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,20 @@
import org.apache.lucene.search.CollectorManager;
import org.opensearch.common.CheckedFunction;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.profile.query.InternalProfileCollector;
import org.opensearch.search.query.ReduceableSearchResult;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/**
* Common {@link CollectorManager} used by both concurrent and non-concurrent aggregation path and also for global and non-global
* aggregation operators
*
* @opensearch.internal
*/
class AggregationCollectorManager implements CollectorManager<Collector, ReduceableSearchResult> {
public abstract class AggregationCollectorManager implements CollectorManager<Collector, ReduceableSearchResult> {
protected final SearchContext context;
private final CheckedFunction<SearchContext, List<Aggregator>, IOException> aggProvider;
private final String collectorReason;
Expand All @@ -42,12 +42,18 @@ class AggregationCollectorManager implements CollectorManager<Collector, Reducea

@Override
public Collector newCollector() throws IOException {
final Collector collector = createCollector(context, aggProvider.apply(context), collectorReason);
final Collector collector = createCollector(aggProvider.apply(context));
// For Aggregations we should not have a NO_OP_Collector
assert collector != BucketCollector.NO_OP_COLLECTOR;
return collector;
}

public String getCollectorReason() {
return collectorReason;
}

public abstract String getCollectorName();

@Override
public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IOException {
final List<Aggregator> aggregators = context.bucketCollectorProcessor().toAggregators(collectors);
Expand Down Expand Up @@ -75,17 +81,9 @@ protected AggregationReduceableSearchResult buildAggregationResult(InternalAggre
return new AggregationReduceableSearchResult(internalAggregations);
}

static Collector createCollector(SearchContext context, List<Aggregator> collectors, String reason) throws IOException {
static Collector createCollector(List<Aggregator> collectors) throws IOException {
Collector collector = MultiBucketCollector.wrap(collectors);
((BucketCollector) collector).preCollection();
if (context.getProfilers() != null) {
collector = new InternalProfileCollector(
collector,
reason,
// TODO: report on child aggs as well
Collections.emptyList()
);
}
return collector;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.apache.lucene.search.Query;
import org.opensearch.common.lucene.search.Queries;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.profile.query.CollectorResult;
import org.opensearch.search.profile.query.InternalProfileCollectorManager;
import org.opensearch.search.profile.query.InternalProfileComponent;
import org.opensearch.search.query.QueryPhaseExecutionException;
Expand Down Expand Up @@ -65,12 +64,12 @@ public void postProcess(SearchContext context) {
try {
if (globalCollectorManager != null) {
Query query = context.buildFilteredQuery(Queries.newMatchAllQuery());
globalCollectorManager = new InternalProfileCollectorManager(
globalCollectorManager,
CollectorResult.REASON_AGGREGATION_GLOBAL,
Collections.emptyList()
);
if (context.getProfilers() != null) {
globalCollectorManager = new InternalProfileCollectorManager(
globalCollectorManager,
((AggregationCollectorManager) globalCollectorManager).getCollectorReason(),
Collections.emptyList()
);
context.getProfilers().addQueryProfiler().setCollector((InternalProfileComponent) globalCollectorManager);
}
final ReduceableSearchResult result = context.searcher().search(query, globalCollectorManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@
import org.apache.lucene.search.Query;
import org.opensearch.common.lucene.search.Queries;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.profile.query.InternalProfileComponent;
import org.opensearch.search.profile.query.InternalProfileCollector;
import org.opensearch.search.query.QueryPhaseExecutionException;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

/**
Expand Down Expand Up @@ -74,7 +75,13 @@ public void postProcess(SearchContext context) {
if (context.getProfilers() != null) {
context.getProfilers()
.addQueryProfiler()
.setCollector((InternalProfileComponent) globalCollectorManager.newCollector());
.setCollector(
new InternalProfileCollector(
globalCollectorManager.newCollector(),
globalCollectorManager.getCollectorReason(),
Collections.emptyList()
)
);
}
context.searcher().search(query, globalCollectorManager.newCollector());
globalCollectorManager.reduce(List.of()).reduce(context.queryResult());
Expand Down
Loading

0 comments on commit c6582cb

Please sign in to comment.