Skip to content

Commit

Permalink
Refactor and work on the PR comments (opensearch-project#10352)
Browse files Browse the repository at this point in the history
Signed-off-by: Ticheng Lin <[email protected]>
  • Loading branch information
ticheng-aws committed Oct 19, 2023
1 parent f5de511 commit 8c2ad21
Show file tree
Hide file tree
Showing 11 changed files with 356 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,89 @@ public void testBoosting() throws Exception {
}
}

public void testSearchLeafForItsLeavesAndRewriteQuery() throws Exception {
createIndex("test");
ensureGreen();

int numDocs = 122;
IndexRequestBuilder[] docs = new IndexRequestBuilder[numDocs];
for (int i = 0; i < numDocs; i++) {
docs[i] = client().prepareIndex("test").setId(String.valueOf(i)).setSource("field1", English.intToEnglish(i), "field2", i);
}

List<String> terms = Arrays.asList("zero", "zero", "one");

indexRandom(true, docs);

refresh();

QueryBuilder q = QueryBuilders.boostingQuery(
QueryBuilders.idsQuery().addIds(String.valueOf(randomInt()), String.valueOf(randomInt())),
QueryBuilders.termsQuery("field1", terms)
).boost(randomFloat()).negativeBoost(randomFloat());
logger.info("Query: {}", q);

SearchResponse resp = client().prepareSearch()
.setQuery(q)
.setTrackTotalHits(true)
.setProfile(true)
.setSearchType(SearchType.QUERY_THEN_FETCH)
.get();

assertNotNull("Profile response element should not be null", resp.getProfileResults());
assertThat("Profile response should not be an empty array", resp.getProfileResults().size(), not(0));

for (Map.Entry<String, ProfileShardResult> shardResult : resp.getProfileResults().entrySet()) {
assertThat(shardResult.getValue().getNetworkTime().getInboundNetworkTime(), greaterThanOrEqualTo(0L));
assertThat(shardResult.getValue().getNetworkTime().getOutboundNetworkTime(), greaterThanOrEqualTo(0L));
for (QueryProfileShardResult searchProfiles : shardResult.getValue().getQueryProfileResults()) {
List<ProfileResult> results = searchProfiles.getQueryResults();
for (ProfileResult result : results) {
assertNotNull(result.getQueryName());
assertNotNull(result.getLuceneDescription());
assertThat(result.getTime(), greaterThan(0L));
Map<String, Long> breakdown = result.getTimeBreakdown();
Long maxSliceTime = result.getMaxSliceTime();
Long minSliceTime = result.getMinSliceTime();
Long avgSliceTime = result.getAvgSliceTime();
if (concurrentSearchEnabled && results.get(0).equals(result)) {
assertNotNull(maxSliceTime);
assertNotNull(minSliceTime);
assertNotNull(avgSliceTime);
assertThat(breakdown.size(), equalTo(66));
for (QueryTimingType queryTimingType : QueryTimingType.values()) {
if (queryTimingType != QueryTimingType.CREATE_WEIGHT) {
String maxTimingType = MAX_PREFIX + queryTimingType;
String minTimingType = MIN_PREFIX + queryTimingType;
String avgTimingType = AVG_PREFIX + queryTimingType;
assertNotNull(breakdown.get(maxTimingType));
assertNotNull(breakdown.get(minTimingType));
assertNotNull(breakdown.get(avgTimingType));
assertNotNull(breakdown.get(maxTimingType + TIMING_TYPE_COUNT_SUFFIX));
assertNotNull(breakdown.get(minTimingType + TIMING_TYPE_COUNT_SUFFIX));
assertNotNull(breakdown.get(avgTimingType + TIMING_TYPE_COUNT_SUFFIX));
}
}
} else if (concurrentSearchEnabled) {
assertThat(maxSliceTime, equalTo(0L));
assertThat(minSliceTime, equalTo(0L));
assertThat(avgSliceTime, equalTo(0L));
assertThat(breakdown.size(), equalTo(27));
} else {
assertThat(maxSliceTime, is(nullValue()));
assertThat(minSliceTime, is(nullValue()));
assertThat(avgSliceTime, is(nullValue()));
assertThat(breakdown.size(), equalTo(27));
}
}

CollectorResult result = searchProfiles.getCollectorResult();
assertThat(result.getName(), is(not(emptyOrNullString())));
assertThat(result.getTime(), greaterThan(0L));
}
}
}

