Introduce ConcurrentQueryProfiler to profile query using concurrent segment search path and support concurrency during rewrite and create weight #10352

Merged: 2 commits, Oct 20, 2023
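In short: when concurrent segment search is enabled, query profiling now goes through the new ConcurrentQueryProfiler instead of QueryProfiler. A hedged sketch of exercising that path from an integration test, not part of this PR; the cluster setting key is assumed from CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING, and the index/field names are placeholders:

// Hedged sketch: enable concurrent segment search, then run a profiled query
// so the ConcurrentQueryProfiler path is exercised.
client().admin().cluster().prepareUpdateSettings()
    .setTransientSettings(Settings.builder().put("search.concurrent_segment_search.enabled", true))
    .get();

SearchResponse resp = client().prepareSearch("test")
    .setQuery(QueryBuilders.termQuery("field1", "one"))
    .setProfile(true)                               // request per-query profiling
    .setSearchType(SearchType.QUERY_THEN_FETCH)
    .get();

// Each shard profile now carries concurrent stats (max_/min_/avg_ slice times).
resp.getProfileResults().forEach((shard, profile) -> logger.info("{} -> {}", shard, profile));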
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -92,6 +92,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351))
- [Remote Store] Add repository stats for remote store([#10567](https://github.com/opensearch-project/OpenSearch/pull/10567))
- Add search query categorizer ([#10255](https://github.com/opensearch-project/OpenSearch/pull/10255))
- Introduce ConcurrentQueryProfiler to profile query using concurrent segment search path and support concurrency during rewrite and create weight ([#10352](https://github.com/opensearch-project/OpenSearch/pull/10352))

### Dependencies
- Bump `com.google.api.grpc:proto-google-common-protos` from 2.10.0 to 2.25.1 ([#10208](https://github.com/opensearch-project/OpenSearch/pull/10208), [#10298](https://github.com/opensearch-project/OpenSearch/pull/10298))
server/src/internalClusterTest/java/org/opensearch/search/profile/query/QueryProfilerIT.java
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.search.profile.query;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.apache.lucene.tests.util.English;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.search.MultiSearchResponse;
@@ -40,29 +42,56 @@
import org.opensearch.action.search.SearchType;
import org.opensearch.action.search.ShardSearchFailure;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.SearchHit;
import org.opensearch.search.profile.ProfileResult;
import org.opensearch.search.profile.ProfileShardResult;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.ParameterizedOpenSearchIntegTestCase;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.profile.query.RandomQueryGenerator.randomQueryBuilder;
import static org.hamcrest.Matchers.emptyOrNullString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;

public class QueryProfilerIT extends ParameterizedOpenSearchIntegTestCase {
private final boolean concurrentSearchEnabled;
private static final String MAX_PREFIX = "max_";
private static final String MIN_PREFIX = "min_";
private static final String AVG_PREFIX = "avg_";
private static final String TIMING_TYPE_COUNT_SUFFIX = "_count";

public QueryProfilerIT(Settings settings, boolean concurrentSearchEnabled) {
super(settings);
this.concurrentSearchEnabled = concurrentSearchEnabled;
}

@ParametersFactory
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build(), false },
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build(), true }
);
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true").build();
}

/**
* This test simply checks to make sure nothing crashes. Test indexes 100-150 documents,
@@ -229,6 +258,7 @@ public void testSimpleMatch() throws Exception {
assertEquals(result.getLuceneDescription(), "field1:one");
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertQueryProfileResult(result);
}

CollectorResult result = searchProfiles.getCollectorResult();
@@ -271,6 +301,7 @@ public void testBool() throws Exception {
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertEquals(result.getProfiledChildren().size(), 2);
assertQueryProfileResult(result);

// Check the children
List<ProfileResult> children = result.getProfiledChildren();
@@ -282,12 +313,14 @@
assertThat(childProfile.getTime(), greaterThan(0L));
assertNotNull(childProfile.getTimeBreakdown());
assertEquals(childProfile.getProfiledChildren().size(), 0);
assertQueryProfileResult(childProfile);

childProfile = children.get(1);
assertEquals(childProfile.getQueryName(), "TermQuery");
assertEquals(childProfile.getLuceneDescription(), "field1:two");
assertThat(childProfile.getTime(), greaterThan(0L));
assertNotNull(childProfile.getTimeBreakdown());
assertQueryProfileResult(childProfile);
}

CollectorResult result = searchProfiles.getCollectorResult();
@@ -330,6 +363,7 @@ public void testEmptyBool() throws Exception {
assertNotNull(result.getLuceneDescription());
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertQueryProfileResult(result);
}

CollectorResult result = searchProfiles.getCollectorResult();
@@ -375,6 +409,7 @@ public void testCollapsingBool() throws Exception {
assertNotNull(result.getLuceneDescription());
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertQueryProfileResult(result);
}

CollectorResult result = searchProfiles.getCollectorResult();
@@ -415,6 +450,90 @@ public void testBoosting() throws Exception {
assertNotNull(result.getLuceneDescription());
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertQueryProfileResult(result);
}

CollectorResult result = searchProfiles.getCollectorResult();
assertThat(result.getName(), is(not(emptyOrNullString())));
assertThat(result.getTime(), greaterThan(0L));
}
}
}

public void testSearchLeafForItsLeavesAndRewriteQuery() throws Exception {
createIndex("test");
ensureGreen();

int numDocs = 122;
IndexRequestBuilder[] docs = new IndexRequestBuilder[numDocs];
for (int i = 0; i < numDocs; i++) {
docs[i] = client().prepareIndex("test").setId(String.valueOf(i)).setSource("field1", English.intToEnglish(i), "field2", i);
}

List<String> terms = Arrays.asList("zero", "zero", "one");

indexRandom(true, docs);

refresh();

QueryBuilder q = QueryBuilders.boostingQuery(
QueryBuilders.idsQuery().addIds(String.valueOf(randomInt()), String.valueOf(randomInt())),
QueryBuilders.termsQuery("field1", terms)
).boost(randomFloat()).negativeBoost(randomFloat());
logger.info("Query: {}", q);

SearchResponse resp = client().prepareSearch()
.setQuery(q)
.setTrackTotalHits(true)
.setProfile(true)
.setSearchType(SearchType.QUERY_THEN_FETCH)
.get();

assertNotNull("Profile response element should not be null", resp.getProfileResults());
assertThat("Profile response should not be an empty array", resp.getProfileResults().size(), not(0));

for (Map.Entry<String, ProfileShardResult> shardResult : resp.getProfileResults().entrySet()) {
assertThat(shardResult.getValue().getNetworkTime().getInboundNetworkTime(), greaterThanOrEqualTo(0L));
assertThat(shardResult.getValue().getNetworkTime().getOutboundNetworkTime(), greaterThanOrEqualTo(0L));
for (QueryProfileShardResult searchProfiles : shardResult.getValue().getQueryProfileResults()) {
List<ProfileResult> results = searchProfiles.getQueryResults();
for (ProfileResult result : results) {
assertNotNull(result.getQueryName());
assertNotNull(result.getLuceneDescription());
assertThat(result.getTime(), greaterThan(0L));
Map<String, Long> breakdown = result.getTimeBreakdown();
Long maxSliceTime = result.getMaxSliceTime();
Long minSliceTime = result.getMinSliceTime();
Long avgSliceTime = result.getAvgSliceTime();
if (concurrentSearchEnabled && results.get(0).equals(result)) {
assertNotNull(maxSliceTime);
assertNotNull(minSliceTime);
assertNotNull(avgSliceTime);
assertThat(breakdown.size(), equalTo(66));
for (QueryTimingType queryTimingType : QueryTimingType.values()) {
if (queryTimingType != QueryTimingType.CREATE_WEIGHT) {
String maxTimingType = MAX_PREFIX + queryTimingType;
String minTimingType = MIN_PREFIX + queryTimingType;
String avgTimingType = AVG_PREFIX + queryTimingType;
assertNotNull(breakdown.get(maxTimingType));
assertNotNull(breakdown.get(minTimingType));
assertNotNull(breakdown.get(avgTimingType));
assertNotNull(breakdown.get(maxTimingType + TIMING_TYPE_COUNT_SUFFIX));
assertNotNull(breakdown.get(minTimingType + TIMING_TYPE_COUNT_SUFFIX));
assertNotNull(breakdown.get(avgTimingType + TIMING_TYPE_COUNT_SUFFIX));
}
}
} else if (concurrentSearchEnabled) {
assertThat(maxSliceTime, equalTo(0L));
assertThat(minSliceTime, equalTo(0L));
assertThat(avgSliceTime, equalTo(0L));
assertThat(breakdown.size(), equalTo(27));
} else {
assertThat(maxSliceTime, is(nullValue()));
assertThat(minSliceTime, is(nullValue()));
assertThat(avgSliceTime, is(nullValue()));
assertThat(breakdown.size(), equalTo(27));
}
}

CollectorResult result = searchProfiles.getCollectorResult();
@@ -455,6 +574,7 @@ public void testDisMaxRange() throws Exception {
assertNotNull(result.getLuceneDescription());
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertQueryProfileResult(result);
}

CollectorResult result = searchProfiles.getCollectorResult();
@@ -494,6 +614,7 @@ public void testRange() throws Exception {
assertNotNull(result.getLuceneDescription());
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertQueryProfileResult(result);
}

CollectorResult result = searchProfiles.getCollectorResult();
@@ -547,6 +668,7 @@ public void testPhrase() throws Exception {
assertNotNull(result.getLuceneDescription());
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertQueryProfileResult(result);
}

CollectorResult result = searchProfiles.getCollectorResult();
@@ -579,4 +701,35 @@ public void testNoProfile() throws Exception {
assertThat("Profile response element should be an empty map", resp.getProfileResults().size(), equalTo(0));
}

private void assertQueryProfileResult(ProfileResult result) {
Map<String, Long> breakdown = result.getTimeBreakdown();
Long maxSliceTime = result.getMaxSliceTime();
Long minSliceTime = result.getMinSliceTime();
Long avgSliceTime = result.getAvgSliceTime();
if (concurrentSearchEnabled) {
assertNotNull(maxSliceTime);
assertNotNull(minSliceTime);
assertNotNull(avgSliceTime);
assertThat(breakdown.size(), equalTo(66));
for (QueryTimingType queryTimingType : QueryTimingType.values()) {
if (queryTimingType != QueryTimingType.CREATE_WEIGHT) {
String maxTimingType = MAX_PREFIX + queryTimingType;
String minTimingType = MIN_PREFIX + queryTimingType;
String avgTimingType = AVG_PREFIX + queryTimingType;
assertNotNull(breakdown.get(maxTimingType));
assertNotNull(breakdown.get(minTimingType));
assertNotNull(breakdown.get(avgTimingType));
assertNotNull(breakdown.get(maxTimingType + TIMING_TYPE_COUNT_SUFFIX));
assertNotNull(breakdown.get(minTimingType + TIMING_TYPE_COUNT_SUFFIX));
assertNotNull(breakdown.get(avgTimingType + TIMING_TYPE_COUNT_SUFFIX));
}
}
} else {
assertThat(maxSliceTime, is(nullValue()));
assertThat(minSliceTime, is(nullValue()));
assertThat(avgSliceTime, is(nullValue()));
assertThat(breakdown.size(), equalTo(27));
}
}

}
server/src/main/java/org/opensearch/search/profile/Profilers.java
@@ -35,6 +35,9 @@
import org.opensearch.search.internal.ContextIndexSearcher;
import org.opensearch.search.profile.aggregation.AggregationProfiler;
import org.opensearch.search.profile.aggregation.ConcurrentAggregationProfiler;
import org.opensearch.search.profile.query.ConcurrentQueryProfileTree;
import org.opensearch.search.profile.query.ConcurrentQueryProfiler;
import org.opensearch.search.profile.query.InternalQueryProfileTree;
import org.opensearch.search.profile.query.QueryProfiler;

import java.util.ArrayList;
@@ -64,7 +67,9 @@ public Profilers(ContextIndexSearcher searcher, boolean isConcurrentSegmentSearc

/** Switch to a new profile. */
public QueryProfiler addQueryProfiler() {
QueryProfiler profiler = isConcurrentSegmentSearchEnabled
? new ConcurrentQueryProfiler(new ConcurrentQueryProfileTree())
: new QueryProfiler(new InternalQueryProfileTree());
searcher.setProfiler(profiler);
queryProfilers.add(profiler);
return profiler;
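A design note on the change above: the old boolean flag is replaced by constructor injection of the profile tree, so the concurrency behavior lives in the ConcurrentQueryProfiler/ConcurrentQueryProfileTree subclasses rather than in conditional logic. A minimal sketch of the two configurations, using only the constructors visible in this diff:

// Sequential path: single-threaded profile tree.
QueryProfiler sequential = new QueryProfiler(new InternalQueryProfileTree());

// Concurrent path: a tree that can merge timings from multiple slice threads.
QueryProfiler concurrent = new ConcurrentQueryProfiler(new ConcurrentQueryProfileTree());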
12 changes: 12 additions & 0 deletions server/src/main/java/org/opensearch/search/profile/Timer.java
@@ -53,6 +53,18 @@ public class Timer {
private boolean doTiming;
private long timing, count, lastCount, start, earliestTimerStartTime;

public Timer() {
this(0, 0, 0, 0, 0);
}

public Timer(long timing, long count, long lastCount, long start, long earliestTimerStartTime) {
this.timing = timing;
this.count = count;
this.lastCount = lastCount;
this.start = start;
this.earliestTimerStartTime = earliestTimerStartTime;
}

/** pkg-private for testing */
long nanoTime() {
return System.nanoTime();
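The new Timer constructors above let callers seed a timer with previously accumulated state, while the no-arg constructor keeps the old zeroed defaults. A small sketch; the values and the merging use case are assumptions, not shown in this diff:

// Zeroed timer, equivalent to the previous implicit default state.
Timer fresh = new Timer();

// Timer seeded with prior state; argument order matches the constructor above:
// (timing, count, lastCount, start, earliestTimerStartTime).
Timer seeded = new Timer(1_000L, 5L, 4L, 0L, 42L);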
server/src/main/java/org/opensearch/search/profile/query/QueryProfiler.java
@@ -54,14 +54,11 @@ public void startRewriteTime() {
* startRewriteTime() must be called for a particular context prior to calling
* stopAndAddRewriteTime(), otherwise the elapsed time will be negative and
* nonsensical
*/
public void stopAndAddRewriteTime() {
long time = Math.max(1, System.nanoTime() - rewriteScratch);
rewriteTime += time;
rewriteScratch = 0;
}

public long getRewriteTime() {
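Since stopAndAddRewriteTime() no longer returns the elapsed time, callers read the accumulated total via getRewriteTime(). A hedged sketch of the intended pairing; profiler and the rewrite step are placeholders:

profiler.startRewriteTime();
// ... query rewriting happens here ...
profiler.stopAndAddRewriteTime();               // accumulates internally, returns void
long totalRewriteNanos = profiler.getRewriteTime();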
server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdown.java
@@ -70,14 +70,29 @@ public Map<String, Long> toBreakdownMap() {
);
final long createWeightTime = topLevelBreakdownMapWithWeightTime.get(QueryTimingType.CREATE_WEIGHT.toString());

if (contexts.isEmpty()) {
// If there are no leaf contexts, then return the default concurrent query level breakdown, which will include the
// create_weight time/count
queryNodeTime = createWeightTime;
maxSliceNodeTime = 0L;
minSliceNodeTime = 0L;
avgSliceNodeTime = 0L;
return buildDefaultQueryBreakdownMap(createWeightTime);
} else if (sliceCollectorsToLeaves.isEmpty()) {
// This will happen when each slice executes search leaf for its leaves and query is rewritten for the leaf being searched. It
// creates a new weight and breakdown map for each rewritten query. This new breakdown map captures the timing information for
// the new rewritten query. The sliceCollectorsToLeaves is empty because this breakdown for rewritten query gets created later
// in search leaf path which doesn't have collector. Also, this is not needed since this breakdown is per leaf and there is no
// concurrency involved. An empty sliceCollectorsToLeaves could also happen in the case of early termination.
AbstractProfileBreakdown<QueryTimingType> breakdown = contexts.values().iterator().next();
queryNodeTime = breakdown.toNodeTime() + createWeightTime;
maxSliceNodeTime = 0L;
minSliceNodeTime = 0L;
avgSliceNodeTime = 0L;
Map<String, Long> queryBreakdownMap = new HashMap<>(breakdown.toBreakdownMap());
queryBreakdownMap.put(QueryTimingType.CREATE_WEIGHT.toString(), createWeightTime);
queryBreakdownMap.put(QueryTimingType.CREATE_WEIGHT + TIMING_TYPE_COUNT_SUFFIX, 1L);
return queryBreakdownMap;
}

// first create the slice level breakdowns
@@ -191,10 +206,12 @@ Map<Collector, Map<String, Long>> buildSliceLevelBreakdown() {
}
// compute sliceMaxEndTime as max of sliceEndTime across all timing types
sliceMaxEndTime = Math.max(sliceMaxEndTime, currentSliceBreakdown.getOrDefault(timingTypeSliceEndTimeKey, Long.MIN_VALUE));
long currentSliceStartTime = currentSliceBreakdown.getOrDefault(timingTypeSliceStartTimeKey, Long.MAX_VALUE);
if (currentSliceStartTime == 0L) {
// The timer for the current timing type never started, so skip it when computing the slice minimum start time
continue;
}
sliceMinStartTime = Math.min(sliceMinStartTime, currentSliceStartTime);
// compute total time for each timing type at slice level using sliceEndTime and sliceStartTime
currentSliceBreakdown.put(
timingType.toString(),
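Why the guard above matters: an unstarted timer reports a start time of 0, which would otherwise become the slice-level minimum start time and inflate the computed slice duration. A self-contained illustration with assumed values:

public class SliceMinStartTimeDemo {
    public static void main(String[] args) {
        long[] startTimes = { 0L /* timer never started */, 1_200L, 900L };
        long sliceMinStartTime = Long.MAX_VALUE;
        for (long start : startTimes) {
            if (start == 0L) continue;               // the guard added in this PR
            sliceMinStartTime = Math.min(sliceMinStartTime, start);
        }
        System.out.println(sliceMinStartTime);       // prints 900, not 0
    }
}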