Skip to content

Commit

Permalink
Fix timer race condition in profile rewrite and create weight for con…
Browse files Browse the repository at this point in the history
…current segment search (opensearch-project#10352)

Signed-off-by: Ticheng Lin <[email protected]>
  • Loading branch information
ticheng-aws committed Oct 18, 2023
1 parent 3a36c22 commit 6651d5f
Show file tree
Hide file tree
Showing 8 changed files with 217 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Added
- Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351))
- [Remote Store] Add repository stats for remote store([#10567](https://github.com/opensearch-project/OpenSearch/pull/10567))
- Introduce ConcurrentQueryProfiler to profile query using concurrent segment search path and support concurrency during rewrite and create weight ([10352](https://github.com/opensearch-project/OpenSearch/pull/10352))

### Dependencies
- Bump `com.google.api.grpc:proto-google-common-protos` from 2.10.0 to 2.25.1 ([#10208](https://github.com/opensearch-project/OpenSearch/pull/10208), [#10298](https://github.com/opensearch-project/OpenSearch/pull/10298))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.search.profile.query;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.apache.lucene.tests.util.English;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.search.MultiSearchResponse;
Expand All @@ -40,29 +42,56 @@
import org.opensearch.action.search.SearchType;
import org.opensearch.action.search.ShardSearchFailure;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.SearchHit;
import org.opensearch.search.profile.ProfileResult;
import org.opensearch.search.profile.ProfileShardResult;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.ParameterizedOpenSearchIntegTestCase;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.profile.query.RandomQueryGenerator.randomQueryBuilder;
import static org.hamcrest.Matchers.emptyOrNullString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;

public class QueryProfilerIT extends ParameterizedOpenSearchIntegTestCase {
private final boolean concurrentSearchEnabled;
private static final String MAX_PREFIX = "max_";
private static final String MIN_PREFIX = "min_";
private static final String AVG_PREFIX = "avg_";
private static final String TIMING_TYPE_COUNT_SUFFIX = "_count";

public QueryProfilerIT(Settings settings, boolean concurrentSearchEnabled) {
super(settings);
this.concurrentSearchEnabled = concurrentSearchEnabled;
}

@ParametersFactory
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build(), false },
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build(), true }
);
}

public class QueryProfilerIT extends OpenSearchIntegTestCase {
@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true").build();
}

/**
* This test simply checks to make sure nothing crashes. Test indexes 100-150 documents,
Expand Down Expand Up @@ -229,6 +258,7 @@ public void testSimpleMatch() throws Exception {
assertEquals(result.getLuceneDescription(), "field1:one");
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertQueryProfileResult(result);
}

CollectorResult result = searchProfiles.getCollectorResult();
Expand Down Expand Up @@ -271,6 +301,7 @@ public void testBool() throws Exception {
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertEquals(result.getProfiledChildren().size(), 2);
assertQueryProfileResult(result);

// Check the children
List<ProfileResult> children = result.getProfiledChildren();
Expand All @@ -282,12 +313,14 @@ public void testBool() throws Exception {
assertThat(childProfile.getTime(), greaterThan(0L));
assertNotNull(childProfile.getTimeBreakdown());
assertEquals(childProfile.getProfiledChildren().size(), 0);
assertQueryProfileResult(childProfile);

childProfile = children.get(1);
assertEquals(childProfile.getQueryName(), "TermQuery");
assertEquals(childProfile.getLuceneDescription(), "field1:two");
assertThat(childProfile.getTime(), greaterThan(0L));
assertNotNull(childProfile.getTimeBreakdown());
assertQueryProfileResult(childProfile);
}

CollectorResult result = searchProfiles.getCollectorResult();
Expand Down Expand Up @@ -330,6 +363,7 @@ public void testEmptyBool() throws Exception {
assertNotNull(result.getLuceneDescription());
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertQueryProfileResult(result);
}

CollectorResult result = searchProfiles.getCollectorResult();
Expand Down Expand Up @@ -375,6 +409,7 @@ public void testCollapsingBool() throws Exception {
assertNotNull(result.getLuceneDescription());
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertQueryProfileResult(result);
}

CollectorResult result = searchProfiles.getCollectorResult();
Expand Down Expand Up @@ -415,6 +450,7 @@ public void testBoosting() throws Exception {
assertNotNull(result.getLuceneDescription());
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertQueryProfileResult(result);
}

CollectorResult result = searchProfiles.getCollectorResult();
Expand Down Expand Up @@ -455,6 +491,7 @@ public void testDisMaxRange() throws Exception {
assertNotNull(result.getLuceneDescription());
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertQueryProfileResult(result);
}

CollectorResult result = searchProfiles.getCollectorResult();
Expand Down Expand Up @@ -494,6 +531,7 @@ public void testRange() throws Exception {
assertNotNull(result.getLuceneDescription());
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertQueryProfileResult(result);
}

CollectorResult result = searchProfiles.getCollectorResult();
Expand Down Expand Up @@ -547,6 +585,7 @@ public void testPhrase() throws Exception {
assertNotNull(result.getLuceneDescription());
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertQueryProfileResult(result);
}

CollectorResult result = searchProfiles.getCollectorResult();
Expand Down Expand Up @@ -579,4 +618,35 @@ public void testNoProfile() throws Exception {
assertThat("Profile response element should be an empty map", resp.getProfileResults().size(), equalTo(0));
}

private void assertQueryProfileResult(ProfileResult result) {
Map<String, Long> breakdown = result.getTimeBreakdown();
Long maxSliceTime = result.getMaxSliceTime();
Long minSliceTime = result.getMinSliceTime();
Long avgSliceTime = result.getAvgSliceTime();
if (concurrentSearchEnabled) {
assertNotNull(maxSliceTime);
assertNotNull(minSliceTime);
assertNotNull(avgSliceTime);
assertThat(breakdown.size(), equalTo(66));
for (QueryTimingType queryTimingType : QueryTimingType.values()) {
if (queryTimingType != QueryTimingType.CREATE_WEIGHT) {
String maxTimingType = MAX_PREFIX + queryTimingType;
String minTimingType = MIN_PREFIX + queryTimingType;
String avgTimingType = AVG_PREFIX + queryTimingType;
assertNotNull(breakdown.get(maxTimingType));
assertNotNull(breakdown.get(minTimingType));
assertNotNull(breakdown.get(avgTimingType));
assertNotNull(breakdown.get(maxTimingType + TIMING_TYPE_COUNT_SUFFIX));
assertNotNull(breakdown.get(minTimingType + TIMING_TYPE_COUNT_SUFFIX));
assertNotNull(breakdown.get(avgTimingType + TIMING_TYPE_COUNT_SUFFIX));
}
}
} else {
assertThat(maxSliceTime, is(nullValue()));
assertThat(minSliceTime, is(nullValue()));
assertThat(avgSliceTime, is(nullValue()));
assertThat(breakdown.size(), equalTo(27));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.opensearch.search.dfs.AggregatedDfs;
import org.opensearch.search.profile.ContextualProfileBreakdown;
import org.opensearch.search.profile.Timer;
import org.opensearch.search.profile.query.ConcurrentQueryProfileTree;
import org.opensearch.search.profile.query.ProfileWeight;
import org.opensearch.search.profile.query.QueryProfiler;
import org.opensearch.search.profile.query.QueryTimingType;
Expand Down Expand Up @@ -106,6 +107,7 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable {
private QueryProfiler profiler;
private MutableQueryTimeout cancellable;
private SearchContext searchContext;
private boolean fromConcurrentPath = false;

public ContextIndexSearcher(
IndexReader reader,
Expand Down Expand Up @@ -185,15 +187,26 @@ public void setAggregatedDfs(AggregatedDfs aggregatedDfs) {

@Override
public Query rewrite(Query original) throws IOException {
Timer concurrentPathRewriteTimer = null;
if (profiler != null) {
profiler.startRewriteTime();
if (fromConcurrentPath) {
concurrentPathRewriteTimer = new Timer();
profiler.getConcurrentPathRewriteTimers().add(concurrentPathRewriteTimer);
concurrentPathRewriteTimer.start();
} else {
profiler.startRewriteTime();
}
}

try {
return super.rewrite(original);
} finally {
if (profiler != null) {
profiler.stopAndAddRewriteTime();
if (fromConcurrentPath) {
concurrentPathRewriteTimer.stop();
} else {
profiler.stopAndAddRewriteTime();
}
}
}
}
Expand All @@ -204,7 +217,15 @@ public Weight createWeight(Query query, ScoreMode scoreMode, float boost) throws
// createWeight() is called for each query in the tree, so we tell the queryProfiler
// each invocation so that it can build an internal representation of the query
// tree
ContextualProfileBreakdown<QueryTimingType> profile = profiler.getQueryBreakdown(query);
ContextualProfileBreakdown<QueryTimingType> profile;
if (searchContext.shouldUseConcurrentSearch()) {
long threadId = Thread.currentThread().getId();
ConcurrentQueryProfileTree profileTree = profiler.getThreadToProfileTree()
.computeIfAbsent(threadId, k -> new ConcurrentQueryProfileTree());
profile = profileTree.getProfileBreakdown(query);
} else {
profile = profiler.getQueryBreakdown(query);
}
Timer timer = profile.getTimer(QueryTimingType.CREATE_WEIGHT);
timer.start();
final Weight weight;
Expand Down Expand Up @@ -267,6 +288,9 @@ public void search(

@Override
protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
if (searchContext.shouldUseConcurrentSearch()) {
fromConcurrentPath = true;
}
// Time series based workload by default traverses segments in desc order i.e. latest to the oldest order.
// This is actually beneficial for search queries to start search on latest segments first for time series workload.
// That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@

package org.opensearch.search.profile;

import org.opensearch.search.profile.query.ConcurrentQueryProfileTree;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
* Base class for a profiler
Expand All @@ -42,6 +46,7 @@
public class AbstractProfiler<PB extends AbstractProfileBreakdown<?>, E> {

protected final AbstractInternalProfileTree<PB, E> profileTree;
protected Map<Long, ConcurrentQueryProfileTree> threadToProfileTree;

public AbstractProfiler(AbstractInternalProfileTree<PB, E> profileTree) {
this.profileTree = profileTree;
Expand All @@ -59,14 +64,27 @@ public PB getQueryBreakdown(E query) {
* Removes the last (e.g. most recent) element on the stack.
*/
public void pollLastElement() {
profileTree.pollLast();
if (threadToProfileTree == null) {
profileTree.pollLast();
} else {
long threadId = Thread.currentThread().getId();
ConcurrentQueryProfileTree concurrentProfileTree = threadToProfileTree.get(threadId);
concurrentProfileTree.pollLast();
}
}

/**
* @return a hierarchical representation of the profiled tree
*/
public List<ProfileResult> getTree() {
return profileTree.getTree();
if (threadToProfileTree == null) {
return profileTree.getTree();
}
List<ProfileResult> profileResults = new ArrayList<>();
for (Map.Entry<Long, ConcurrentQueryProfileTree> profile : threadToProfileTree.entrySet()) {
profileResults.addAll(profile.getValue().getTree());
}
return profileResults;
}

}
12 changes: 12 additions & 0 deletions server/src/main/java/org/opensearch/search/profile/Timer.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,18 @@ public class Timer {
private boolean doTiming;
private long timing, count, lastCount, start, earliestTimerStartTime;

public Timer() {
this(0, 0, 0, 0, 0);
}

public Timer(long timing, long count, long lastCount, long start, long earliestTimerStartTime) {
this.timing = timing;
this.count = count;
this.lastCount = lastCount;
this.start = start;
this.earliestTimerStartTime = earliestTimerStartTime;
}

/** pkg-private for testing */
long nanoTime() {
return System.nanoTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
import org.apache.lucene.search.Collector;
import org.opensearch.search.profile.ContextualProfileBreakdown;
import org.opensearch.search.profile.ProfileResult;
import org.opensearch.search.profile.Timer;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

Expand All @@ -22,6 +24,7 @@
* @opensearch.internal
*/
public class ConcurrentQueryProfileTree extends AbstractQueryProfileTree {
protected List<Timer> concurrentPathRewriteTimers = new ArrayList<>();

@Override
protected ContextualProfileBreakdown<QueryTimingType> createProfileBreakdown() {
Expand Down Expand Up @@ -88,4 +91,11 @@ private void updateCollectorToLeavesForChildBreakdowns(Integer parentToken, Map<
}
}
}

/**
* @return the concurrent path rewrite timer list for this profile tree
*/
public List<Timer> getConcurrentPathRewriteTimers() {
return concurrentPathRewriteTimers;
}
}
Loading

0 comments on commit 6651d5f

Please sign in to comment.