public void testDisMaxRange() throws Exception {
createIndex("test");
ensureGreen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
import org.opensearch.search.dfs.AggregatedDfs;
import org.opensearch.search.profile.ContextualProfileBreakdown;
import org.opensearch.search.profile.Timer;
import org.opensearch.search.profile.query.ConcurrentQueryProfileTree;
import org.opensearch.search.profile.query.ProfileWeight;
import org.opensearch.search.profile.query.QueryProfiler;
import org.opensearch.search.profile.query.QueryTimingType;
Expand Down Expand Up @@ -107,7 +106,6 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable {
private QueryProfiler profiler;
private MutableQueryTimeout cancellable;
private SearchContext searchContext;
private boolean fromConcurrentPath = false;

public ContextIndexSearcher(
IndexReader reader,
Expand Down Expand Up @@ -187,26 +185,15 @@ public void setAggregatedDfs(AggregatedDfs aggregatedDfs) {

@Override
public Query rewrite(Query original) throws IOException {
Timer concurrentPathRewriteTimer = null;
if (profiler != null) {
if (fromConcurrentPath) {
concurrentPathRewriteTimer = new Timer();
profiler.getConcurrentPathRewriteTimers().add(concurrentPathRewriteTimer);
concurrentPathRewriteTimer.start();
} else {
profiler.startRewriteTime();
}
profiler.startRewriteTime();
}

try {
return super.rewrite(original);
} finally {
if (profiler != null) {
if (fromConcurrentPath) {
concurrentPathRewriteTimer.stop();
} else {
profiler.stopAndAddRewriteTime();
}
profiler.stopAndAddRewriteTime();
}
}
}
Expand All @@ -217,15 +204,7 @@ public Weight createWeight(Query query, ScoreMode scoreMode, float boost) throws
// createWeight() is called for each query in the tree, so we tell the queryProfiler
// each invocation so that it can build an internal representation of the query
// tree
ContextualProfileBreakdown<QueryTimingType> profile;
if (searchContext.shouldUseConcurrentSearch()) {
long threadId = Thread.currentThread().getId();
ConcurrentQueryProfileTree profileTree = profiler.getThreadToProfileTree()
.computeIfAbsent(threadId, k -> new ConcurrentQueryProfileTree());
profile = profileTree.getProfileBreakdown(query);
} else {
profile = profiler.getQueryBreakdown(query);
}
ContextualProfileBreakdown<QueryTimingType> profile = profiler.getQueryBreakdown(query);
Timer timer = profile.getTimer(QueryTimingType.CREATE_WEIGHT);
timer.start();
final Weight weight;
Expand Down Expand Up @@ -288,9 +267,6 @@ public void search(

@Override
protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
if (searchContext.shouldUseConcurrentSearch()) {
fromConcurrentPath = true;
}
// Time series based workload by default traverses segments in desc order i.e. latest to the oldest order.
// This is actually beneficial for search queries to start search on latest segments first for time series workload.
// That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,7 @@

package org.opensearch.search.profile;

import org.opensearch.search.profile.query.ConcurrentQueryProfileTree;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
* Base class for a profiler
Expand All @@ -46,7 +42,6 @@
public class AbstractProfiler<PB extends AbstractProfileBreakdown<?>, E> {

protected final AbstractInternalProfileTree<PB, E> profileTree;
protected Map<Long, ConcurrentQueryProfileTree> threadToProfileTree;

public AbstractProfiler(AbstractInternalProfileTree<PB, E> profileTree) {
this.profileTree = profileTree;
Expand All @@ -64,27 +59,14 @@ public PB getQueryBreakdown(E query) {
* Removes the last (e.g. most recent) element on the stack.
*/
public void pollLastElement() {
if (threadToProfileTree == null) {
profileTree.pollLast();
} else {
long threadId = Thread.currentThread().getId();
ConcurrentQueryProfileTree concurrentProfileTree = threadToProfileTree.get(threadId);
concurrentProfileTree.pollLast();
}
profileTree.pollLast();
}

/**
* @return a hierarchical representation of the profiled tree
*/
public List<ProfileResult> getTree() {
if (threadToProfileTree == null) {
return profileTree.getTree();
}
List<ProfileResult> profileResults = new ArrayList<>();
for (Map.Entry<Long, ConcurrentQueryProfileTree> profile : threadToProfileTree.entrySet()) {
profileResults.addAll(profile.getValue().getTree());
}
return profileResults;
return profileTree.getTree();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
import org.opensearch.search.internal.ContextIndexSearcher;
import org.opensearch.search.profile.aggregation.AggregationProfiler;
import org.opensearch.search.profile.aggregation.ConcurrentAggregationProfiler;
import org.opensearch.search.profile.query.ConcurrentQueryProfileTree;
import org.opensearch.search.profile.query.ConcurrentQueryProfiler;
import org.opensearch.search.profile.query.InternalQueryProfileTree;
import org.opensearch.search.profile.query.QueryProfiler;

import java.util.ArrayList;
Expand Down Expand Up @@ -64,7 +67,9 @@ public Profilers(ContextIndexSearcher searcher, boolean isConcurrentSegmentSearc

/** Switch to a new profile. */
public QueryProfiler addQueryProfiler() {
QueryProfiler profiler = new QueryProfiler(isConcurrentSegmentSearchEnabled);
QueryProfiler profiler = isConcurrentSegmentSearchEnabled
? new ConcurrentQueryProfiler(new ConcurrentQueryProfileTree())
: new QueryProfiler(new InternalQueryProfileTree());
searcher.setProfiler(profiler);
queryProfilers.add(profiler);
return profiler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,29 @@ public Map<String, Long> toBreakdownMap() {
);
final long createWeightTime = topLevelBreakdownMapWithWeightTime.get(QueryTimingType.CREATE_WEIGHT.toString());

if (sliceCollectorsToLeaves.isEmpty() || contexts.isEmpty()) {
if (contexts.isEmpty()) {
// If there are no leaf contexts, then return the default concurrent query level breakdown, which will include the
// create_weight time/count
queryNodeTime = createWeightTime;
maxSliceNodeTime = 0L;
minSliceNodeTime = 0L;
avgSliceNodeTime = 0L;
return buildDefaultQueryBreakdownMap(createWeightTime);
} else if (sliceCollectorsToLeaves.isEmpty()) {
// This will happen when each slice executes search leaf for its leaves and query is rewritten for the leaf being searched. It
// creates a new weight and breakdown map for each rewritten query. This new breakdown map captures the timing information for
// the new rewritten query. The sliceCollectorsToLeaves is empty because this breakdown for rewritten query gets created later
// in search leaf path which doesn't have collector. Also, this is not needed since this breakdown is per leaf and there is no
// concurrency involved. An empty sliceCollectorsToLeaves could also happen in the case of early termination.
AbstractProfileBreakdown<QueryTimingType> breakdown = contexts.values().iterator().next();
queryNodeTime = breakdown.toNodeTime() + createWeightTime;
maxSliceNodeTime = 0L;
minSliceNodeTime = 0L;
avgSliceNodeTime = 0L;
Map<String, Long> queryBreakdownMap = new HashMap<>(breakdown.toBreakdownMap());
queryBreakdownMap.put(QueryTimingType.CREATE_WEIGHT.toString(), createWeightTime);
queryBreakdownMap.put(QueryTimingType.CREATE_WEIGHT + TIMING_TYPE_COUNT_SUFFIX, 1L);
return queryBreakdownMap;
}

// first create the slice level breakdowns
Expand Down Expand Up @@ -191,10 +206,12 @@ Map<Collector, Map<String, Long>> buildSliceLevelBreakdown() {
}
// compute sliceMaxEndTime as max of sliceEndTime across all timing types
sliceMaxEndTime = Math.max(sliceMaxEndTime, currentSliceBreakdown.getOrDefault(timingTypeSliceEndTimeKey, Long.MIN_VALUE));
sliceMinStartTime = Math.min(
sliceMinStartTime,
currentSliceBreakdown.getOrDefault(timingTypeSliceStartTimeKey, Long.MAX_VALUE)
);
long currentSliceStartTime = currentSliceBreakdown.getOrDefault(timingTypeSliceStartTimeKey, Long.MAX_VALUE);
if (currentSliceStartTime == 0L) {
// The timer for the current timing type never starts, so we continue here
continue;
}
sliceMinStartTime = Math.min(sliceMinStartTime, currentSliceStartTime);
// compute total time for each timing type at slice level using sliceEndTime and sliceStartTime
currentSliceBreakdown.put(
timingType.toString(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@
import org.apache.lucene.search.Collector;
import org.opensearch.search.profile.ContextualProfileBreakdown;
import org.opensearch.search.profile.ProfileResult;
import org.opensearch.search.profile.Timer;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

Expand All @@ -24,7 +22,6 @@
* @opensearch.internal
*/
public class ConcurrentQueryProfileTree extends AbstractQueryProfileTree {
protected List<Timer> concurrentPathRewriteTimers = new ArrayList<>();

@Override
protected ContextualProfileBreakdown<QueryTimingType> createProfileBreakdown() {
Expand Down Expand Up @@ -91,11 +88,4 @@ private void updateCollectorToLeavesForChildBreakdowns(Integer parentToken, Map<
}
}
}

/**
* @return the concurrent path rewrite timer list for this profile tree
*/
public List<Timer> getConcurrentPathRewriteTimers() {
return concurrentPathRewriteTimers;
}
}
Loading

0 comments on commit 8c2ad21

Please sign in to comment.