Fix rewrite time issue in concurrent path
Signed-off-by: Ticheng Lin <[email protected]>
ticheng-aws committed Oct 4, 2023
1 parent d5a95b8 commit ad5028a
Showing 5 changed files with 144 additions and 6 deletions.
ContextIndexSearcher.java
@@ -86,6 +86,8 @@
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

/**
@@ -106,6 +108,8 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable {
private QueryProfiler profiler;
private MutableQueryTimeout cancellable;
private SearchContext searchContext;
// Set when the query executes on the concurrent search path; rewrite time is then tracked per thread.
private boolean fromConcurrentPath = false;
private Map<Long, Timer> threadToRewriteTimer = new ConcurrentHashMap<>();

public ContextIndexSearcher(
IndexReader reader,
@@ -185,15 +189,26 @@ public void setAggregatedDfs(AggregatedDfs aggregatedDfs) {

@Override
public Query rewrite(Query original) throws IOException {
Timer concurrentPathRewriteTimer = null;
long threadId = Thread.currentThread().getId();
if (profiler != null) {
profiler.startRewriteTime();
if (fromConcurrentPath) {
concurrentPathRewriteTimer = threadToRewriteTimer.computeIfAbsent(threadId, k -> new Timer());
concurrentPathRewriteTimer.start();
} else {
profiler.startRewriteTime();
}
}

try {
return super.rewrite(original);
} finally {
if (profiler != null) {
profiler.stopAndAddRewriteTime();
if (fromConcurrentPath) {
concurrentPathRewriteTimer.stop();
} else {
profiler.stopAndAddRewriteTime();
}
}
}
}
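
The pattern above gives every concurrent-search thread its own Timer, created lazily with computeIfAbsent, so rewrite time is recorded without contention on the single profiler-level timer used by the sequential path. A minimal self-contained sketch of the same pattern (SimpleTimer is a stand-in for org.opensearch.search.profile.Timer, not the real class):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class PerThreadRewriteTimers {
    // Stand-in for org.opensearch.search.profile.Timer: accumulates elapsed nanos and a call count.
    static final class SimpleTimer {
        private long startNanos;
        private long totalNanos;
        private long count;
        void start() { startNanos = System.nanoTime(); }
        void stop() { totalNanos += System.nanoTime() - startNanos; count++; }
        long approximateNanos() { return totalNanos; }
        long count() { return count; }
    }

    private final Map<Long, SimpleTimer> timers = new ConcurrentHashMap<>();

    // Each thread lazily registers its own timer, so start/stop never races across threads.
    void timed(Runnable work) {
        SimpleTimer timer = timers.computeIfAbsent(Thread.currentThread().getId(), k -> new SimpleTimer());
        timer.start();
        try {
            work.run();
        } finally {
            timer.stop();
        }
    }
}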
@@ -271,6 +286,11 @@ protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) {
// This is actually beneficial for search queries to start search on latest segments first for time series workload.
// That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf
// reader order here.
if (searchContext.shouldUseConcurrentSearch()) {
fromConcurrentPath = true;
threadToRewriteTimer = ((ProfileWeight) weight).getThreadToRewriteTimer();
}

if (searchContext.shouldUseTimeSeriesDescSortOptimization()) {
for (int i = leaves.size() - 1; i >= 0; i--) {
searchLeaf(leaves.get(i), weight, collector);
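One assumption in the search(...) hunk above is worth noting: the unconditional cast relies on every weight on the profiled concurrent path being wrapped in a ProfileWeight. A defensive variant (a sketch, not the commit's code) would guard the cast:

if (searchContext.shouldUseConcurrentSearch() && weight instanceof ProfileWeight) {
    fromConcurrentPath = true;
    threadToRewriteTimer = ((ProfileWeight) weight).getThreadToRewriteTimer();
}
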
ContextualProfileBreakdown.java
@@ -35,4 +35,8 @@ public ContextualProfileBreakdown(Class<T> clazz) {
public void associateCollectorToLeaves(Collector collector, LeafReaderContext leaf) {}

public void associateCollectorsToLeaves(Map<Collector, List<LeafReaderContext>> collectorToLeaves) {}

// Default for non-concurrent breakdowns; ConcurrentQueryProfileBreakdown overrides this with a live map.
public Map<Long, Timer> getThreadToRewriteTimer() {
return null;
}
}
ConcurrentQueryProfileBreakdown.java
@@ -13,6 +13,7 @@
import org.opensearch.OpenSearchException;
import org.opensearch.search.profile.AbstractProfileBreakdown;
import org.opensearch.search.profile.ContextualProfileBreakdown;
import org.opensearch.search.profile.Timer;

import java.util.ArrayList;
import java.util.Collections;
@@ -42,6 +43,8 @@ public final class ConcurrentQueryProfileBreakdown extends ContextualProfileBreakdown<QueryTimingType> {
// keep track of all breakdown timings per segment. package-private for testing
private final Map<Object, AbstractProfileBreakdown<QueryTimingType>> contexts = new ConcurrentHashMap<>();

// Per-thread rewrite timers, keyed by thread id; handed to ContextIndexSearcher via ProfileWeight on the concurrent path.
private final Map<Long, Timer> threadToRewriteTimer = new ConcurrentHashMap<>();

// represents slice to leaves mapping as for each slice a unique collector instance is created
private final Map<Collector, List<LeafReaderContext>> sliceCollectorsToLeaves = new ConcurrentHashMap<>();

@@ -50,6 +53,10 @@ public ConcurrentQueryProfileBreakdown() {
super(QueryTimingType.class);
}

public Map<Long, Timer> getThreadToRewriteTimer() {
return threadToRewriteTimer;
}

@Override
public AbstractProfileBreakdown<QueryTimingType> context(Object context) {
// See please https://bugs.openjdk.java.net/browse/JDK-8161372
@@ -93,11 +100,11 @@ public Map<String, Long> toBreakdownMap() {
*/
private Map<String, Long> buildDefaultQueryBreakdownMap(long createWeightTime) {
final Map<String, Long> concurrentQueryBreakdownMap = new HashMap<>();
- for (QueryTimingType timingType : QueryTimingType.values()) {
+ for (ConcurrentQueryTimingType timingType : ConcurrentQueryTimingType.values()) {
final String timingTypeKey = timingType.toString();
final String timingTypeCountKey = timingTypeKey + TIMING_TYPE_COUNT_SUFFIX;

- if (timingType.equals(QueryTimingType.CREATE_WEIGHT)) {
+ if (timingType.equals(ConcurrentQueryTimingType.CREATE_WEIGHT)) {
concurrentQueryBreakdownMap.put(timingTypeKey, createWeightTime);
concurrentQueryBreakdownMap.put(timingTypeCountKey, 1L);
continue;
@@ -248,7 +255,7 @@ public Map<String, Long> buildQueryBreakdownMap(
) {
final Map<String, Long> queryBreakdownMap = new HashMap<>();
long queryEndTime = Long.MIN_VALUE;
- for (QueryTimingType queryTimingType : QueryTimingType.values()) {
+ for (ConcurrentQueryTimingType queryTimingType : ConcurrentQueryTimingType.values()) {
final String timingTypeKey = queryTimingType.toString();
final String timingTypeCountKey = timingTypeKey + TIMING_TYPE_COUNT_SUFFIX;
final String sliceEndTimeForTimingType = timingTypeKey + SLICE_END_TIME_SUFFIX;
@@ -266,12 +273,76 @@
long queryTimingTypeCount = 0L;

// the create weight time is computed at the query level and is called only once per query
- if (queryTimingType == QueryTimingType.CREATE_WEIGHT) {
+ if (queryTimingType == ConcurrentQueryTimingType.CREATE_WEIGHT) {
queryBreakdownMap.put(timingTypeCountKey, 1L);
queryBreakdownMap.put(timingTypeKey, createWeightTime);
continue;
}

if (queryTimingType == ConcurrentQueryTimingType.REWRITE) {
if (threadToRewriteTimer.isEmpty()) {
// add time related stats
queryBreakdownMap.put(timingTypeKey, 0L);
queryBreakdownMap.put(maxBreakdownTypeTime, 0L);
queryBreakdownMap.put(minBreakdownTypeTime, 0L);
queryBreakdownMap.put(avgBreakdownTypeTime, 0L);
// add count related stats
queryBreakdownMap.put(timingTypeCountKey, 0L);
queryBreakdownMap.put(maxBreakdownTypeCount, 0L);
queryBreakdownMap.put(minBreakdownTypeCount, 0L);
queryBreakdownMap.put(avgBreakdownTypeCount, 0L);
continue;
}
for (Map.Entry<Long, Timer> rewrite : threadToRewriteTimer.entrySet()) {
long sliceRewriteTime = rewrite.getValue().getApproximateTiming();
long sliceRewriteCount = rewrite.getValue().getCount();
long sliceRewriteStartTime = rewrite.getValue().getEarliestTimerStartTime();
// compute max/min/avg rewrite time across slices
queryBreakdownMap.compute(
maxBreakdownTypeTime,
(key, value) -> (value == null) ? sliceRewriteTime : Math.max(sliceRewriteTime, value)
);
queryBreakdownMap.compute(
minBreakdownTypeTime,
(key, value) -> (value == null) ? sliceRewriteTime : Math.min(sliceRewriteTime, value)
);
queryBreakdownMap.compute(
avgBreakdownTypeTime,
(key, value) -> (value == null) ? sliceRewriteTime : sliceRewriteTime + value
);

// compute max/min/avg rewrite count across slices
queryBreakdownMap.compute(
maxBreakdownTypeCount,
(key, value) -> (value == null) ? sliceRewriteCount : Math.max(sliceRewriteCount, value)
);
queryBreakdownMap.compute(
minBreakdownTypeCount,
(key, value) -> (value == null) ? sliceRewriteCount : Math.min(sliceRewriteCount, value)
);
queryBreakdownMap.compute(
avgBreakdownTypeCount,
(key, value) -> (value == null) ? sliceRewriteCount : sliceRewriteCount + value
);

// query start/end time for rewrite is min/max of start/end time across slices for that TimingType
queryTimingTypeEndTime = Math.max(
queryTimingTypeEndTime,
sliceRewriteStartTime + sliceRewriteTime
);
queryTimingTypeStartTime = Math.min(
queryTimingTypeStartTime,
sliceRewriteStartTime
);
queryTimingTypeCount += sliceRewriteCount;
}
queryBreakdownMap.put(timingTypeKey, queryTimingTypeEndTime - queryTimingTypeStartTime);
queryBreakdownMap.put(timingTypeCountKey, queryTimingTypeCount);
queryBreakdownMap.compute(avgBreakdownTypeTime, (key, value) -> (value == null) ? 0L : value / threadToRewriteTimer.size());
queryBreakdownMap.compute(avgBreakdownTypeCount, (key, value) -> (value == null) ? 0L : value / threadToRewriteTimer.size());
continue;
}

// for all other timing types, we will compute min/max/avg/total across slices
for (Map.Entry<Collector, Map<String, Long>> sliceBreakdown : sliceLevelBreakdowns.entrySet()) {
long sliceBreakdownTypeTime = sliceBreakdown.getValue().getOrDefault(timingTypeKey, 0L);
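The REWRITE branch above folds per-thread timings into max/min with Map.compute, accumulates a running sum under the avg key, and divides that sum by the thread count only after the loop. A standalone demo of the same fold (hypothetical input values, plain key strings and a HashMap standing in for the real breakdown map):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RewriteAggregationDemo {
    public static void main(String[] args) {
        List<Long> perThreadRewriteNanos = List.of(120L, 80L, 200L); // assumed per-thread totals
        Map<String, Long> breakdown = new HashMap<>();
        for (long t : perThreadRewriteNanos) {
            breakdown.compute("max_rewrite", (k, v) -> v == null ? t : Math.max(v, t));
            breakdown.compute("min_rewrite", (k, v) -> v == null ? t : Math.min(v, t));
            breakdown.compute("avg_rewrite", (k, v) -> v == null ? t : v + t); // running sum for now
        }
        // one final pass turns the accumulated sum into the average
        breakdown.compute("avg_rewrite", (k, v) -> v == null ? 0L : v / perThreadRewriteNanos.size());
        System.out.println(breakdown); // e.g. {avg_rewrite=133, max_rewrite=200, min_rewrite=80}
    }
}
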
ConcurrentQueryTimingType.java (new file)
@@ -0,0 +1,38 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.search.profile.query;

import java.util.Locale;

/**
* Different profile levels of the query with concurrent execution
*
* @opensearch.internal
*/
public enum ConcurrentQueryTimingType {
REWRITE,
CREATE_WEIGHT,
BUILD_SCORER,
NEXT_DOC,
ADVANCE,
MATCH,
SCORE,
SHALLOW_ADVANCE,
COMPUTE_MAX_SCORE,
SET_MIN_COMPETITIVE_SCORE;

@Override
public String toString() {
return name().toLowerCase(Locale.ROOT);
}
}
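
Since toString() lowercases the constant names, the enum values double as the keys of the breakdown map built earlier, with count keys derived by suffixing. A quick illustration (assumed to be compiled alongside the enum above):

class TimingKeyDemo {
    public static void main(String[] args) {
        for (ConcurrentQueryTimingType type : ConcurrentQueryTimingType.values()) {
            System.out.println(type + " / " + type + "_count");
        }
        // prints: rewrite / rewrite_count, create_weight / create_weight_count, ...
    }
}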
ProfileWeight.java
@@ -44,6 +44,7 @@
import org.opensearch.search.profile.Timer;

import java.io.IOException;
import java.util.Map;

/**
* Weight wrapper that will compute how much time it takes to build the
@@ -141,4 +142,8 @@ public boolean isCacheable(LeafReaderContext ctx) {
public void associateCollectorToLeaves(LeafReaderContext leaf, Collector collector) {
profile.associateCollectorToLeaves(collector, leaf);
}

public Map<Long, Timer> getThreadToRewriteTimer() {
return profile.getThreadToRewriteTimer();
}
}
