From ad5028a73bd4c98d6a04bc4fb27c1b2fb6e85420 Mon Sep 17 00:00:00 2001 From: Ticheng Lin Date: Wed, 4 Oct 2023 00:48:54 -0700 Subject: [PATCH] Fix rewrite time issue in concurrent path Signed-off-by: Ticheng Lin --- .../search/internal/ContextIndexSearcher.java | 24 +++++- .../profile/ContextualProfileBreakdown.java | 4 + .../ConcurrentQueryProfileBreakdown.java | 79 ++++++++++++++++++- .../query/ConcurrentQueryTimingType.java | 38 +++++++++ .../search/profile/query/ProfileWeight.java | 5 ++ 5 files changed, 144 insertions(+), 6 deletions(-) create mode 100644 server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryTimingType.java diff --git a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java index aa86ed4e56801..2745bf5e6f515 100644 --- a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java +++ b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java @@ -86,6 +86,8 @@ import java.util.List; import java.util.Objects; import java.util.Set; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; /** @@ -106,6 +108,8 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable { private QueryProfiler profiler; private MutableQueryTimeout cancellable; private SearchContext searchContext; + private boolean fromConcurrentPath = false; + private Map threadToRewriteTimer = new ConcurrentHashMap<>(); public ContextIndexSearcher( IndexReader reader, @@ -185,15 +189,26 @@ public void setAggregatedDfs(AggregatedDfs aggregatedDfs) { @Override public Query rewrite(Query original) throws IOException { + Timer concurrentPathRewriteTimer = null; + long threadId = Thread.currentThread().getId(); if (profiler != null) { - profiler.startRewriteTime(); + if (fromConcurrentPath) { + concurrentPathRewriteTimer = threadToRewriteTimer.computeIfAbsent(threadId, k -> new Timer()); + concurrentPathRewriteTimer.start(); + } else { + profiler.startRewriteTime(); + } } try { return super.rewrite(original); } finally { if (profiler != null) { - profiler.stopAndAddRewriteTime(); + if (fromConcurrentPath) { + concurrentPathRewriteTimer.stop(); + } else { + profiler.stopAndAddRewriteTime(); + } } } } @@ -271,6 +286,11 @@ protected void search(List leaves, Weight weight, Collector c // This is actually beneficial for search queries to start search on latest segments first for time series workload. // That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf // reader order here. + if (searchContext.shouldUseConcurrentSearch()) { + fromConcurrentPath = true; + threadToRewriteTimer = ((ProfileWeight) weight).getThreadToRewriteTimer(); + } + if (searchContext.shouldUseTimeSeriesDescSortOptimization()) { for (int i = leaves.size() - 1; i >= 0; i--) { searchLeaf(leaves.get(i), weight, collector); diff --git a/server/src/main/java/org/opensearch/search/profile/ContextualProfileBreakdown.java b/server/src/main/java/org/opensearch/search/profile/ContextualProfileBreakdown.java index 3fe621321c8ad..b16e5818685c2 100644 --- a/server/src/main/java/org/opensearch/search/profile/ContextualProfileBreakdown.java +++ b/server/src/main/java/org/opensearch/search/profile/ContextualProfileBreakdown.java @@ -35,4 +35,8 @@ public ContextualProfileBreakdown(Class clazz) { public void associateCollectorToLeaves(Collector collector, LeafReaderContext leaf) {} public void associateCollectorsToLeaves(Map> collectorToLeaves) {} + + public Map getThreadToRewriteTimer() { + return null; + } } diff --git a/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdown.java b/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdown.java index e567fdd2d436c..695d6ff5f28bb 100644 --- a/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdown.java +++ b/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdown.java @@ -13,6 +13,7 @@ import org.opensearch.OpenSearchException; import org.opensearch.search.profile.AbstractProfileBreakdown; import org.opensearch.search.profile.ContextualProfileBreakdown; +import org.opensearch.search.profile.Timer; import java.util.ArrayList; import java.util.Collections; @@ -42,6 +43,8 @@ public final class ConcurrentQueryProfileBreakdown extends ContextualProfileBrea // keep track of all breakdown timings per segment. package-private for testing private final Map> contexts = new ConcurrentHashMap<>(); + private final Map threadToRewriteTimer = new ConcurrentHashMap<>(); + // represents slice to leaves mapping as for each slice a unique collector instance is created private final Map> sliceCollectorsToLeaves = new ConcurrentHashMap<>(); @@ -50,6 +53,10 @@ public ConcurrentQueryProfileBreakdown() { super(QueryTimingType.class); } + public Map getThreadToRewriteTimer() { + return threadToRewriteTimer; + } + @Override public AbstractProfileBreakdown context(Object context) { // See please https://bugs.openjdk.java.net/browse/JDK-8161372 @@ -93,11 +100,11 @@ public Map toBreakdownMap() { */ private Map buildDefaultQueryBreakdownMap(long createWeightTime) { final Map concurrentQueryBreakdownMap = new HashMap<>(); - for (QueryTimingType timingType : QueryTimingType.values()) { + for (ConcurrentQueryTimingType timingType : ConcurrentQueryTimingType.values()) { final String timingTypeKey = timingType.toString(); final String timingTypeCountKey = timingTypeKey + TIMING_TYPE_COUNT_SUFFIX; - if (timingType.equals(QueryTimingType.CREATE_WEIGHT)) { + if (timingType.equals(ConcurrentQueryTimingType.CREATE_WEIGHT)) { concurrentQueryBreakdownMap.put(timingTypeKey, createWeightTime); concurrentQueryBreakdownMap.put(timingTypeCountKey, 1L); continue; @@ -248,7 +255,7 @@ public Map buildQueryBreakdownMap( ) { final Map queryBreakdownMap = new HashMap<>(); long queryEndTime = Long.MIN_VALUE; - for (QueryTimingType queryTimingType : QueryTimingType.values()) { + for (ConcurrentQueryTimingType queryTimingType : ConcurrentQueryTimingType.values()) { final String timingTypeKey = queryTimingType.toString(); final String timingTypeCountKey = timingTypeKey + TIMING_TYPE_COUNT_SUFFIX; final String sliceEndTimeForTimingType = timingTypeKey + SLICE_END_TIME_SUFFIX; @@ -266,12 +273,76 @@ public Map buildQueryBreakdownMap( long queryTimingTypeCount = 0L; // the create weight time is computed at the query level and is called only once per query - if (queryTimingType == QueryTimingType.CREATE_WEIGHT) { + if (queryTimingType == ConcurrentQueryTimingType.CREATE_WEIGHT) { queryBreakdownMap.put(timingTypeCountKey, 1L); queryBreakdownMap.put(timingTypeKey, createWeightTime); continue; } + if (queryTimingType == ConcurrentQueryTimingType.REWRITE) { + if (threadToRewriteTimer.isEmpty()) { + // add time related stats + queryBreakdownMap.put(timingTypeKey, 0L); + queryBreakdownMap.put(maxBreakdownTypeTime, 0L); + queryBreakdownMap.put(minBreakdownTypeTime, 0L); + queryBreakdownMap.put(avgBreakdownTypeTime, 0L); + // add count related stats + queryBreakdownMap.put(timingTypeCountKey, 0L); + queryBreakdownMap.put(maxBreakdownTypeCount, 0L); + queryBreakdownMap.put(minBreakdownTypeCount, 0L); + queryBreakdownMap.put(avgBreakdownTypeCount, 0L); + continue; + } + for (Map.Entry rewrite : threadToRewriteTimer.entrySet()) { + long sliceRewriteTime = rewrite.getValue().getApproximateTiming(); + long sliceRewriteCount = rewrite.getValue().getCount(); + long sliceRewriteStartTime = rewrite.getValue().getEarliestTimerStartTime(); + // compute max/min/avg rewrite time across slices + queryBreakdownMap.compute( + maxBreakdownTypeTime, + (key, value) -> (value == null) ? sliceRewriteTime : Math.max(sliceRewriteTime, value) + ); + queryBreakdownMap.compute( + minBreakdownTypeTime, + (key, value) -> (value == null) ? sliceRewriteTime : Math.min(sliceRewriteTime, value) + ); + queryBreakdownMap.compute( + avgBreakdownTypeTime, + (key, value) -> (value == null) ? sliceRewriteTime : sliceRewriteTime + value + ); + + // compute max/min/avg rewrite count across slices + queryBreakdownMap.compute( + maxBreakdownTypeCount, + (key, value) -> (value == null) ? sliceRewriteCount : Math.max(sliceRewriteCount, value) + ); + queryBreakdownMap.compute( + minBreakdownTypeCount, + (key, value) -> (value == null) ? sliceRewriteCount : Math.min(sliceRewriteCount, value) + ); + queryBreakdownMap.compute( + avgBreakdownTypeCount, + (key, value) -> (value == null) ? sliceRewriteCount : sliceRewriteCount + value + ); + + // query start/end time for rewrite is min/max of start/end time across slices for that TimingType + queryTimingTypeEndTime = Math.max( + queryTimingTypeEndTime, + sliceRewriteStartTime + sliceRewriteTime + ); + queryTimingTypeStartTime = Math.min( + queryTimingTypeStartTime, + sliceRewriteStartTime + ); + queryTimingTypeCount += sliceRewriteCount; + } + queryBreakdownMap.put(timingTypeKey, queryTimingTypeEndTime - queryTimingTypeStartTime); + queryBreakdownMap.put(timingTypeCountKey, queryTimingTypeCount); + queryBreakdownMap.compute(avgBreakdownTypeTime, (key, value) -> (value == null) ? 0L : value / threadToRewriteTimer.size()); + queryBreakdownMap.compute(avgBreakdownTypeCount, (key, value) -> (value == null) ? 0L : value / threadToRewriteTimer.size()); + continue; + } + // for all other timing types, we will compute min/max/avg/total across slices for (Map.Entry> sliceBreakdown : sliceLevelBreakdowns.entrySet()) { long sliceBreakdownTypeTime = sliceBreakdown.getValue().getOrDefault(timingTypeKey, 0L); diff --git a/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryTimingType.java b/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryTimingType.java new file mode 100644 index 0000000000000..7a08003a21b3f --- /dev/null +++ b/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryTimingType.java @@ -0,0 +1,38 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.search.profile.query; + +import java.util.Locale; + +/** + * Different profile levels of the query with concurrent execution + * + * @opensearch.internal + */ +public enum ConcurrentQueryTimingType { + REWRITE, + CREATE_WEIGHT, + BUILD_SCORER, + NEXT_DOC, + ADVANCE, + MATCH, + SCORE, + SHALLOW_ADVANCE, + COMPUTE_MAX_SCORE, + SET_MIN_COMPETITIVE_SCORE; + + public String toString() { + return name().toLowerCase(Locale.ROOT); + } +} diff --git a/server/src/main/java/org/opensearch/search/profile/query/ProfileWeight.java b/server/src/main/java/org/opensearch/search/profile/query/ProfileWeight.java index c7e70d8d88007..21477f396c67b 100644 --- a/server/src/main/java/org/opensearch/search/profile/query/ProfileWeight.java +++ b/server/src/main/java/org/opensearch/search/profile/query/ProfileWeight.java @@ -44,6 +44,7 @@ import org.opensearch.search.profile.Timer; import java.io.IOException; +import java.util.Map; /** * Weight wrapper that will compute how much time it takes to build the @@ -141,4 +142,8 @@ public boolean isCacheable(LeafReaderContext ctx) { public void associateCollectorToLeaves(LeafReaderContext leaf, Collector collector) { profile.associateCollectorToLeaves(collector, leaf); } + + public Map getThreadToRewriteTimer() { + return profile.getThreadToRewriteTimer(); + } }