From f5de511201cb2682293f8d787bbb5c00944ebe9a Mon Sep 17 00:00:00 2001 From: Ticheng Lin Date: Wed, 4 Oct 2023 00:48:54 -0700 Subject: [PATCH] Fix timer race condition in profile rewrite and create weight for concurrent segment search (#10352) Signed-off-by: Ticheng Lin --- CHANGELOG.md | 1 + .../search/profile/query/QueryProfilerIT.java | 74 ++++++++++++++++++- .../search/internal/ContextIndexSearcher.java | 30 +++++++- .../search/profile/AbstractProfiler.java | 22 +++++- .../org/opensearch/search/profile/Timer.java | 12 +++ .../query/ConcurrentQueryProfileTree.java | 10 +++ .../search/profile/query/QueryProfiler.java | 58 ++++++++++++++- .../profile/query/QueryProfilerTests.java | 18 +++++ 8 files changed, 217 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0ad18b94f31b7..e995a18478b5b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -91,6 +91,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Added - Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351)) - [Remote Store] Add repository stats for remote store([#10567](https://github.com/opensearch-project/OpenSearch/pull/10567)) +- Introduce ConcurrentQueryProfiler to profile query using concurrent segment search path and support concurrency during rewrite and create weight ([10352](https://github.com/opensearch-project/OpenSearch/pull/10352)) ### Dependencies - Bump `com.google.api.grpc:proto-google-common-protos` from 2.10.0 to 2.25.1 ([#10208](https://github.com/opensearch-project/OpenSearch/pull/10208), [#10298](https://github.com/opensearch-project/OpenSearch/pull/10298)) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/profile/query/QueryProfilerIT.java b/server/src/internalClusterTest/java/org/opensearch/search/profile/query/QueryProfilerIT.java index 5f794d2abf878..24f2b1bbdabfc 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/profile/query/QueryProfilerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/profile/query/QueryProfilerIT.java @@ -32,6 +32,8 @@ package org.opensearch.search.profile.query; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + import org.apache.lucene.tests.util.English; import org.opensearch.action.index.IndexRequestBuilder; import org.opensearch.action.search.MultiSearchResponse; @@ -40,20 +42,23 @@ import org.opensearch.action.search.SearchType; import org.opensearch.action.search.ShardSearchFailure; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryBuilders; import org.opensearch.search.SearchHit; import org.opensearch.search.profile.ProfileResult; import org.opensearch.search.profile.ProfileShardResult; import org.opensearch.search.sort.SortOrder; -import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.ParameterizedOpenSearchIntegTestCase; import java.util.Arrays; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; import static org.opensearch.search.profile.query.RandomQueryGenerator.randomQueryBuilder; import static org.hamcrest.Matchers.emptyOrNullString; import static org.hamcrest.Matchers.equalTo; @@ -61,8 +66,32 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; + +public class QueryProfilerIT extends ParameterizedOpenSearchIntegTestCase { + private final boolean concurrentSearchEnabled; + private static final String MAX_PREFIX = "max_"; + private static final String MIN_PREFIX = "min_"; + private static final String AVG_PREFIX = "avg_"; + private static final String TIMING_TYPE_COUNT_SUFFIX = "_count"; + + public QueryProfilerIT(Settings settings, boolean concurrentSearchEnabled) { + super(settings); + this.concurrentSearchEnabled = concurrentSearchEnabled; + } + + @ParametersFactory + public static Collection parameters() { + return Arrays.asList( + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build(), false }, + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build(), true } + ); + } -public class QueryProfilerIT extends OpenSearchIntegTestCase { + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true").build(); + } /** * This test simply checks to make sure nothing crashes. Test indexes 100-150 documents, @@ -229,6 +258,7 @@ public void testSimpleMatch() throws Exception { assertEquals(result.getLuceneDescription(), "field1:one"); assertThat(result.getTime(), greaterThan(0L)); assertNotNull(result.getTimeBreakdown()); + assertQueryProfileResult(result); } CollectorResult result = searchProfiles.getCollectorResult(); @@ -271,6 +301,7 @@ public void testBool() throws Exception { assertThat(result.getTime(), greaterThan(0L)); assertNotNull(result.getTimeBreakdown()); assertEquals(result.getProfiledChildren().size(), 2); + assertQueryProfileResult(result); // Check the children List children = result.getProfiledChildren(); @@ -282,12 +313,14 @@ public void testBool() throws Exception { assertThat(childProfile.getTime(), greaterThan(0L)); assertNotNull(childProfile.getTimeBreakdown()); assertEquals(childProfile.getProfiledChildren().size(), 0); + assertQueryProfileResult(childProfile); childProfile = children.get(1); assertEquals(childProfile.getQueryName(), "TermQuery"); assertEquals(childProfile.getLuceneDescription(), "field1:two"); assertThat(childProfile.getTime(), greaterThan(0L)); assertNotNull(childProfile.getTimeBreakdown()); + assertQueryProfileResult(childProfile); } CollectorResult result = searchProfiles.getCollectorResult(); @@ -330,6 +363,7 @@ public void testEmptyBool() throws Exception { assertNotNull(result.getLuceneDescription()); assertThat(result.getTime(), greaterThan(0L)); assertNotNull(result.getTimeBreakdown()); + assertQueryProfileResult(result); } CollectorResult result = searchProfiles.getCollectorResult(); @@ -375,6 +409,7 @@ public void testCollapsingBool() throws Exception { assertNotNull(result.getLuceneDescription()); assertThat(result.getTime(), greaterThan(0L)); assertNotNull(result.getTimeBreakdown()); + assertQueryProfileResult(result); } CollectorResult result = searchProfiles.getCollectorResult(); @@ -415,6 +450,7 @@ public void testBoosting() throws Exception { assertNotNull(result.getLuceneDescription()); assertThat(result.getTime(), greaterThan(0L)); assertNotNull(result.getTimeBreakdown()); + assertQueryProfileResult(result); } CollectorResult result = searchProfiles.getCollectorResult(); @@ -455,6 +491,7 @@ public void testDisMaxRange() throws Exception { assertNotNull(result.getLuceneDescription()); assertThat(result.getTime(), greaterThan(0L)); assertNotNull(result.getTimeBreakdown()); + assertQueryProfileResult(result); } CollectorResult result = searchProfiles.getCollectorResult(); @@ -494,6 +531,7 @@ public void testRange() throws Exception { assertNotNull(result.getLuceneDescription()); assertThat(result.getTime(), greaterThan(0L)); assertNotNull(result.getTimeBreakdown()); + assertQueryProfileResult(result); } CollectorResult result = searchProfiles.getCollectorResult(); @@ -547,6 +585,7 @@ public void testPhrase() throws Exception { assertNotNull(result.getLuceneDescription()); assertThat(result.getTime(), greaterThan(0L)); assertNotNull(result.getTimeBreakdown()); + assertQueryProfileResult(result); } CollectorResult result = searchProfiles.getCollectorResult(); @@ -579,4 +618,35 @@ public void testNoProfile() throws Exception { assertThat("Profile response element should be an empty map", resp.getProfileResults().size(), equalTo(0)); } + private void assertQueryProfileResult(ProfileResult result) { + Map breakdown = result.getTimeBreakdown(); + Long maxSliceTime = result.getMaxSliceTime(); + Long minSliceTime = result.getMinSliceTime(); + Long avgSliceTime = result.getAvgSliceTime(); + if (concurrentSearchEnabled) { + assertNotNull(maxSliceTime); + assertNotNull(minSliceTime); + assertNotNull(avgSliceTime); + assertThat(breakdown.size(), equalTo(66)); + for (QueryTimingType queryTimingType : QueryTimingType.values()) { + if (queryTimingType != QueryTimingType.CREATE_WEIGHT) { + String maxTimingType = MAX_PREFIX + queryTimingType; + String minTimingType = MIN_PREFIX + queryTimingType; + String avgTimingType = AVG_PREFIX + queryTimingType; + assertNotNull(breakdown.get(maxTimingType)); + assertNotNull(breakdown.get(minTimingType)); + assertNotNull(breakdown.get(avgTimingType)); + assertNotNull(breakdown.get(maxTimingType + TIMING_TYPE_COUNT_SUFFIX)); + assertNotNull(breakdown.get(minTimingType + TIMING_TYPE_COUNT_SUFFIX)); + assertNotNull(breakdown.get(avgTimingType + TIMING_TYPE_COUNT_SUFFIX)); + } + } + } else { + assertThat(maxSliceTime, is(nullValue())); + assertThat(minSliceTime, is(nullValue())); + assertThat(avgSliceTime, is(nullValue())); + assertThat(breakdown.size(), equalTo(27)); + } + } + } diff --git a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java index a520086de8051..afa28d04db9cf 100644 --- a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java +++ b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java @@ -71,6 +71,7 @@ import org.opensearch.search.dfs.AggregatedDfs; import org.opensearch.search.profile.ContextualProfileBreakdown; import org.opensearch.search.profile.Timer; +import org.opensearch.search.profile.query.ConcurrentQueryProfileTree; import org.opensearch.search.profile.query.ProfileWeight; import org.opensearch.search.profile.query.QueryProfiler; import org.opensearch.search.profile.query.QueryTimingType; @@ -106,6 +107,7 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable { private QueryProfiler profiler; private MutableQueryTimeout cancellable; private SearchContext searchContext; + private boolean fromConcurrentPath = false; public ContextIndexSearcher( IndexReader reader, @@ -185,15 +187,26 @@ public void setAggregatedDfs(AggregatedDfs aggregatedDfs) { @Override public Query rewrite(Query original) throws IOException { + Timer concurrentPathRewriteTimer = null; if (profiler != null) { - profiler.startRewriteTime(); + if (fromConcurrentPath) { + concurrentPathRewriteTimer = new Timer(); + profiler.getConcurrentPathRewriteTimers().add(concurrentPathRewriteTimer); + concurrentPathRewriteTimer.start(); + } else { + profiler.startRewriteTime(); + } } try { return super.rewrite(original); } finally { if (profiler != null) { - profiler.stopAndAddRewriteTime(); + if (fromConcurrentPath) { + concurrentPathRewriteTimer.stop(); + } else { + profiler.stopAndAddRewriteTime(); + } } } } @@ -204,7 +217,15 @@ public Weight createWeight(Query query, ScoreMode scoreMode, float boost) throws // createWeight() is called for each query in the tree, so we tell the queryProfiler // each invocation so that it can build an internal representation of the query // tree - ContextualProfileBreakdown profile = profiler.getQueryBreakdown(query); + ContextualProfileBreakdown profile; + if (searchContext.shouldUseConcurrentSearch()) { + long threadId = Thread.currentThread().getId(); + ConcurrentQueryProfileTree profileTree = profiler.getThreadToProfileTree() + .computeIfAbsent(threadId, k -> new ConcurrentQueryProfileTree()); + profile = profileTree.getProfileBreakdown(query); + } else { + profile = profiler.getQueryBreakdown(query); + } Timer timer = profile.getTimer(QueryTimingType.CREATE_WEIGHT); timer.start(); final Weight weight; @@ -267,6 +288,9 @@ public void search( @Override protected void search(List leaves, Weight weight, Collector collector) throws IOException { + if (searchContext.shouldUseConcurrentSearch()) { + fromConcurrentPath = true; + } // Time series based workload by default traverses segments in desc order i.e. latest to the oldest order. // This is actually beneficial for search queries to start search on latest segments first for time series workload. // That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf diff --git a/server/src/main/java/org/opensearch/search/profile/AbstractProfiler.java b/server/src/main/java/org/opensearch/search/profile/AbstractProfiler.java index 4db1cb87a231d..9150c5ed212e5 100644 --- a/server/src/main/java/org/opensearch/search/profile/AbstractProfiler.java +++ b/server/src/main/java/org/opensearch/search/profile/AbstractProfiler.java @@ -32,7 +32,11 @@ package org.opensearch.search.profile; +import org.opensearch.search.profile.query.ConcurrentQueryProfileTree; + +import java.util.ArrayList; import java.util.List; +import java.util.Map; /** * Base class for a profiler @@ -42,6 +46,7 @@ public class AbstractProfiler, E> { protected final AbstractInternalProfileTree profileTree; + protected Map threadToProfileTree; public AbstractProfiler(AbstractInternalProfileTree profileTree) { this.profileTree = profileTree; @@ -59,14 +64,27 @@ public PB getQueryBreakdown(E query) { * Removes the last (e.g. most recent) element on the stack. */ public void pollLastElement() { - profileTree.pollLast(); + if (threadToProfileTree == null) { + profileTree.pollLast(); + } else { + long threadId = Thread.currentThread().getId(); + ConcurrentQueryProfileTree concurrentProfileTree = threadToProfileTree.get(threadId); + concurrentProfileTree.pollLast(); + } } /** * @return a hierarchical representation of the profiled tree */ public List getTree() { - return profileTree.getTree(); + if (threadToProfileTree == null) { + return profileTree.getTree(); + } + List profileResults = new ArrayList<>(); + for (Map.Entry profile : threadToProfileTree.entrySet()) { + profileResults.addAll(profile.getValue().getTree()); + } + return profileResults; } } diff --git a/server/src/main/java/org/opensearch/search/profile/Timer.java b/server/src/main/java/org/opensearch/search/profile/Timer.java index 172762cabeb6a..864c689cf7fa0 100644 --- a/server/src/main/java/org/opensearch/search/profile/Timer.java +++ b/server/src/main/java/org/opensearch/search/profile/Timer.java @@ -53,6 +53,18 @@ public class Timer { private boolean doTiming; private long timing, count, lastCount, start, earliestTimerStartTime; + public Timer() { + this(0, 0, 0, 0, 0); + } + + public Timer(long timing, long count, long lastCount, long start, long earliestTimerStartTime) { + this.timing = timing; + this.count = count; + this.lastCount = lastCount; + this.start = start; + this.earliestTimerStartTime = earliestTimerStartTime; + } + /** pkg-private for testing */ long nanoTime() { return System.nanoTime(); diff --git a/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileTree.java b/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileTree.java index 4e54178c3b4fb..d6e5b79743b82 100644 --- a/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileTree.java +++ b/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileTree.java @@ -12,7 +12,9 @@ import org.apache.lucene.search.Collector; import org.opensearch.search.profile.ContextualProfileBreakdown; import org.opensearch.search.profile.ProfileResult; +import org.opensearch.search.profile.Timer; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -22,6 +24,7 @@ * @opensearch.internal */ public class ConcurrentQueryProfileTree extends AbstractQueryProfileTree { + protected List concurrentPathRewriteTimers = new ArrayList<>(); @Override protected ContextualProfileBreakdown createProfileBreakdown() { @@ -88,4 +91,11 @@ private void updateCollectorToLeavesForChildBreakdowns(Integer parentToken, Map< } } } + + /** + * @return the concurrent path rewrite timer list for this profile tree + */ + public List getConcurrentPathRewriteTimers() { + return concurrentPathRewriteTimers; + } } diff --git a/server/src/main/java/org/opensearch/search/profile/query/QueryProfiler.java b/server/src/main/java/org/opensearch/search/profile/query/QueryProfiler.java index a80ce1c658081..af8852355bebe 100644 --- a/server/src/main/java/org/opensearch/search/profile/query/QueryProfiler.java +++ b/server/src/main/java/org/opensearch/search/profile/query/QueryProfiler.java @@ -35,8 +35,14 @@ import org.apache.lucene.search.Query; import org.opensearch.search.profile.AbstractProfiler; import org.opensearch.search.profile.ContextualProfileBreakdown; +import org.opensearch.search.profile.Timer; +import java.util.Comparator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; /** * This class acts as a thread-local storage for profiling a query. It also @@ -60,6 +66,11 @@ public final class QueryProfiler extends AbstractProfiler(); + threadToProfileTree.put(threadId, (ConcurrentQueryProfileTree) profileTree); + } } /** Set the collector that is associated with this profiler. */ @@ -89,10 +100,55 @@ public long stopAndAddRewriteTime() { } /** + * The rewriting process is complex and hard to display because queries can undergo significant changes. + * Instead of showing intermediate results, we display the cumulative time for the non-concurrent search case. + * In the concurrent search case, we compute the sum of the non-overlapping time across all slices and add + * the rewrite time from the non-concurrent path. * @return total time taken to rewrite all queries in this profile */ public long getRewriteTime() { - return ((AbstractQueryProfileTree) profileTree).getRewriteTime(); + long rewriteTime = ((AbstractQueryProfileTree) profileTree).getRewriteTime(); + if (profileTree instanceof ConcurrentQueryProfileTree) { + List timers = getConcurrentPathRewriteTimers(); + LinkedList mergedIntervals = mergeRewriteTimeIntervals(timers); + for (long[] interval : mergedIntervals) { + rewriteTime += interval[1] - interval[0]; + } + } + return rewriteTime; + } + + // package private for unit testing + LinkedList mergeRewriteTimeIntervals(List timers) { + LinkedList mergedIntervals = new LinkedList<>(); + timers.sort(Comparator.comparingLong(Timer::getEarliestTimerStartTime)); + for (Timer timer : timers) { + long startTime = timer.getEarliestTimerStartTime(); + long endTime = timer.getEarliestTimerStartTime() + timer.getApproximateTiming(); + if (mergedIntervals.isEmpty() || mergedIntervals.getLast()[1] < timer.getEarliestTimerStartTime()) { + long[] interval = new long[2]; + interval[0] = startTime; + interval[1] = endTime; + mergedIntervals.add(interval); + } else { + mergedIntervals.getLast()[1] = Math.max(mergedIntervals.getLast()[1], endTime); + } + } + return mergedIntervals; + } + + /** + * @return a list of concurrent path rewrite timers for this concurrent search + */ + public List getConcurrentPathRewriteTimers() { + return ((ConcurrentQueryProfileTree) profileTree).getConcurrentPathRewriteTimers(); + } + + /** + * @return the thread to profile tree map for this concurrent search + */ + public Map getThreadToProfileTree() { + return threadToProfileTree; } /** diff --git a/server/src/test/java/org/opensearch/search/profile/query/QueryProfilerTests.java b/server/src/test/java/org/opensearch/search/profile/query/QueryProfilerTests.java index 64a440b85eb10..23bb99b8282b3 100644 --- a/server/src/test/java/org/opensearch/search/profile/query/QueryProfilerTests.java +++ b/server/src/test/java/org/opensearch/search/profile/query/QueryProfilerTests.java @@ -65,6 +65,7 @@ import org.opensearch.search.internal.ContextIndexSearcher; import org.opensearch.search.internal.SearchContext; import org.opensearch.search.profile.ProfileResult; +import org.opensearch.search.profile.Timer; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.ThreadPool; import org.junit.After; @@ -73,6 +74,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -482,4 +484,20 @@ public boolean shouldCache(Query query) throws IOException { } }; + + public void testMergeRewriteTimeIntervals() { + QueryProfiler profiler = new QueryProfiler(executor != null); + List timers = new LinkedList<>(); + timers.add(new Timer(217134L, 1L, 1L, 0L, 553074511206907L)); + timers.add(new Timer(228954L, 1L, 1L, 0L, 553074509287335L)); + timers.add(new Timer(228954L, 1L, 1L, 0L, 553074509287336L)); + LinkedList mergedIntervals = profiler.mergeRewriteTimeIntervals(timers); + assertThat(mergedIntervals.size(), equalTo(2)); + long[] interval = mergedIntervals.get(0); + assertThat(interval[0], equalTo(553074509287335L)); + assertThat(interval[1], equalTo(553074509516290L)); + interval = mergedIntervals.get(1); + assertThat(interval[0], equalTo(553074511206907L)); + assertThat(interval[1], equalTo(553074511424041L)); + } }