Skip to content

Commit

Permalink
Fix profile rewrite time issue in the concurrent path
Browse files Browse the repository at this point in the history
Signed-off-by: Ticheng Lin <[email protected]>
  • Loading branch information
ticheng-aws committed Oct 9, 2023
1 parent 8bb11a6 commit 140060f
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix circular dependency in Settings initialization ([10194](https://github.com/opensearch-project/OpenSearch/pull/10194))
- Fix registration and initialization of multiple extensions ([10256](https://github.com/opensearch-project/OpenSearch/pull/10256))
- Fix Segment Replication ShardLockObtainFailedException bug during index corruption ([10370](https://github.com/opensearch-project/OpenSearch/pull/10370))
- Fix profile rewrite time issue in the concurrent path ([10352](https://github.com/opensearch-project/OpenSearch/pull/10352))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.search.profile.query;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.apache.lucene.tests.util.English;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.search.MultiSearchResponse;
Expand All @@ -40,29 +42,56 @@
import org.opensearch.action.search.SearchType;
import org.opensearch.action.search.ShardSearchFailure;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.SearchHit;
import org.opensearch.search.profile.ProfileResult;
import org.opensearch.search.profile.ProfileShardResult;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.ParameterizedOpenSearchIntegTestCase;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.profile.query.RandomQueryGenerator.randomQueryBuilder;
import static org.hamcrest.Matchers.emptyOrNullString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;

public class QueryProfilerIT extends ParameterizedOpenSearchIntegTestCase {
private final boolean concurrentSearchEnabled;
private static final String MAX_PREFIX = "max_";
private static final String MIN_PREFIX = "min_";
private static final String AVG_PREFIX = "avg_";
private static final String TIMING_TYPE_COUNT_SUFFIX = "_count";

/**
 * Creates one parameterized instance of this IT suite.
 *
 * @param settings                cluster settings for this run (supplied by {@code parameters()})
 * @param concurrentSearchEnabled whether this run expects concurrent segment search to be on;
 *                                used by {@code assertQueryProfileResult} to pick the expected
 *                                breakdown shape
 */
public QueryProfilerIT(Settings settings, boolean concurrentSearchEnabled) {
    super(settings);
    this.concurrentSearchEnabled = concurrentSearchEnabled;
}

/**
 * Supplies the two parameter sets for this suite: one run with concurrent
 * segment search disabled and one with it enabled. The boolean mirrors the
 * setting so assertions know which breakdown shape to expect.
 */
@ParametersFactory
public static Collection<Object[]> parameters() {
    final Settings concurrentSearchDisabled = Settings.builder()
        .put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false)
        .build();
    final Settings concurrentSearchEnabled = Settings.builder()
        .put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true)
        .build();
    return Arrays.asList(
        new Object[] { concurrentSearchDisabled, false },
        new Object[] { concurrentSearchEnabled, true }
    );
}

public class QueryProfilerIT extends OpenSearchIntegTestCase {
@Override
protected Settings featureFlagSettings() {
    // Turn on the concurrent-segment-search feature flag for every run; the
    // per-run CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING from parameters() then
    // decides whether the concurrent path is actually used.
    return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true").build();
}

/**
* This test simply checks to make sure nothing crashes. Test indexes 100-150 documents,
Expand Down Expand Up @@ -229,6 +258,7 @@ public void testSimpleMatch() throws Exception {
assertEquals(result.getLuceneDescription(), "field1:one");
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertQueryProfileResult(result);
}

CollectorResult result = searchProfiles.getCollectorResult();
Expand Down Expand Up @@ -271,6 +301,7 @@ public void testBool() throws Exception {
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertEquals(result.getProfiledChildren().size(), 2);
assertQueryProfileResult(result);

// Check the children
List<ProfileResult> children = result.getProfiledChildren();
Expand All @@ -282,12 +313,14 @@ public void testBool() throws Exception {
assertThat(childProfile.getTime(), greaterThan(0L));
assertNotNull(childProfile.getTimeBreakdown());
assertEquals(childProfile.getProfiledChildren().size(), 0);
assertQueryProfileResult(childProfile);

childProfile = children.get(1);
assertEquals(childProfile.getQueryName(), "TermQuery");
assertEquals(childProfile.getLuceneDescription(), "field1:two");
assertThat(childProfile.getTime(), greaterThan(0L));
assertNotNull(childProfile.getTimeBreakdown());
assertQueryProfileResult(childProfile);
}

CollectorResult result = searchProfiles.getCollectorResult();
Expand Down Expand Up @@ -330,6 +363,7 @@ public void testEmptyBool() throws Exception {
assertNotNull(result.getLuceneDescription());
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertQueryProfileResult(result);
}

CollectorResult result = searchProfiles.getCollectorResult();
Expand Down Expand Up @@ -375,6 +409,7 @@ public void testCollapsingBool() throws Exception {
assertNotNull(result.getLuceneDescription());
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertQueryProfileResult(result);
}

CollectorResult result = searchProfiles.getCollectorResult();
Expand Down Expand Up @@ -415,6 +450,7 @@ public void testBoosting() throws Exception {
assertNotNull(result.getLuceneDescription());
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertQueryProfileResult(result);
}

CollectorResult result = searchProfiles.getCollectorResult();
Expand Down Expand Up @@ -455,6 +491,7 @@ public void testDisMaxRange() throws Exception {
assertNotNull(result.getLuceneDescription());
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertQueryProfileResult(result);
}

CollectorResult result = searchProfiles.getCollectorResult();
Expand Down Expand Up @@ -494,6 +531,7 @@ public void testRange() throws Exception {
assertNotNull(result.getLuceneDescription());
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertQueryProfileResult(result);
}

CollectorResult result = searchProfiles.getCollectorResult();
Expand Down Expand Up @@ -547,6 +585,7 @@ public void testPhrase() throws Exception {
assertNotNull(result.getLuceneDescription());
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertQueryProfileResult(result);
}

CollectorResult result = searchProfiles.getCollectorResult();
Expand Down Expand Up @@ -579,4 +618,35 @@ public void testNoProfile() throws Exception {
assertThat("Profile response element should be an empty map", resp.getProfileResults().size(), equalTo(0));
}

/**
 * Verifies the shape of a single {@link ProfileResult} for the current run mode.
 * With concurrent search on: slice times are present, the breakdown has 66
 * entries, and every timing type except CREATE_WEIGHT carries max_/min_/avg_
 * values plus their _count counterparts. With concurrent search off: slice
 * times are null and the breakdown has the classic 27 entries.
 */
private void assertQueryProfileResult(ProfileResult result) {
    Map<String, Long> breakdown = result.getTimeBreakdown();
    Long maxSliceTime = result.getMaxSliceTime();
    Long minSliceTime = result.getMinSliceTime();
    Long avgSliceTime = result.getAvgSliceTime();

    if (!concurrentSearchEnabled) {
        // Sequential path: no per-slice statistics are produced at all.
        assertThat(maxSliceTime, is(nullValue()));
        assertThat(minSliceTime, is(nullValue()));
        assertThat(avgSliceTime, is(nullValue()));
        assertThat(breakdown.size(), equalTo(27));
        return;
    }

    assertNotNull(maxSliceTime);
    assertNotNull(minSliceTime);
    assertNotNull(avgSliceTime);
    assertThat(breakdown.size(), equalTo(66));
    for (QueryTimingType queryTimingType : QueryTimingType.values()) {
        if (queryTimingType == QueryTimingType.CREATE_WEIGHT) {
            // Weight creation happens once, outside the sliced execution,
            // so it has no per-slice statistics.
            continue;
        }
        for (String prefix : new String[] { MAX_PREFIX, MIN_PREFIX, AVG_PREFIX }) {
            String timingKey = prefix + queryTimingType;
            assertNotNull(breakdown.get(timingKey));
            assertNotNull(breakdown.get(timingKey + TIMING_TYPE_COUNT_SUFFIX));
        }
    }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.opensearch.search.dfs.AggregatedDfs;
import org.opensearch.search.profile.ContextualProfileBreakdown;
import org.opensearch.search.profile.Timer;
import org.opensearch.search.profile.query.ConcurrentQueryProfileTree;
import org.opensearch.search.profile.query.ProfileWeight;
import org.opensearch.search.profile.query.QueryProfiler;
import org.opensearch.search.profile.query.QueryTimingType;
Expand Down Expand Up @@ -106,6 +107,7 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable {
private QueryProfiler profiler;
private MutableQueryTimeout cancellable;
private SearchContext searchContext;
private boolean fromConcurrentPath = false;

public ContextIndexSearcher(
IndexReader reader,
Expand Down Expand Up @@ -185,15 +187,26 @@ public void setAggregatedDfs(AggregatedDfs aggregatedDfs) {

@Override
public Query rewrite(Query original) throws IOException {
    // Times query rewriting. On the concurrent path each slice thread records
    // into its own Timer (keyed by thread id) because the shared single-timer
    // start/stop protocol is not safe across threads; the sequential path keeps
    // using the profiler's single rewrite timer.
    //
    // Fix: the legacy unconditional profiler.startRewriteTime() /
    // stopAndAddRewriteTime() calls that preceded the fromConcurrentPath branch
    // are removed — leaving them in double-started and double-stopped the
    // sequential rewrite timer whenever the branch also ran.
    Timer concurrentPathRewriteTimer = null;
    if (profiler != null) {
        if (fromConcurrentPath) {
            long threadId = Thread.currentThread().getId();
            concurrentPathRewriteTimer = profiler.getThreadToRewriteTimer().computeIfAbsent(threadId, k -> new Timer());
            concurrentPathRewriteTimer.start();
        } else {
            profiler.startRewriteTime();
        }
    }

    try {
        return super.rewrite(original);
    } finally {
        if (profiler != null) {
            if (fromConcurrentPath) {
                // Non-null here: the matching start branch above ran.
                // NOTE(review): assumes fromConcurrentPath cannot flip between
                // start and finally on this thread — confirm search() only sets
                // it before any rewrite on the same thread.
                concurrentPathRewriteTimer.stop();
            } else {
                profiler.stopAndAddRewriteTime();
            }
        }
    }
}
Expand All @@ -204,7 +217,15 @@ public Weight createWeight(Query query, ScoreMode scoreMode, float boost) throws
// createWeight() is called for each query in the tree, so we tell the queryProfiler
// each invocation so that it can build an internal representation of the query
// tree
ContextualProfileBreakdown<QueryTimingType> profile = profiler.getQueryBreakdown(query);
ContextualProfileBreakdown<QueryTimingType> profile;
if (searchContext.shouldUseConcurrentSearch()) {
long threadId = Thread.currentThread().getId();
ConcurrentQueryProfileTree profileTree = profiler.getThreadToProfileTree()
.computeIfAbsent(threadId, k -> new ConcurrentQueryProfileTree());
profile = profileTree.getProfileBreakdown(query);
} else {
profile = profiler.getQueryBreakdown(query);
}
Timer timer = profile.getTimer(QueryTimingType.CREATE_WEIGHT);
timer.start();
final Weight weight;
Expand Down Expand Up @@ -271,6 +292,10 @@ protected void search(List<LeafReaderContext> leaves, Weight weight, Collector c
// This is actually beneficial for search queries to start search on latest segments first for time series workload.
// That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf
// reader order here.
if (searchContext.shouldUseConcurrentSearch()) {
fromConcurrentPath = true;
}

if (searchContext.shouldUseTimeSeriesDescSortOptimization()) {
for (int i = leaves.size() - 1; i >= 0; i--) {
searchLeaf(leaves.get(i), weight, collector);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@

package org.opensearch.search.profile;

import org.opensearch.search.profile.query.ConcurrentQueryProfileTree;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
* Base class for a profiler
Expand All @@ -42,6 +46,7 @@
public class AbstractProfiler<PB extends AbstractProfileBreakdown<?>, E> {

protected final AbstractInternalProfileTree<PB, E> profileTree;
protected Map<Long, ConcurrentQueryProfileTree> threadToProfileTree;

public AbstractProfiler(AbstractInternalProfileTree<PB, E> profileTree) {
this.profileTree = profileTree;
Expand All @@ -59,14 +64,27 @@ public PB getQueryBreakdown(E query) {
* Removes the last (e.g. most recent) element on the stack.
*/
public void pollLastElement() {
    // Removes the most recent element from whichever tree is active.
    // Fix: the legacy unconditional profileTree.pollLast() that preceded this
    // branch is removed — with it present, the sequential path polled twice.
    if (threadToProfileTree == null) {
        profileTree.pollLast();
    } else {
        // Concurrent path: each slice thread polls from its own tree.
        long threadId = Thread.currentThread().getId();
        ConcurrentQueryProfileTree concurrentProfileTree = threadToProfileTree.get(threadId);
        if (concurrentProfileTree != null) {
            // Guard against a thread that never created a tree (avoids NPE).
            concurrentProfileTree.pollLast();
        }
    }
}

/**
 * Returns a hierarchical representation of the profiled tree. On the
 * concurrent path the per-thread trees are flattened into a single list;
 * otherwise the single sequential tree is returned directly.
 *
 * Fix: removed the legacy unconditional {@code return profileTree.getTree();}
 * that sat before the null check — it made the concurrent aggregation below
 * unreachable (a compile error in Java) and always returned the empty
 * sequential tree.
 *
 * @return a hierarchical representation of the profiled tree
 */
public List<ProfileResult> getTree() {
    if (threadToProfileTree == null) {
        return profileTree.getTree();
    }
    List<ProfileResult> profileResults = new ArrayList<>();
    for (Map.Entry<Long, ConcurrentQueryProfileTree> profile : threadToProfileTree.entrySet()) {
        profileResults.addAll(profile.getValue().getTree());
    }
    return profileResults;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,5 @@ public ContextualProfileBreakdown(Class<T> clazz) {
// No-op extension hook: associates a collector with a single leaf context.
// Empty here; subclasses that need collector-to-leaf tracking (presumably the
// concurrent breakdown — confirm) override it.
public void associateCollectorToLeaves(Collector collector, LeafReaderContext leaf) {}

// No-op bulk variant of the hook above, taking a prebuilt collector-to-leaves map.
public void associateCollectorsToLeaves(Map<Collector, List<LeafReaderContext>> collectorToLeaves) {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,23 @@
import org.apache.lucene.search.Collector;
import org.opensearch.search.profile.ContextualProfileBreakdown;
import org.opensearch.search.profile.ProfileResult;
import org.opensearch.search.profile.Timer;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* This class returns a list of {@link ProfileResult} that can be serialized back to the client in the concurrent execution.
*
* @opensearch.internal
*/
public class ConcurrentQueryProfileTree extends AbstractQueryProfileTree {
private Map<Long, Timer> threadToRewriteTimer = new ConcurrentHashMap<>();

@Override
protected ContextualProfileBreakdown<QueryTimingType> createProfileBreakdown() {
    // NOTE(review): re-creating the map here discards any rewrite timers
    // already recorded in threadToRewriteTimer (which is also initialized at
    // field declaration). Confirm this per-breakdown reset is intentional and
    // cannot race with slice threads still writing into the old map.
    threadToRewriteTimer = new ConcurrentHashMap<>();
    return new ConcurrentQueryProfileBreakdown();
}

Expand Down Expand Up @@ -88,4 +92,11 @@ private void updateCollectorToLeavesForChildBreakdowns(Integer parentToken, Map<
}
}
}

/**
 * @return the live thread-id-to-rewrite-timer map for this profile tree;
 *         callers mutate it directly (not a copy) to record per-thread
 *         rewrite times on the concurrent search path
 */
public Map<Long, Timer> getThreadToRewriteTimer() {
    return threadToRewriteTimer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,5 @@ public boolean isCacheable(LeafReaderContext ctx) {
// Delegates collector/leaf association to this weight's profile breakdown.
// Note the deliberate argument-order flip: this method takes (leaf, collector)
// while the breakdown's hook takes (collector, leaf).
public void associateCollectorToLeaves(LeafReaderContext leaf, Collector collector) {
    profile.associateCollectorToLeaves(collector, leaf);
}

}
Loading

0 comments on commit 140060f

Please sign in to comment.