From 96cd9384912fb372fe3262b8a5b61aaa62c73350 Mon Sep 17 00:00:00 2001 From: Ticheng Lin Date: Thu, 12 Oct 2023 06:55:18 +0000 Subject: [PATCH] Refactor and work on the PR comments (#10352) Signed-off-by: Ticheng Lin --- .../search/internal/ContextIndexSearcher.java | 30 +--- .../search/profile/AbstractProfiler.java | 22 +-- .../opensearch/search/profile/Profilers.java | 3 +- .../query/ConcurrentQueryProfileTree.java | 11 +- .../query/ConcurrentQueryProfiler.java | 135 ++++++++++++++++++ .../search/profile/query/QueryProfiler.java | 72 ++-------- .../query/ConcurrentQueryProfilerTests.java | 36 +++++ .../profile/query/QueryProfilerTests.java | 25 +--- 8 files changed, 198 insertions(+), 136 deletions(-) create mode 100644 server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfiler.java create mode 100644 server/src/test/java/org/opensearch/search/profile/query/ConcurrentQueryProfilerTests.java diff --git a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java index afa28d04db9cf..a520086de8051 100644 --- a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java +++ b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java @@ -71,7 +71,6 @@ import org.opensearch.search.dfs.AggregatedDfs; import org.opensearch.search.profile.ContextualProfileBreakdown; import org.opensearch.search.profile.Timer; -import org.opensearch.search.profile.query.ConcurrentQueryProfileTree; import org.opensearch.search.profile.query.ProfileWeight; import org.opensearch.search.profile.query.QueryProfiler; import org.opensearch.search.profile.query.QueryTimingType; @@ -107,7 +106,6 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable { private QueryProfiler profiler; private MutableQueryTimeout cancellable; private SearchContext searchContext; - private boolean fromConcurrentPath = false; public ContextIndexSearcher( IndexReader reader, @@ -187,26 +185,15 @@ public void setAggregatedDfs(AggregatedDfs aggregatedDfs) { @Override public Query rewrite(Query original) throws IOException { - Timer concurrentPathRewriteTimer = null; if (profiler != null) { - if (fromConcurrentPath) { - concurrentPathRewriteTimer = new Timer(); - profiler.getConcurrentPathRewriteTimers().add(concurrentPathRewriteTimer); - concurrentPathRewriteTimer.start(); - } else { - profiler.startRewriteTime(); - } + profiler.startRewriteTime(); } try { return super.rewrite(original); } finally { if (profiler != null) { - if (fromConcurrentPath) { - concurrentPathRewriteTimer.stop(); - } else { - profiler.stopAndAddRewriteTime(); - } + profiler.stopAndAddRewriteTime(); } } } @@ -217,15 +204,7 @@ public Weight createWeight(Query query, ScoreMode scoreMode, float boost) throws // createWeight() is called for each query in the tree, so we tell the queryProfiler // each invocation so that it can build an internal representation of the query // tree - ContextualProfileBreakdown profile; - if (searchContext.shouldUseConcurrentSearch()) { - long threadId = Thread.currentThread().getId(); - ConcurrentQueryProfileTree profileTree = profiler.getThreadToProfileTree() - .computeIfAbsent(threadId, k -> new ConcurrentQueryProfileTree()); - profile = profileTree.getProfileBreakdown(query); - } else { - profile = profiler.getQueryBreakdown(query); - } + ContextualProfileBreakdown profile = profiler.getQueryBreakdown(query); Timer timer = profile.getTimer(QueryTimingType.CREATE_WEIGHT); timer.start(); final Weight weight; @@ -288,9 +267,6 @@ public void search( @Override protected void search(List leaves, Weight weight, Collector collector) throws IOException { - if (searchContext.shouldUseConcurrentSearch()) { - fromConcurrentPath = true; - } // Time series based workload by default traverses segments in desc order i.e. latest to the oldest order. // This is actually beneficial for search queries to start search on latest segments first for time series workload. // That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf diff --git a/server/src/main/java/org/opensearch/search/profile/AbstractProfiler.java b/server/src/main/java/org/opensearch/search/profile/AbstractProfiler.java index 9150c5ed212e5..4db1cb87a231d 100644 --- a/server/src/main/java/org/opensearch/search/profile/AbstractProfiler.java +++ b/server/src/main/java/org/opensearch/search/profile/AbstractProfiler.java @@ -32,11 +32,7 @@ package org.opensearch.search.profile; -import org.opensearch.search.profile.query.ConcurrentQueryProfileTree; - -import java.util.ArrayList; import java.util.List; -import java.util.Map; /** * Base class for a profiler @@ -46,7 +42,6 @@ public class AbstractProfiler, E> { protected final AbstractInternalProfileTree profileTree; - protected Map threadToProfileTree; public AbstractProfiler(AbstractInternalProfileTree profileTree) { this.profileTree = profileTree; @@ -64,27 +59,14 @@ public PB getQueryBreakdown(E query) { * Removes the last (e.g. most recent) element on the stack. */ public void pollLastElement() { - if (threadToProfileTree == null) { - profileTree.pollLast(); - } else { - long threadId = Thread.currentThread().getId(); - ConcurrentQueryProfileTree concurrentProfileTree = threadToProfileTree.get(threadId); - concurrentProfileTree.pollLast(); - } + profileTree.pollLast(); } /** * @return a hierarchical representation of the profiled tree */ public List getTree() { - if (threadToProfileTree == null) { - return profileTree.getTree(); - } - List profileResults = new ArrayList<>(); - for (Map.Entry profile : threadToProfileTree.entrySet()) { - profileResults.addAll(profile.getValue().getTree()); - } - return profileResults; + return profileTree.getTree(); } } diff --git a/server/src/main/java/org/opensearch/search/profile/Profilers.java b/server/src/main/java/org/opensearch/search/profile/Profilers.java index 8e87c7ff4acd4..29a5112bcfac4 100644 --- a/server/src/main/java/org/opensearch/search/profile/Profilers.java +++ b/server/src/main/java/org/opensearch/search/profile/Profilers.java @@ -35,6 +35,7 @@ import org.opensearch.search.internal.ContextIndexSearcher; import org.opensearch.search.profile.aggregation.AggregationProfiler; import org.opensearch.search.profile.aggregation.ConcurrentAggregationProfiler; +import org.opensearch.search.profile.query.ConcurrentQueryProfiler; import org.opensearch.search.profile.query.QueryProfiler; import java.util.ArrayList; @@ -64,7 +65,7 @@ public Profilers(ContextIndexSearcher searcher, boolean isConcurrentSegmentSearc /** Switch to a new profile. */ public QueryProfiler addQueryProfiler() { - QueryProfiler profiler = new QueryProfiler(isConcurrentSegmentSearchEnabled); + QueryProfiler profiler = isConcurrentSegmentSearchEnabled ? new ConcurrentQueryProfiler() : new QueryProfiler(); searcher.setProfiler(profiler); queryProfilers.add(profiler); return profiler; diff --git a/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileTree.java b/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileTree.java index d6e5b79743b82..e4a3446cb18ec 100644 --- a/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileTree.java +++ b/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileTree.java @@ -14,9 +14,10 @@ import org.opensearch.search.profile.ProfileResult; import org.opensearch.search.profile.Timer; -import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** * This class returns a list of {@link ProfileResult} that can be serialized back to the client in the concurrent execution. @@ -24,7 +25,7 @@ * @opensearch.internal */ public class ConcurrentQueryProfileTree extends AbstractQueryProfileTree { - protected List concurrentPathRewriteTimers = new ArrayList<>(); + protected Map> threadToRewriteTimers = new ConcurrentHashMap<>(); @Override protected ContextualProfileBreakdown createProfileBreakdown() { @@ -93,9 +94,9 @@ private void updateCollectorToLeavesForChildBreakdowns(Integer parentToken, Map< } /** - * @return the concurrent path rewrite timer list for this profile tree + * @return the thread to rewrite timers map for this profile tree */ - public List getConcurrentPathRewriteTimers() { - return concurrentPathRewriteTimers; + public Map> getThreadToRewriteTimers() { + return threadToRewriteTimers; } } diff --git a/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfiler.java b/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfiler.java new file mode 100644 index 0000000000000..18c19fd827eda --- /dev/null +++ b/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfiler.java @@ -0,0 +1,135 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.profile.query; + +import org.apache.lucene.search.Query; +import org.opensearch.search.profile.ContextualProfileBreakdown; +import org.opensearch.search.profile.ProfileResult; +import org.opensearch.search.profile.Timer; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * This class acts as a thread-local storage for profiling a query with concurrent execution + * + * @opensearch.internal + */ +public final class ConcurrentQueryProfiler extends QueryProfiler { + + private final Map threadToProfileTree; + + public ConcurrentQueryProfiler() { + super(new ConcurrentQueryProfileTree()); + long threadId = Thread.currentThread().getId(); + threadToProfileTree = new ConcurrentHashMap<>(); + threadToProfileTree.put(threadId, (ConcurrentQueryProfileTree) profileTree); + } + + @Override + public ContextualProfileBreakdown getQueryBreakdown(Query query) { + long threadId = Thread.currentThread().getId(); + ConcurrentQueryProfileTree profileTree = getThreadToProfileTree().computeIfAbsent(threadId, k -> new ConcurrentQueryProfileTree()); + return profileTree.getProfileBreakdown(query); + } + + /** + * Removes the last (e.g. most recent) element on ConcurrentQueryProfileTree stack. + */ + @Override + public void pollLastElement() { + long threadId = Thread.currentThread().getId(); + ConcurrentQueryProfileTree concurrentProfileTree = threadToProfileTree.get(threadId); + concurrentProfileTree.pollLast(); + } + + /** + * @return a hierarchical representation of the profiled tree + */ + @Override + public List getTree() { + List profileResults = new ArrayList<>(); + for (Map.Entry profile : threadToProfileTree.entrySet()) { + profileResults.addAll(profile.getValue().getTree()); + } + return profileResults; + } + + /** + * Begin timing the rewrite phase of a request + */ + @Override + public void startRewriteTime() { + long threadId = Thread.currentThread().getId(); + Timer rewriteTimer = new Timer(); + getThreadToRewriteTimers().computeIfAbsent(threadId, k -> new LinkedList<>()).add(rewriteTimer); + rewriteTimer.start(); + } + + /** + * Stop recording the current rewrite timer + */ + public void stopAndAddRewriteTime() { + long threadId = Thread.currentThread().getId(); + Timer rewriteTimer = getThreadToRewriteTimers().get(threadId).getLast(); + rewriteTimer.stop(); + } + + /** + * @return total time taken to rewrite all queries in this concurrent query profiler + */ + @Override + public long getRewriteTime() { + long totalRewriteTime = 0L; + List rewriteTimers = new LinkedList<>(); + getThreadToRewriteTimers().values().forEach(rewriteTimers::addAll); + LinkedList mergedIntervals = mergeRewriteTimeIntervals(rewriteTimers); + for (long[] interval : mergedIntervals) { + totalRewriteTime += interval[1] - interval[0]; + } + return totalRewriteTime; + } + + // package private for unit testing + LinkedList mergeRewriteTimeIntervals(List timers) { + LinkedList mergedIntervals = new LinkedList<>(); + timers.sort(Comparator.comparingLong(Timer::getEarliestTimerStartTime)); + for (Timer timer : timers) { + long startTime = timer.getEarliestTimerStartTime(); + long endTime = timer.getEarliestTimerStartTime() + timer.getApproximateTiming(); + if (mergedIntervals.isEmpty() || mergedIntervals.getLast()[1] < timer.getEarliestTimerStartTime()) { + long[] interval = new long[2]; + interval[0] = startTime; + interval[1] = endTime; + mergedIntervals.add(interval); + } else { + mergedIntervals.getLast()[1] = Math.max(mergedIntervals.getLast()[1], endTime); + } + } + return mergedIntervals; + } + + /** + * @return the thread to rewrite timers map for this concurrent search + */ + public Map> getThreadToRewriteTimers() { + return ((ConcurrentQueryProfileTree) profileTree).getThreadToRewriteTimers(); + } + + /** + * @return the thread to profile tree map for this concurrent search + */ + public Map getThreadToProfileTree() { + return threadToProfileTree; + } +} diff --git a/server/src/main/java/org/opensearch/search/profile/query/QueryProfiler.java b/server/src/main/java/org/opensearch/search/profile/query/QueryProfiler.java index af8852355bebe..f221625076988 100644 --- a/server/src/main/java/org/opensearch/search/profile/query/QueryProfiler.java +++ b/server/src/main/java/org/opensearch/search/profile/query/QueryProfiler.java @@ -35,14 +35,8 @@ import org.apache.lucene.search.Query; import org.opensearch.search.profile.AbstractProfiler; import org.opensearch.search.profile.ContextualProfileBreakdown; -import org.opensearch.search.profile.Timer; -import java.util.Comparator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; /** * This class acts as a thread-local storage for profiling a query. It also @@ -57,20 +51,19 @@ * * @opensearch.internal */ -public final class QueryProfiler extends AbstractProfiler, Query> { +public class QueryProfiler extends AbstractProfiler, Query> { /** * The root Collector used in the search */ private InternalProfileComponent collector; - public QueryProfiler(boolean concurrent) { - super(concurrent ? new ConcurrentQueryProfileTree() : new InternalQueryProfileTree()); - if (concurrent) { - long threadId = Thread.currentThread().getId(); - threadToProfileTree = new ConcurrentHashMap<>(); - threadToProfileTree.put(threadId, (ConcurrentQueryProfileTree) profileTree); - } + public QueryProfiler() { + super(new InternalQueryProfileTree()); + } + + public QueryProfiler(ConcurrentQueryProfileTree concurrentQueryProfileTree) { + super(concurrentQueryProfileTree); } /** Set the collector that is associated with this profiler. */ @@ -92,63 +85,18 @@ public void startRewriteTime() { /** * Stop recording the current rewrite and add it's time to the total tally, returning the * cumulative time so far. - * - * @return cumulative rewrite time */ - public long stopAndAddRewriteTime() { - return ((AbstractQueryProfileTree) profileTree).stopAndAddRewriteTime(); + public void stopAndAddRewriteTime() { + ((AbstractQueryProfileTree) profileTree).stopAndAddRewriteTime(); } /** * The rewriting process is complex and hard to display because queries can undergo significant changes. * Instead of showing intermediate results, we display the cumulative time for the non-concurrent search case. - * In the concurrent search case, we compute the sum of the non-overlapping time across all slices and add - * the rewrite time from the non-concurrent path. * @return total time taken to rewrite all queries in this profile */ public long getRewriteTime() { - long rewriteTime = ((AbstractQueryProfileTree) profileTree).getRewriteTime(); - if (profileTree instanceof ConcurrentQueryProfileTree) { - List timers = getConcurrentPathRewriteTimers(); - LinkedList mergedIntervals = mergeRewriteTimeIntervals(timers); - for (long[] interval : mergedIntervals) { - rewriteTime += interval[1] - interval[0]; - } - } - return rewriteTime; - } - - // package private for unit testing - LinkedList mergeRewriteTimeIntervals(List timers) { - LinkedList mergedIntervals = new LinkedList<>(); - timers.sort(Comparator.comparingLong(Timer::getEarliestTimerStartTime)); - for (Timer timer : timers) { - long startTime = timer.getEarliestTimerStartTime(); - long endTime = timer.getEarliestTimerStartTime() + timer.getApproximateTiming(); - if (mergedIntervals.isEmpty() || mergedIntervals.getLast()[1] < timer.getEarliestTimerStartTime()) { - long[] interval = new long[2]; - interval[0] = startTime; - interval[1] = endTime; - mergedIntervals.add(interval); - } else { - mergedIntervals.getLast()[1] = Math.max(mergedIntervals.getLast()[1], endTime); - } - } - return mergedIntervals; - } - - /** - * @return a list of concurrent path rewrite timers for this concurrent search - */ - public List getConcurrentPathRewriteTimers() { - return ((ConcurrentQueryProfileTree) profileTree).getConcurrentPathRewriteTimers(); - } - - /** - * @return the thread to profile tree map for this concurrent search - */ - public Map getThreadToProfileTree() { - return threadToProfileTree; + return ((AbstractQueryProfileTree) profileTree).getRewriteTime(); } /** diff --git a/server/src/test/java/org/opensearch/search/profile/query/ConcurrentQueryProfilerTests.java b/server/src/test/java/org/opensearch/search/profile/query/ConcurrentQueryProfilerTests.java new file mode 100644 index 0000000000000..4d78632de06c9 --- /dev/null +++ b/server/src/test/java/org/opensearch/search/profile/query/ConcurrentQueryProfilerTests.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.profile.query; + +import org.opensearch.search.profile.Timer; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.LinkedList; +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; + +public class ConcurrentQueryProfilerTests extends OpenSearchTestCase { + + public void testMergeRewriteTimeIntervals() { + ConcurrentQueryProfiler profiler = new ConcurrentQueryProfiler(); + List timers = new LinkedList<>(); + timers.add(new Timer(217134L, 1L, 1L, 0L, 553074511206907L)); + timers.add(new Timer(228954L, 1L, 1L, 0L, 553074509287335L)); + timers.add(new Timer(228954L, 1L, 1L, 0L, 553074509287336L)); + LinkedList mergedIntervals = profiler.mergeRewriteTimeIntervals(timers); + assertThat(mergedIntervals.size(), equalTo(2)); + long[] interval = mergedIntervals.get(0); + assertThat(interval[0], equalTo(553074509287335L)); + assertThat(interval[1], equalTo(553074509516290L)); + interval = mergedIntervals.get(1); + assertThat(interval[0], equalTo(553074511206907L)); + assertThat(interval[1], equalTo(553074511424041L)); + } +} diff --git a/server/src/test/java/org/opensearch/search/profile/query/QueryProfilerTests.java b/server/src/test/java/org/opensearch/search/profile/query/QueryProfilerTests.java index 23bb99b8282b3..a6081c05fcb74 100644 --- a/server/src/test/java/org/opensearch/search/profile/query/QueryProfilerTests.java +++ b/server/src/test/java/org/opensearch/search/profile/query/QueryProfilerTests.java @@ -65,7 +65,6 @@ import org.opensearch.search.internal.ContextIndexSearcher; import org.opensearch.search.internal.SearchContext; import org.opensearch.search.profile.ProfileResult; -import org.opensearch.search.profile.Timer; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.ThreadPool; import org.junit.After; @@ -74,7 +73,6 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -163,7 +161,7 @@ public void tearDown() throws Exception { } public void testBasic() throws IOException { - QueryProfiler profiler = new QueryProfiler(executor != null); + QueryProfiler profiler = executor != null ? new ConcurrentQueryProfiler() : new QueryProfiler(); searcher.setProfiler(profiler); Query query = new TermQuery(new Term("foo", "bar")); searcher.search(query, 1); @@ -230,7 +228,7 @@ public void testBasic() throws IOException { } public void testNoScoring() throws IOException { - QueryProfiler profiler = new QueryProfiler(executor != null); + QueryProfiler profiler = executor != null ? new ConcurrentQueryProfiler() : new QueryProfiler(); searcher.setProfiler(profiler); Query query = new TermQuery(new Term("foo", "bar")); searcher.search(query, 1, Sort.INDEXORDER); // scores are not needed @@ -297,7 +295,7 @@ public void testNoScoring() throws IOException { } public void testUseIndexStats() throws IOException { - QueryProfiler profiler = new QueryProfiler(executor != null); + QueryProfiler profiler = executor != null ? new ConcurrentQueryProfiler() : new QueryProfiler(); searcher.setProfiler(profiler); Query query = new TermQuery(new Term("foo", "bar")); searcher.count(query); // will use index stats @@ -311,7 +309,7 @@ public void testUseIndexStats() throws IOException { } public void testApproximations() throws IOException { - QueryProfiler profiler = new QueryProfiler(executor != null); + QueryProfiler profiler = executor != null ? new ConcurrentQueryProfiler() : new QueryProfiler(); searcher.setProfiler(profiler); Query query = new RandomApproximationQuery(new TermQuery(new Term("foo", "bar")), random()); searcher.count(query); @@ -485,19 +483,4 @@ public boolean shouldCache(Query query) throws IOException { }; - public void testMergeRewriteTimeIntervals() { - QueryProfiler profiler = new QueryProfiler(executor != null); - List timers = new LinkedList<>(); - timers.add(new Timer(217134L, 1L, 1L, 0L, 553074511206907L)); - timers.add(new Timer(228954L, 1L, 1L, 0L, 553074509287335L)); - timers.add(new Timer(228954L, 1L, 1L, 0L, 553074509287336L)); - LinkedList mergedIntervals = profiler.mergeRewriteTimeIntervals(timers); - assertThat(mergedIntervals.size(), equalTo(2)); - long[] interval = mergedIntervals.get(0); - assertThat(interval[0], equalTo(553074509287335L)); - assertThat(interval[1], equalTo(553074509516290L)); - interval = mergedIntervals.get(1); - assertThat(interval[0], equalTo(553074511206907L)); - assertThat(interval[1], equalTo(553074511424041L)); - } }