From 200ad5d28a577877be530ecab507601898025c5c Mon Sep 17 00:00:00 2001 From: Ticheng Lin <51488860+ticheng-aws@users.noreply.github.com> Date: Thu, 19 Oct 2023 19:58:13 -0700 Subject: [PATCH] Introduce ConcurrentQueryProfiler to profile query using concurrent segment search path and support concurrency during rewrite and create weight (#10352) * Fix timer race condition in profile rewrite and create weight for concurrent segment search (#10352) Signed-off-by: Ticheng Lin * Refactor and work on the PR comments (#10352) Signed-off-by: Ticheng Lin --------- Signed-off-by: Ticheng Lin --- CHANGELOG.md | 1 + .../search/profile/query/QueryProfilerIT.java | 157 +++++++++++++++++- .../opensearch/search/profile/Profilers.java | 7 +- .../org/opensearch/search/profile/Timer.java | 12 ++ .../query/AbstractQueryProfileTree.java | 5 +- .../ConcurrentQueryProfileBreakdown.java | 27 ++- .../query/ConcurrentQueryProfiler.java | 134 +++++++++++++++ .../search/profile/query/QueryProfiler.java | 14 +- .../ConcurrentQueryProfileBreakdownTests.java | 52 ++++++ .../query/ConcurrentQueryProfilerTests.java | 36 ++++ .../profile/query/QueryProfilerTests.java | 16 +- 11 files changed, 438 insertions(+), 23 deletions(-) create mode 100644 server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfiler.java create mode 100644 server/src/test/java/org/opensearch/search/profile/query/ConcurrentQueryProfilerTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 552c277789dd7..5c52c43a35b8d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -92,6 +92,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351)) - [Remote Store] Add repository stats for remote store([#10567](https://github.com/opensearch-project/OpenSearch/pull/10567)) - Add search query categorizer ([#10255](https://github.com/opensearch-project/OpenSearch/pull/10255)) +- Introduce ConcurrentQueryProfiler to profile query using concurrent segment search path and support concurrency during rewrite and create weight ([10352](https://github.com/opensearch-project/OpenSearch/pull/10352)) ### Dependencies - Bump `com.google.api.grpc:proto-google-common-protos` from 2.10.0 to 2.25.1 ([#10208](https://github.com/opensearch-project/OpenSearch/pull/10208), [#10298](https://github.com/opensearch-project/OpenSearch/pull/10298)) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/profile/query/QueryProfilerIT.java b/server/src/internalClusterTest/java/org/opensearch/search/profile/query/QueryProfilerIT.java index 5f794d2abf878..ef73438114079 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/profile/query/QueryProfilerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/profile/query/QueryProfilerIT.java @@ -32,6 +32,8 @@ package org.opensearch.search.profile.query; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + import org.apache.lucene.tests.util.English; import org.opensearch.action.index.IndexRequestBuilder; import org.opensearch.action.search.MultiSearchResponse; @@ -40,20 +42,23 @@ import org.opensearch.action.search.SearchType; import org.opensearch.action.search.ShardSearchFailure; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryBuilders; import org.opensearch.search.SearchHit; import org.opensearch.search.profile.ProfileResult; import org.opensearch.search.profile.ProfileShardResult; import org.opensearch.search.sort.SortOrder; -import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.ParameterizedOpenSearchIntegTestCase; import java.util.Arrays; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; import static org.opensearch.search.profile.query.RandomQueryGenerator.randomQueryBuilder; import static org.hamcrest.Matchers.emptyOrNullString; import static org.hamcrest.Matchers.equalTo; @@ -61,8 +66,32 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; + +public class QueryProfilerIT extends ParameterizedOpenSearchIntegTestCase { + private final boolean concurrentSearchEnabled; + private static final String MAX_PREFIX = "max_"; + private static final String MIN_PREFIX = "min_"; + private static final String AVG_PREFIX = "avg_"; + private static final String TIMING_TYPE_COUNT_SUFFIX = "_count"; + + public QueryProfilerIT(Settings settings, boolean concurrentSearchEnabled) { + super(settings); + this.concurrentSearchEnabled = concurrentSearchEnabled; + } -public class QueryProfilerIT extends OpenSearchIntegTestCase { + @ParametersFactory + public static Collection parameters() { + return Arrays.asList( + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build(), false }, + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build(), true } + ); + } + + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true").build(); + } /** * This test simply checks to make sure nothing crashes. Test indexes 100-150 documents, @@ -229,6 +258,7 @@ public void testSimpleMatch() throws Exception { assertEquals(result.getLuceneDescription(), "field1:one"); assertThat(result.getTime(), greaterThan(0L)); assertNotNull(result.getTimeBreakdown()); + assertQueryProfileResult(result); } CollectorResult result = searchProfiles.getCollectorResult(); @@ -271,6 +301,7 @@ public void testBool() throws Exception { assertThat(result.getTime(), greaterThan(0L)); assertNotNull(result.getTimeBreakdown()); assertEquals(result.getProfiledChildren().size(), 2); + assertQueryProfileResult(result); // Check the children List children = result.getProfiledChildren(); @@ -282,12 +313,14 @@ public void testBool() throws Exception { assertThat(childProfile.getTime(), greaterThan(0L)); assertNotNull(childProfile.getTimeBreakdown()); assertEquals(childProfile.getProfiledChildren().size(), 0); + assertQueryProfileResult(childProfile); childProfile = children.get(1); assertEquals(childProfile.getQueryName(), "TermQuery"); assertEquals(childProfile.getLuceneDescription(), "field1:two"); assertThat(childProfile.getTime(), greaterThan(0L)); assertNotNull(childProfile.getTimeBreakdown()); + assertQueryProfileResult(childProfile); } CollectorResult result = searchProfiles.getCollectorResult(); @@ -330,6 +363,7 @@ public void testEmptyBool() throws Exception { assertNotNull(result.getLuceneDescription()); assertThat(result.getTime(), greaterThan(0L)); assertNotNull(result.getTimeBreakdown()); + assertQueryProfileResult(result); } CollectorResult result = searchProfiles.getCollectorResult(); @@ -375,6 +409,7 @@ public void testCollapsingBool() throws Exception { assertNotNull(result.getLuceneDescription()); assertThat(result.getTime(), greaterThan(0L)); assertNotNull(result.getTimeBreakdown()); + assertQueryProfileResult(result); } CollectorResult result = searchProfiles.getCollectorResult(); @@ -415,6 +450,90 @@ public void testBoosting() throws Exception { assertNotNull(result.getLuceneDescription()); assertThat(result.getTime(), greaterThan(0L)); assertNotNull(result.getTimeBreakdown()); + assertQueryProfileResult(result); + } + + CollectorResult result = searchProfiles.getCollectorResult(); + assertThat(result.getName(), is(not(emptyOrNullString()))); + assertThat(result.getTime(), greaterThan(0L)); + } + } + } + + public void testSearchLeafForItsLeavesAndRewriteQuery() throws Exception { + createIndex("test"); + ensureGreen(); + + int numDocs = 122; + IndexRequestBuilder[] docs = new IndexRequestBuilder[numDocs]; + for (int i = 0; i < numDocs; i++) { + docs[i] = client().prepareIndex("test").setId(String.valueOf(i)).setSource("field1", English.intToEnglish(i), "field2", i); + } + + List terms = Arrays.asList("zero", "zero", "one"); + + indexRandom(true, docs); + + refresh(); + + QueryBuilder q = QueryBuilders.boostingQuery( + QueryBuilders.idsQuery().addIds(String.valueOf(randomInt()), String.valueOf(randomInt())), + QueryBuilders.termsQuery("field1", terms) + ).boost(randomFloat()).negativeBoost(randomFloat()); + logger.info("Query: {}", q); + + SearchResponse resp = client().prepareSearch() + .setQuery(q) + .setTrackTotalHits(true) + .setProfile(true) + .setSearchType(SearchType.QUERY_THEN_FETCH) + .get(); + + assertNotNull("Profile response element should not be null", resp.getProfileResults()); + assertThat("Profile response should not be an empty array", resp.getProfileResults().size(), not(0)); + + for (Map.Entry shardResult : resp.getProfileResults().entrySet()) { + assertThat(shardResult.getValue().getNetworkTime().getInboundNetworkTime(), greaterThanOrEqualTo(0L)); + assertThat(shardResult.getValue().getNetworkTime().getOutboundNetworkTime(), greaterThanOrEqualTo(0L)); + for (QueryProfileShardResult searchProfiles : shardResult.getValue().getQueryProfileResults()) { + List results = searchProfiles.getQueryResults(); + for (ProfileResult result : results) { + assertNotNull(result.getQueryName()); + assertNotNull(result.getLuceneDescription()); + assertThat(result.getTime(), greaterThan(0L)); + Map breakdown = result.getTimeBreakdown(); + Long maxSliceTime = result.getMaxSliceTime(); + Long minSliceTime = result.getMinSliceTime(); + Long avgSliceTime = result.getAvgSliceTime(); + if (concurrentSearchEnabled && results.get(0).equals(result)) { + assertNotNull(maxSliceTime); + assertNotNull(minSliceTime); + assertNotNull(avgSliceTime); + assertThat(breakdown.size(), equalTo(66)); + for (QueryTimingType queryTimingType : QueryTimingType.values()) { + if (queryTimingType != QueryTimingType.CREATE_WEIGHT) { + String maxTimingType = MAX_PREFIX + queryTimingType; + String minTimingType = MIN_PREFIX + queryTimingType; + String avgTimingType = AVG_PREFIX + queryTimingType; + assertNotNull(breakdown.get(maxTimingType)); + assertNotNull(breakdown.get(minTimingType)); + assertNotNull(breakdown.get(avgTimingType)); + assertNotNull(breakdown.get(maxTimingType + TIMING_TYPE_COUNT_SUFFIX)); + assertNotNull(breakdown.get(minTimingType + TIMING_TYPE_COUNT_SUFFIX)); + assertNotNull(breakdown.get(avgTimingType + TIMING_TYPE_COUNT_SUFFIX)); + } + } + } else if (concurrentSearchEnabled) { + assertThat(maxSliceTime, equalTo(0L)); + assertThat(minSliceTime, equalTo(0L)); + assertThat(avgSliceTime, equalTo(0L)); + assertThat(breakdown.size(), equalTo(27)); + } else { + assertThat(maxSliceTime, is(nullValue())); + assertThat(minSliceTime, is(nullValue())); + assertThat(avgSliceTime, is(nullValue())); + assertThat(breakdown.size(), equalTo(27)); + } } CollectorResult result = searchProfiles.getCollectorResult(); @@ -455,6 +574,7 @@ public void testDisMaxRange() throws Exception { assertNotNull(result.getLuceneDescription()); assertThat(result.getTime(), greaterThan(0L)); assertNotNull(result.getTimeBreakdown()); + assertQueryProfileResult(result); } CollectorResult result = searchProfiles.getCollectorResult(); @@ -494,6 +614,7 @@ public void testRange() throws Exception { assertNotNull(result.getLuceneDescription()); assertThat(result.getTime(), greaterThan(0L)); assertNotNull(result.getTimeBreakdown()); + assertQueryProfileResult(result); } CollectorResult result = searchProfiles.getCollectorResult(); @@ -547,6 +668,7 @@ public void testPhrase() throws Exception { assertNotNull(result.getLuceneDescription()); assertThat(result.getTime(), greaterThan(0L)); assertNotNull(result.getTimeBreakdown()); + assertQueryProfileResult(result); } CollectorResult result = searchProfiles.getCollectorResult(); @@ -579,4 +701,35 @@ public void testNoProfile() throws Exception { assertThat("Profile response element should be an empty map", resp.getProfileResults().size(), equalTo(0)); } + private void assertQueryProfileResult(ProfileResult result) { + Map breakdown = result.getTimeBreakdown(); + Long maxSliceTime = result.getMaxSliceTime(); + Long minSliceTime = result.getMinSliceTime(); + Long avgSliceTime = result.getAvgSliceTime(); + if (concurrentSearchEnabled) { + assertNotNull(maxSliceTime); + assertNotNull(minSliceTime); + assertNotNull(avgSliceTime); + assertThat(breakdown.size(), equalTo(66)); + for (QueryTimingType queryTimingType : QueryTimingType.values()) { + if (queryTimingType != QueryTimingType.CREATE_WEIGHT) { + String maxTimingType = MAX_PREFIX + queryTimingType; + String minTimingType = MIN_PREFIX + queryTimingType; + String avgTimingType = AVG_PREFIX + queryTimingType; + assertNotNull(breakdown.get(maxTimingType)); + assertNotNull(breakdown.get(minTimingType)); + assertNotNull(breakdown.get(avgTimingType)); + assertNotNull(breakdown.get(maxTimingType + TIMING_TYPE_COUNT_SUFFIX)); + assertNotNull(breakdown.get(minTimingType + TIMING_TYPE_COUNT_SUFFIX)); + assertNotNull(breakdown.get(avgTimingType + TIMING_TYPE_COUNT_SUFFIX)); + } + } + } else { + assertThat(maxSliceTime, is(nullValue())); + assertThat(minSliceTime, is(nullValue())); + assertThat(avgSliceTime, is(nullValue())); + assertThat(breakdown.size(), equalTo(27)); + } + } + } diff --git a/server/src/main/java/org/opensearch/search/profile/Profilers.java b/server/src/main/java/org/opensearch/search/profile/Profilers.java index 8e87c7ff4acd4..68cf05c988b5b 100644 --- a/server/src/main/java/org/opensearch/search/profile/Profilers.java +++ b/server/src/main/java/org/opensearch/search/profile/Profilers.java @@ -35,6 +35,9 @@ import org.opensearch.search.internal.ContextIndexSearcher; import org.opensearch.search.profile.aggregation.AggregationProfiler; import org.opensearch.search.profile.aggregation.ConcurrentAggregationProfiler; +import org.opensearch.search.profile.query.ConcurrentQueryProfileTree; +import org.opensearch.search.profile.query.ConcurrentQueryProfiler; +import org.opensearch.search.profile.query.InternalQueryProfileTree; import org.opensearch.search.profile.query.QueryProfiler; import java.util.ArrayList; @@ -64,7 +67,9 @@ public Profilers(ContextIndexSearcher searcher, boolean isConcurrentSegmentSearc /** Switch to a new profile. */ public QueryProfiler addQueryProfiler() { - QueryProfiler profiler = new QueryProfiler(isConcurrentSegmentSearchEnabled); + QueryProfiler profiler = isConcurrentSegmentSearchEnabled + ? new ConcurrentQueryProfiler(new ConcurrentQueryProfileTree()) + : new QueryProfiler(new InternalQueryProfileTree()); searcher.setProfiler(profiler); queryProfilers.add(profiler); return profiler; diff --git a/server/src/main/java/org/opensearch/search/profile/Timer.java b/server/src/main/java/org/opensearch/search/profile/Timer.java index 172762cabeb6a..864c689cf7fa0 100644 --- a/server/src/main/java/org/opensearch/search/profile/Timer.java +++ b/server/src/main/java/org/opensearch/search/profile/Timer.java @@ -53,6 +53,18 @@ public class Timer { private boolean doTiming; private long timing, count, lastCount, start, earliestTimerStartTime; + public Timer() { + this(0, 0, 0, 0, 0); + } + + public Timer(long timing, long count, long lastCount, long start, long earliestTimerStartTime) { + this.timing = timing; + this.count = count; + this.lastCount = lastCount; + this.start = start; + this.earliestTimerStartTime = earliestTimerStartTime; + } + /** pkg-private for testing */ long nanoTime() { return System.nanoTime(); diff --git a/server/src/main/java/org/opensearch/search/profile/query/AbstractQueryProfileTree.java b/server/src/main/java/org/opensearch/search/profile/query/AbstractQueryProfileTree.java index 8e825def13f5d..2f5d632ee2d87 100644 --- a/server/src/main/java/org/opensearch/search/profile/query/AbstractQueryProfileTree.java +++ b/server/src/main/java/org/opensearch/search/profile/query/AbstractQueryProfileTree.java @@ -54,14 +54,11 @@ public void startRewriteTime() { * startRewriteTime() must be called for a particular context prior to calling * stopAndAddRewriteTime(), otherwise the elapsed time will be negative and * nonsensical - * - * @return The elapsed time */ - public long stopAndAddRewriteTime() { + public void stopAndAddRewriteTime() { long time = Math.max(1, System.nanoTime() - rewriteScratch); rewriteTime += time; rewriteScratch = 0; - return time; } public long getRewriteTime() { diff --git a/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdown.java b/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdown.java index e567fdd2d436c..59ef01f9f947a 100644 --- a/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdown.java +++ b/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdown.java @@ -70,7 +70,7 @@ public Map toBreakdownMap() { ); final long createWeightTime = topLevelBreakdownMapWithWeightTime.get(QueryTimingType.CREATE_WEIGHT.toString()); - if (sliceCollectorsToLeaves.isEmpty() || contexts.isEmpty()) { + if (contexts.isEmpty()) { // If there are no leaf contexts, then return the default concurrent query level breakdown, which will include the // create_weight time/count queryNodeTime = createWeightTime; @@ -78,6 +78,21 @@ public Map toBreakdownMap() { minSliceNodeTime = 0L; avgSliceNodeTime = 0L; return buildDefaultQueryBreakdownMap(createWeightTime); + } else if (sliceCollectorsToLeaves.isEmpty()) { + // This will happen when each slice executes search leaf for its leaves and query is rewritten for the leaf being searched. It + // creates a new weight and breakdown map for each rewritten query. This new breakdown map captures the timing information for + // the new rewritten query. The sliceCollectorsToLeaves is empty because this breakdown for rewritten query gets created later + // in search leaf path which doesn't have collector. Also, this is not needed since this breakdown is per leaf and there is no + // concurrency involved. An empty sliceCollectorsToLeaves could also happen in the case of early termination. + AbstractProfileBreakdown breakdown = contexts.values().iterator().next(); + queryNodeTime = breakdown.toNodeTime() + createWeightTime; + maxSliceNodeTime = 0L; + minSliceNodeTime = 0L; + avgSliceNodeTime = 0L; + Map queryBreakdownMap = new HashMap<>(breakdown.toBreakdownMap()); + queryBreakdownMap.put(QueryTimingType.CREATE_WEIGHT.toString(), createWeightTime); + queryBreakdownMap.put(QueryTimingType.CREATE_WEIGHT + TIMING_TYPE_COUNT_SUFFIX, 1L); + return queryBreakdownMap; } // first create the slice level breakdowns @@ -191,10 +206,12 @@ Map> buildSliceLevelBreakdown() { } // compute sliceMaxEndTime as max of sliceEndTime across all timing types sliceMaxEndTime = Math.max(sliceMaxEndTime, currentSliceBreakdown.getOrDefault(timingTypeSliceEndTimeKey, Long.MIN_VALUE)); - sliceMinStartTime = Math.min( - sliceMinStartTime, - currentSliceBreakdown.getOrDefault(timingTypeSliceStartTimeKey, Long.MAX_VALUE) - ); + long currentSliceStartTime = currentSliceBreakdown.getOrDefault(timingTypeSliceStartTimeKey, Long.MAX_VALUE); + if (currentSliceStartTime == 0L) { + // The timer for the current timing type never starts, so we continue here + continue; + } + sliceMinStartTime = Math.min(sliceMinStartTime, currentSliceStartTime); // compute total time for each timing type at slice level using sliceEndTime and sliceStartTime currentSliceBreakdown.put( timingType.toString(), diff --git a/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfiler.java b/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfiler.java new file mode 100644 index 0000000000000..42bf23bb13fbe --- /dev/null +++ b/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfiler.java @@ -0,0 +1,134 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.profile.query; + +import org.apache.lucene.search.Query; +import org.opensearch.search.profile.ContextualProfileBreakdown; +import org.opensearch.search.profile.ProfileResult; +import org.opensearch.search.profile.Timer; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * This class acts as a thread-local storage for profiling a query with concurrent execution + * + * @opensearch.internal + */ +public final class ConcurrentQueryProfiler extends QueryProfiler { + + private final Map threadToProfileTree; + // The LinkedList does not need to be thread safe, as the map associates thread IDs with LinkedList, and only + // one thread will access the LinkedList at a time. + private final Map> threadToRewriteTimers; + + public ConcurrentQueryProfiler(AbstractQueryProfileTree profileTree) { + super(profileTree); + long threadId = getCurrentThreadId(); + // We utilize LinkedHashMap to preserve the insertion order of the profiled queries + threadToProfileTree = Collections.synchronizedMap(new LinkedHashMap<>()); + threadToProfileTree.put(threadId, (ConcurrentQueryProfileTree) profileTree); + threadToRewriteTimers = new ConcurrentHashMap<>(); + threadToRewriteTimers.put(threadId, new LinkedList<>()); + } + + @Override + public ContextualProfileBreakdown getQueryBreakdown(Query query) { + ConcurrentQueryProfileTree profileTree = threadToProfileTree.computeIfAbsent( + getCurrentThreadId(), + k -> new ConcurrentQueryProfileTree() + ); + return profileTree.getProfileBreakdown(query); + } + + /** + * Removes the last (e.g. most recent) element on ConcurrentQueryProfileTree stack. + */ + @Override + public void pollLastElement() { + ConcurrentQueryProfileTree concurrentProfileTree = threadToProfileTree.get(getCurrentThreadId()); + if (concurrentProfileTree != null) { + concurrentProfileTree.pollLast(); + } + } + + /** + * @return a hierarchical representation of the profiled tree + */ + @Override + public List getTree() { + List profileResults = new ArrayList<>(); + for (Map.Entry profile : threadToProfileTree.entrySet()) { + profileResults.addAll(profile.getValue().getTree()); + } + return profileResults; + } + + /** + * Begin timing the rewrite phase of a request + */ + @Override + public void startRewriteTime() { + Timer rewriteTimer = new Timer(); + threadToRewriteTimers.computeIfAbsent(getCurrentThreadId(), k -> new LinkedList<>()).add(rewriteTimer); + rewriteTimer.start(); + } + + /** + * Stop recording the current rewrite timer + */ + public void stopAndAddRewriteTime() { + Timer rewriteTimer = threadToRewriteTimers.get(getCurrentThreadId()).getLast(); + rewriteTimer.stop(); + } + + /** + * @return total time taken to rewrite all queries in this concurrent query profiler + */ + @Override + public long getRewriteTime() { + long totalRewriteTime = 0L; + List rewriteTimers = new LinkedList<>(); + threadToRewriteTimers.values().forEach(rewriteTimers::addAll); + LinkedList mergedIntervals = mergeRewriteTimeIntervals(rewriteTimers); + for (long[] interval : mergedIntervals) { + totalRewriteTime += interval[1] - interval[0]; + } + return totalRewriteTime; + } + + // package private for unit testing + LinkedList mergeRewriteTimeIntervals(List timers) { + LinkedList mergedIntervals = new LinkedList<>(); + timers.sort(Comparator.comparingLong(Timer::getEarliestTimerStartTime)); + for (Timer timer : timers) { + long startTime = timer.getEarliestTimerStartTime(); + long endTime = startTime + timer.getApproximateTiming(); + if (mergedIntervals.isEmpty() || mergedIntervals.getLast()[1] < startTime) { + long[] interval = new long[2]; + interval[0] = startTime; + interval[1] = endTime; + mergedIntervals.add(interval); + } else { + mergedIntervals.getLast()[1] = Math.max(mergedIntervals.getLast()[1], endTime); + } + } + return mergedIntervals; + } + + private long getCurrentThreadId() { + return Thread.currentThread().getId(); + } +} diff --git a/server/src/main/java/org/opensearch/search/profile/query/QueryProfiler.java b/server/src/main/java/org/opensearch/search/profile/query/QueryProfiler.java index a80ce1c658081..332c4b3551450 100644 --- a/server/src/main/java/org/opensearch/search/profile/query/QueryProfiler.java +++ b/server/src/main/java/org/opensearch/search/profile/query/QueryProfiler.java @@ -51,15 +51,15 @@ * * @opensearch.internal */ -public final class QueryProfiler extends AbstractProfiler, Query> { +public class QueryProfiler extends AbstractProfiler, Query> { /** * The root Collector used in the search */ private InternalProfileComponent collector; - public QueryProfiler(boolean concurrent) { - super(concurrent ? new ConcurrentQueryProfileTree() : new InternalQueryProfileTree()); + public QueryProfiler(AbstractQueryProfileTree profileTree) { + super(profileTree); } /** Set the collector that is associated with this profiler. */ @@ -81,14 +81,14 @@ public void startRewriteTime() { /** * Stop recording the current rewrite and add it's time to the total tally, returning the * cumulative time so far. - * - * @return cumulative rewrite time */ - public long stopAndAddRewriteTime() { - return ((AbstractQueryProfileTree) profileTree).stopAndAddRewriteTime(); + public void stopAndAddRewriteTime() { + ((AbstractQueryProfileTree) profileTree).stopAndAddRewriteTime(); } /** + * The rewriting process is complex and hard to display because queries can undergo significant changes. + * Instead of showing intermediate results, we display the cumulative time for the non-concurrent search case. * @return total time taken to rewrite all queries in this profile */ public long getRewriteTime() { diff --git a/server/src/test/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdownTests.java b/server/src/test/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdownTests.java index f29ba3b0cea07..db14eb90ef839 100644 --- a/server/src/test/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdownTests.java +++ b/server/src/test/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdownTests.java @@ -333,6 +333,58 @@ public void testBreakDownMapWithMultipleSlicesAndOneSliceWithNoLeafContext() thr directory.close(); } + public void testOneLeafContextWithEmptySliceCollectorsToLeaves() throws Exception { + final DirectoryReader directoryReader = getDirectoryReader(1); + final Directory directory = directoryReader.directory(); + final long createWeightEarliestStartTime = createWeightTimer.getEarliestTimerStartTime(); + final long createWeightEndTime = createWeightEarliestStartTime + createWeightTimer.getApproximateTiming(); + final Map leafProfileBreakdownMap_1 = getLeafBreakdownMap(createWeightEndTime + 10, 10, 1); + final AbstractProfileBreakdown leafProfileBreakdown_1 = new TestQueryProfileBreakdown( + QueryTimingType.class, + leafProfileBreakdownMap_1 + ); + testQueryProfileBreakdown.getContexts().put(directoryReader.leaves().get(0), leafProfileBreakdown_1); + final Map queryBreakDownMap = testQueryProfileBreakdown.toBreakdownMap(); + assertFalse(queryBreakDownMap == null || queryBreakDownMap.isEmpty()); + assertEquals(26, queryBreakDownMap.size()); + for (QueryTimingType queryTimingType : QueryTimingType.values()) { + String timingTypeKey = queryTimingType.toString(); + String timingTypeCountKey = queryTimingType + TIMING_TYPE_COUNT_SUFFIX; + + if (queryTimingType.equals(QueryTimingType.CREATE_WEIGHT)) { + final long createWeightTime = queryBreakDownMap.get(timingTypeKey); + assertEquals(createWeightTimer.getApproximateTiming(), createWeightTime); + assertEquals(1, (long) queryBreakDownMap.get(timingTypeCountKey)); + // verify there is no min/max/avg for weight type stats + assertFalse( + queryBreakDownMap.containsKey(ConcurrentQueryProfileBreakdown.MAX_PREFIX + timingTypeKey) + || queryBreakDownMap.containsKey(MIN_PREFIX + timingTypeKey) + || queryBreakDownMap.containsKey(ConcurrentQueryProfileBreakdown.AVG_PREFIX + timingTypeKey) + || queryBreakDownMap.containsKey(ConcurrentQueryProfileBreakdown.MAX_PREFIX + timingTypeCountKey) + || queryBreakDownMap.containsKey(MIN_PREFIX + timingTypeCountKey) + || queryBreakDownMap.containsKey(ConcurrentQueryProfileBreakdown.AVG_PREFIX + timingTypeCountKey) + ); + continue; + } + assertNotNull(queryBreakDownMap.get(timingTypeKey)); + assertNotNull(queryBreakDownMap.get(timingTypeCountKey)); + // verify there is no min/max/avg for current breakdown type stats + assertFalse( + queryBreakDownMap.containsKey(ConcurrentQueryProfileBreakdown.MAX_PREFIX + timingTypeKey) + || queryBreakDownMap.containsKey(MIN_PREFIX + timingTypeKey) + || queryBreakDownMap.containsKey(ConcurrentQueryProfileBreakdown.AVG_PREFIX + timingTypeKey) + || queryBreakDownMap.containsKey(ConcurrentQueryProfileBreakdown.MAX_PREFIX + timingTypeCountKey) + || queryBreakDownMap.containsKey(MIN_PREFIX + timingTypeCountKey) + || queryBreakDownMap.containsKey(ConcurrentQueryProfileBreakdown.AVG_PREFIX + timingTypeCountKey) + ); + } + assertEquals(0, testQueryProfileBreakdown.getMaxSliceNodeTime()); + assertEquals(0, testQueryProfileBreakdown.getMinSliceNodeTime()); + assertEquals(0, testQueryProfileBreakdown.getAvgSliceNodeTime()); + directoryReader.close(); + directory.close(); + } + private Map getLeafBreakdownMap(long startTime, long timeTaken, long count) { Map leafBreakDownMap = new HashMap<>(); for (QueryTimingType timingType : QueryTimingType.values()) { diff --git a/server/src/test/java/org/opensearch/search/profile/query/ConcurrentQueryProfilerTests.java b/server/src/test/java/org/opensearch/search/profile/query/ConcurrentQueryProfilerTests.java new file mode 100644 index 0000000000000..736bbcdd9e8dd --- /dev/null +++ b/server/src/test/java/org/opensearch/search/profile/query/ConcurrentQueryProfilerTests.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.profile.query; + +import org.opensearch.search.profile.Timer; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.LinkedList; +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; + +public class ConcurrentQueryProfilerTests extends OpenSearchTestCase { + + public void testMergeRewriteTimeIntervals() { + ConcurrentQueryProfiler profiler = new ConcurrentQueryProfiler(new ConcurrentQueryProfileTree()); + List timers = new LinkedList<>(); + timers.add(new Timer(217134L, 1L, 1L, 0L, 553074511206907L)); + timers.add(new Timer(228954L, 1L, 1L, 0L, 553074509287335L)); + timers.add(new Timer(228954L, 1L, 1L, 0L, 553074509287336L)); + LinkedList mergedIntervals = profiler.mergeRewriteTimeIntervals(timers); + assertThat(mergedIntervals.size(), equalTo(2)); + long[] interval = mergedIntervals.get(0); + assertThat(interval[0], equalTo(553074509287335L)); + assertThat(interval[1], equalTo(553074509516290L)); + interval = mergedIntervals.get(1); + assertThat(interval[0], equalTo(553074511206907L)); + assertThat(interval[1], equalTo(553074511424041L)); + } +} diff --git a/server/src/test/java/org/opensearch/search/profile/query/QueryProfilerTests.java b/server/src/test/java/org/opensearch/search/profile/query/QueryProfilerTests.java index 64a440b85eb10..481a224f2ff0e 100644 --- a/server/src/test/java/org/opensearch/search/profile/query/QueryProfilerTests.java +++ b/server/src/test/java/org/opensearch/search/profile/query/QueryProfilerTests.java @@ -161,7 +161,9 @@ public void tearDown() throws Exception { } public void testBasic() throws IOException { - QueryProfiler profiler = new QueryProfiler(executor != null); + QueryProfiler profiler = executor != null + ? new ConcurrentQueryProfiler(new ConcurrentQueryProfileTree()) + : new QueryProfiler(new InternalQueryProfileTree()); searcher.setProfiler(profiler); Query query = new TermQuery(new Term("foo", "bar")); searcher.search(query, 1); @@ -228,7 +230,9 @@ public void testBasic() throws IOException { } public void testNoScoring() throws IOException { - QueryProfiler profiler = new QueryProfiler(executor != null); + QueryProfiler profiler = executor != null + ? new ConcurrentQueryProfiler(new ConcurrentQueryProfileTree()) + : new QueryProfiler(new InternalQueryProfileTree()); searcher.setProfiler(profiler); Query query = new TermQuery(new Term("foo", "bar")); searcher.search(query, 1, Sort.INDEXORDER); // scores are not needed @@ -295,7 +299,9 @@ public void testNoScoring() throws IOException { } public void testUseIndexStats() throws IOException { - QueryProfiler profiler = new QueryProfiler(executor != null); + QueryProfiler profiler = executor != null + ? new ConcurrentQueryProfiler(new ConcurrentQueryProfileTree()) + : new QueryProfiler(new InternalQueryProfileTree()); searcher.setProfiler(profiler); Query query = new TermQuery(new Term("foo", "bar")); searcher.count(query); // will use index stats @@ -309,7 +315,9 @@ public void testUseIndexStats() throws IOException { } public void testApproximations() throws IOException { - QueryProfiler profiler = new QueryProfiler(executor != null); + QueryProfiler profiler = executor != null + ? new ConcurrentQueryProfiler(new ConcurrentQueryProfileTree()) + : new QueryProfiler(new InternalQueryProfileTree()); searcher.setProfiler(profiler); Query query = new RandomApproximationQuery(new TermQuery(new Term("foo", "bar")), random()); searcher.count(query);