From 20507fb978ac0510e7fecdc3de62e5760697b711 Mon Sep 17 00:00:00 2001 From: Ticheng Lin Date: Thu, 12 Oct 2023 06:55:18 +0000 Subject: [PATCH] Refactor and work on the PR comments (#10352) Signed-off-by: Ticheng Lin --- CHANGELOG.md | 2 +- .../search/internal/ContextIndexSearcher.java | 30 +--- .../search/profile/AbstractProfiler.java | 22 +-- .../opensearch/search/profile/Profilers.java | 6 +- .../ConcurrentQueryProfileBreakdown.java | 14 +- .../query/ConcurrentQueryProfileTree.java | 10 -- .../query/ConcurrentQueryProfiler.java | 132 ++++++++++++++++++ .../search/profile/query/QueryProfiler.java | 72 ++-------- .../ConcurrentQueryProfileBreakdownTests.java | 52 +++++++ .../query/ConcurrentQueryProfilerTests.java | 36 +++++ .../profile/query/QueryProfilerTests.java | 26 +--- 11 files changed, 258 insertions(+), 144 deletions(-) create mode 100644 server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfiler.java create mode 100644 server/src/test/java/org/opensearch/search/profile/query/ConcurrentQueryProfilerTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index d7c74ac896e66..d2f87f028b0b6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -96,6 +96,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add unreferenced file cleanup count to merge stats ([#10204](https://github.com/opensearch-project/OpenSearch/pull/10204)) - [Remote Store] Add support to restrict creation & deletion if system repository and mutation of immutable settings of system repository ([#9839](https://github.com/opensearch-project/OpenSearch/pull/9839)) - Improve compressed request handling ([#10261](https://github.com/opensearch-project/OpenSearch/pull/10261)) +- Introduce ConcurrentQueryProfiler to profile query using concurrent segment search path and support concurrency during rewrite and create weight ([10352](https://github.com/opensearch-project/OpenSearch/pull/10352)) ### Dependencies - Bump `peter-evans/create-or-update-comment` from 2 to 3 ([#9575](https://github.com/opensearch-project/OpenSearch/pull/9575)) @@ -157,7 +158,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix registration and initialization of multiple extensions ([10256](https://github.com/opensearch-project/OpenSearch/pull/10256)) - Fix Segment Replication ShardLockObtainFailedException bug during index corruption ([10370](https://github.com/opensearch-project/OpenSearch/pull/10370)) - Fix some test methods in SimulatePipelineRequestParsingTests never run and fix test failure ([#10496](https://github.com/opensearch-project/OpenSearch/pull/10496)) -- Fix timer race condition in profile rewrite and create weight for concurrent segment search ([10352](https://github.com/opensearch-project/OpenSearch/pull/10352)) ### Security diff --git a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java index afa28d04db9cf..a520086de8051 100644 --- a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java +++ b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java @@ -71,7 +71,6 @@ import org.opensearch.search.dfs.AggregatedDfs; import org.opensearch.search.profile.ContextualProfileBreakdown; import org.opensearch.search.profile.Timer; -import org.opensearch.search.profile.query.ConcurrentQueryProfileTree; import org.opensearch.search.profile.query.ProfileWeight; import org.opensearch.search.profile.query.QueryProfiler; import org.opensearch.search.profile.query.QueryTimingType; @@ -107,7 +106,6 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable { private QueryProfiler profiler; private MutableQueryTimeout cancellable; private SearchContext searchContext; - private boolean fromConcurrentPath = false; public ContextIndexSearcher( IndexReader reader, @@ -187,26 +185,15 @@ public void setAggregatedDfs(AggregatedDfs aggregatedDfs) { @Override public Query rewrite(Query original) throws IOException { - Timer concurrentPathRewriteTimer = null; if (profiler != null) { - if (fromConcurrentPath) { - concurrentPathRewriteTimer = new Timer(); - profiler.getConcurrentPathRewriteTimers().add(concurrentPathRewriteTimer); - concurrentPathRewriteTimer.start(); - } else { - profiler.startRewriteTime(); - } + profiler.startRewriteTime(); } try { return super.rewrite(original); } finally { if (profiler != null) { - if (fromConcurrentPath) { - concurrentPathRewriteTimer.stop(); - } else { - profiler.stopAndAddRewriteTime(); - } + profiler.stopAndAddRewriteTime(); } } } @@ -217,15 +204,7 @@ public Weight createWeight(Query query, ScoreMode scoreMode, float boost) throws // createWeight() is called for each query in the tree, so we tell the queryProfiler // each invocation so that it can build an internal representation of the query // tree - ContextualProfileBreakdown profile; - if (searchContext.shouldUseConcurrentSearch()) { - long threadId = Thread.currentThread().getId(); - ConcurrentQueryProfileTree profileTree = profiler.getThreadToProfileTree() - .computeIfAbsent(threadId, k -> new ConcurrentQueryProfileTree()); - profile = profileTree.getProfileBreakdown(query); - } else { - profile = profiler.getQueryBreakdown(query); - } + ContextualProfileBreakdown profile = profiler.getQueryBreakdown(query); Timer timer = profile.getTimer(QueryTimingType.CREATE_WEIGHT); timer.start(); final Weight weight; @@ -288,9 +267,6 @@ public void search( @Override protected void search(List leaves, Weight weight, Collector collector) throws IOException { - if (searchContext.shouldUseConcurrentSearch()) { - fromConcurrentPath = true; - } // Time series based workload by default traverses segments in desc order i.e. latest to the oldest order. // This is actually beneficial for search queries to start search on latest segments first for time series workload. // That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf diff --git a/server/src/main/java/org/opensearch/search/profile/AbstractProfiler.java b/server/src/main/java/org/opensearch/search/profile/AbstractProfiler.java index 9150c5ed212e5..4db1cb87a231d 100644 --- a/server/src/main/java/org/opensearch/search/profile/AbstractProfiler.java +++ b/server/src/main/java/org/opensearch/search/profile/AbstractProfiler.java @@ -32,11 +32,7 @@ package org.opensearch.search.profile; -import org.opensearch.search.profile.query.ConcurrentQueryProfileTree; - -import java.util.ArrayList; import java.util.List; -import java.util.Map; /** * Base class for a profiler @@ -46,7 +42,6 @@ public class AbstractProfiler, E> { protected final AbstractInternalProfileTree profileTree; - protected Map threadToProfileTree; public AbstractProfiler(AbstractInternalProfileTree profileTree) { this.profileTree = profileTree; @@ -64,27 +59,14 @@ public PB getQueryBreakdown(E query) { * Removes the last (e.g. most recent) element on the stack. */ public void pollLastElement() { - if (threadToProfileTree == null) { - profileTree.pollLast(); - } else { - long threadId = Thread.currentThread().getId(); - ConcurrentQueryProfileTree concurrentProfileTree = threadToProfileTree.get(threadId); - concurrentProfileTree.pollLast(); - } + profileTree.pollLast(); } /** * @return a hierarchical representation of the profiled tree */ public List getTree() { - if (threadToProfileTree == null) { - return profileTree.getTree(); - } - List profileResults = new ArrayList<>(); - for (Map.Entry profile : threadToProfileTree.entrySet()) { - profileResults.addAll(profile.getValue().getTree()); - } - return profileResults; + return profileTree.getTree(); } } diff --git a/server/src/main/java/org/opensearch/search/profile/Profilers.java b/server/src/main/java/org/opensearch/search/profile/Profilers.java index 8e87c7ff4acd4..4226c228f73cd 100644 --- a/server/src/main/java/org/opensearch/search/profile/Profilers.java +++ b/server/src/main/java/org/opensearch/search/profile/Profilers.java @@ -35,6 +35,8 @@ import org.opensearch.search.internal.ContextIndexSearcher; import org.opensearch.search.profile.aggregation.AggregationProfiler; import org.opensearch.search.profile.aggregation.ConcurrentAggregationProfiler; +import org.opensearch.search.profile.query.ConcurrentQueryProfiler; +import org.opensearch.search.profile.query.InternalQueryProfileTree; import org.opensearch.search.profile.query.QueryProfiler; import java.util.ArrayList; @@ -64,7 +66,9 @@ public Profilers(ContextIndexSearcher searcher, boolean isConcurrentSegmentSearc /** Switch to a new profile. */ public QueryProfiler addQueryProfiler() { - QueryProfiler profiler = new QueryProfiler(isConcurrentSegmentSearchEnabled); + QueryProfiler profiler = isConcurrentSegmentSearchEnabled + ? new ConcurrentQueryProfiler() + : new QueryProfiler(new InternalQueryProfileTree()); searcher.setProfiler(profiler); queryProfilers.add(profiler); return profiler; diff --git a/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdown.java b/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdown.java index e567fdd2d436c..e6e5700fef414 100644 --- a/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdown.java +++ b/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdown.java @@ -70,7 +70,7 @@ public Map toBreakdownMap() { ); final long createWeightTime = topLevelBreakdownMapWithWeightTime.get(QueryTimingType.CREATE_WEIGHT.toString()); - if (sliceCollectorsToLeaves.isEmpty() || contexts.isEmpty()) { + if (contexts.isEmpty()) { // If there are no leaf contexts, then return the default concurrent query level breakdown, which will include the // create_weight time/count queryNodeTime = createWeightTime; @@ -78,6 +78,18 @@ public Map toBreakdownMap() { minSliceNodeTime = 0L; avgSliceNodeTime = 0L; return buildDefaultQueryBreakdownMap(createWeightTime); + } else if (sliceCollectorsToLeaves.isEmpty()) { + assert contexts.size() == 1 : "Unknown cases"; + // If there is a leaf context but no sliceCollectorsToLeaves, then return the rewritten query breakdown map + AbstractProfileBreakdown breakdown = contexts.values().iterator().next(); + queryNodeTime = breakdown.toNodeTime() + createWeightTime; + maxSliceNodeTime = 0L; + minSliceNodeTime = 0L; + avgSliceNodeTime = 0L; + Map queryBreakdownMap = new HashMap<>(breakdown.toBreakdownMap()); + queryBreakdownMap.put(QueryTimingType.CREATE_WEIGHT.toString(), createWeightTime); + queryBreakdownMap.put(QueryTimingType.CREATE_WEIGHT + TIMING_TYPE_COUNT_SUFFIX, 1L); + return queryBreakdownMap; } // first create the slice level breakdowns diff --git a/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileTree.java b/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileTree.java index d6e5b79743b82..4e54178c3b4fb 100644 --- a/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileTree.java +++ b/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileTree.java @@ -12,9 +12,7 @@ import org.apache.lucene.search.Collector; import org.opensearch.search.profile.ContextualProfileBreakdown; import org.opensearch.search.profile.ProfileResult; -import org.opensearch.search.profile.Timer; -import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -24,7 +22,6 @@ * @opensearch.internal */ public class ConcurrentQueryProfileTree extends AbstractQueryProfileTree { - protected List concurrentPathRewriteTimers = new ArrayList<>(); @Override protected ContextualProfileBreakdown createProfileBreakdown() { @@ -91,11 +88,4 @@ private void updateCollectorToLeavesForChildBreakdowns(Integer parentToken, Map< } } } - - /** - * @return the concurrent path rewrite timer list for this profile tree - */ - public List getConcurrentPathRewriteTimers() { - return concurrentPathRewriteTimers; - } } diff --git a/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfiler.java b/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfiler.java new file mode 100644 index 0000000000000..ffd219350c183 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfiler.java @@ -0,0 +1,132 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.profile.query; + +import org.apache.lucene.search.Query; +import org.opensearch.search.profile.ContextualProfileBreakdown; +import org.opensearch.search.profile.ProfileResult; +import org.opensearch.search.profile.Timer; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * This class acts as a thread-local storage for profiling a query with concurrent execution + * + * @opensearch.internal + */ +public final class ConcurrentQueryProfiler extends QueryProfiler { + + private final Map threadToProfileTree; + // The LinkedList does not need to be thread safe, as the map associates thread IDs with LinkedList, and only + // one thread will access the LinkedList at a time. + private final Map> threadToRewriteTimers; + + public ConcurrentQueryProfiler() { + super(new ConcurrentQueryProfileTree()); + long threadId = getCurrentThreadId(); + // We utilize LinkedHashMap to preserve the insertion order of the profiled queries + threadToProfileTree = Collections.synchronizedMap(new LinkedHashMap<>()); + threadToProfileTree.put(threadId, (ConcurrentQueryProfileTree) profileTree); + threadToRewriteTimers = new ConcurrentHashMap<>(); + threadToRewriteTimers.put(threadId, new LinkedList<>()); + } + + @Override + public ContextualProfileBreakdown getQueryBreakdown(Query query) { + ConcurrentQueryProfileTree profileTree = threadToProfileTree.computeIfAbsent( + getCurrentThreadId(), + k -> new ConcurrentQueryProfileTree() + ); + return profileTree.getProfileBreakdown(query); + } + + /** + * Removes the last (e.g. most recent) element on ConcurrentQueryProfileTree stack. + */ + @Override + public void pollLastElement() { + ConcurrentQueryProfileTree concurrentProfileTree = threadToProfileTree.get(getCurrentThreadId()); + concurrentProfileTree.pollLast(); + } + + /** + * @return a hierarchical representation of the profiled tree + */ + @Override + public List getTree() { + List profileResults = new ArrayList<>(); + for (Map.Entry profile : threadToProfileTree.entrySet()) { + profileResults.addAll(profile.getValue().getTree()); + } + return profileResults; + } + + /** + * Begin timing the rewrite phase of a request + */ + @Override + public void startRewriteTime() { + Timer rewriteTimer = new Timer(); + threadToRewriteTimers.computeIfAbsent(getCurrentThreadId(), k -> new LinkedList<>()).add(rewriteTimer); + rewriteTimer.start(); + } + + /** + * Stop recording the current rewrite timer + */ + public void stopAndAddRewriteTime() { + Timer rewriteTimer = threadToRewriteTimers.get(getCurrentThreadId()).getLast(); + rewriteTimer.stop(); + } + + /** + * @return total time taken to rewrite all queries in this concurrent query profiler + */ + @Override + public long getRewriteTime() { + long totalRewriteTime = 0L; + List rewriteTimers = new LinkedList<>(); + threadToRewriteTimers.values().forEach(rewriteTimers::addAll); + LinkedList mergedIntervals = mergeRewriteTimeIntervals(rewriteTimers); + for (long[] interval : mergedIntervals) { + totalRewriteTime += interval[1] - interval[0]; + } + return totalRewriteTime; + } + + // package private for unit testing + LinkedList mergeRewriteTimeIntervals(List timers) { + LinkedList mergedIntervals = new LinkedList<>(); + timers.sort(Comparator.comparingLong(Timer::getEarliestTimerStartTime)); + for (Timer timer : timers) { + long startTime = timer.getEarliestTimerStartTime(); + long endTime = startTime + timer.getApproximateTiming(); + if (mergedIntervals.isEmpty() || mergedIntervals.getLast()[1] < startTime) { + long[] interval = new long[2]; + interval[0] = startTime; + interval[1] = endTime; + mergedIntervals.add(interval); + } else { + mergedIntervals.getLast()[1] = Math.max(mergedIntervals.getLast()[1], endTime); + } + } + return mergedIntervals; + } + + private long getCurrentThreadId() { + return Thread.currentThread().getId(); + } +} diff --git a/server/src/main/java/org/opensearch/search/profile/query/QueryProfiler.java b/server/src/main/java/org/opensearch/search/profile/query/QueryProfiler.java index af8852355bebe..875e443f855a2 100644 --- a/server/src/main/java/org/opensearch/search/profile/query/QueryProfiler.java +++ b/server/src/main/java/org/opensearch/search/profile/query/QueryProfiler.java @@ -35,14 +35,8 @@ import org.apache.lucene.search.Query; import org.opensearch.search.profile.AbstractProfiler; import org.opensearch.search.profile.ContextualProfileBreakdown; -import org.opensearch.search.profile.Timer; -import java.util.Comparator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; /** * This class acts as a thread-local storage for profiling a query. It also @@ -57,20 +51,19 @@ * * @opensearch.internal */ -public final class QueryProfiler extends AbstractProfiler, Query> { +public class QueryProfiler extends AbstractProfiler, Query> { /** * The root Collector used in the search */ private InternalProfileComponent collector; - public QueryProfiler(boolean concurrent) { - super(concurrent ? new ConcurrentQueryProfileTree() : new InternalQueryProfileTree()); - if (concurrent) { - long threadId = Thread.currentThread().getId(); - threadToProfileTree = new ConcurrentHashMap<>(); - threadToProfileTree.put(threadId, (ConcurrentQueryProfileTree) profileTree); - } + public QueryProfiler(InternalQueryProfileTree tree) { + super(tree); + } + + public QueryProfiler(ConcurrentQueryProfileTree concurrentQueryProfileTree) { + super(concurrentQueryProfileTree); } /** Set the collector that is associated with this profiler. */ @@ -92,63 +85,18 @@ public void startRewriteTime() { /** * Stop recording the current rewrite and add it's time to the total tally, returning the * cumulative time so far. - * - * @return cumulative rewrite time */ - public long stopAndAddRewriteTime() { - return ((AbstractQueryProfileTree) profileTree).stopAndAddRewriteTime(); + public void stopAndAddRewriteTime() { + ((AbstractQueryProfileTree) profileTree).stopAndAddRewriteTime(); } /** * The rewriting process is complex and hard to display because queries can undergo significant changes. * Instead of showing intermediate results, we display the cumulative time for the non-concurrent search case. - * In the concurrent search case, we compute the sum of the non-overlapping time across all slices and add - * the rewrite time from the non-concurrent path. * @return total time taken to rewrite all queries in this profile */ public long getRewriteTime() { - long rewriteTime = ((AbstractQueryProfileTree) profileTree).getRewriteTime(); - if (profileTree instanceof ConcurrentQueryProfileTree) { - List timers = getConcurrentPathRewriteTimers(); - LinkedList mergedIntervals = mergeRewriteTimeIntervals(timers); - for (long[] interval : mergedIntervals) { - rewriteTime += interval[1] - interval[0]; - } - } - return rewriteTime; - } - - // package private for unit testing - LinkedList mergeRewriteTimeIntervals(List timers) { - LinkedList mergedIntervals = new LinkedList<>(); - timers.sort(Comparator.comparingLong(Timer::getEarliestTimerStartTime)); - for (Timer timer : timers) { - long startTime = timer.getEarliestTimerStartTime(); - long endTime = timer.getEarliestTimerStartTime() + timer.getApproximateTiming(); - if (mergedIntervals.isEmpty() || mergedIntervals.getLast()[1] < timer.getEarliestTimerStartTime()) { - long[] interval = new long[2]; - interval[0] = startTime; - interval[1] = endTime; - mergedIntervals.add(interval); - } else { - mergedIntervals.getLast()[1] = Math.max(mergedIntervals.getLast()[1], endTime); - } - } - return mergedIntervals; - } - - /** - * @return a list of concurrent path rewrite timers for this concurrent search - */ - public List getConcurrentPathRewriteTimers() { - return ((ConcurrentQueryProfileTree) profileTree).getConcurrentPathRewriteTimers(); - } - - /** - * @return the thread to profile tree map for this concurrent search - */ - public Map getThreadToProfileTree() { - return threadToProfileTree; + return ((AbstractQueryProfileTree) profileTree).getRewriteTime(); } /** diff --git a/server/src/test/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdownTests.java b/server/src/test/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdownTests.java index f29ba3b0cea07..db14eb90ef839 100644 --- a/server/src/test/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdownTests.java +++ b/server/src/test/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdownTests.java @@ -333,6 +333,58 @@ public void testBreakDownMapWithMultipleSlicesAndOneSliceWithNoLeafContext() thr directory.close(); } + public void testOneLeafContextWithEmptySliceCollectorsToLeaves() throws Exception { + final DirectoryReader directoryReader = getDirectoryReader(1); + final Directory directory = directoryReader.directory(); + final long createWeightEarliestStartTime = createWeightTimer.getEarliestTimerStartTime(); + final long createWeightEndTime = createWeightEarliestStartTime + createWeightTimer.getApproximateTiming(); + final Map leafProfileBreakdownMap_1 = getLeafBreakdownMap(createWeightEndTime + 10, 10, 1); + final AbstractProfileBreakdown leafProfileBreakdown_1 = new TestQueryProfileBreakdown( + QueryTimingType.class, + leafProfileBreakdownMap_1 + ); + testQueryProfileBreakdown.getContexts().put(directoryReader.leaves().get(0), leafProfileBreakdown_1); + final Map queryBreakDownMap = testQueryProfileBreakdown.toBreakdownMap(); + assertFalse(queryBreakDownMap == null || queryBreakDownMap.isEmpty()); + assertEquals(26, queryBreakDownMap.size()); + for (QueryTimingType queryTimingType : QueryTimingType.values()) { + String timingTypeKey = queryTimingType.toString(); + String timingTypeCountKey = queryTimingType + TIMING_TYPE_COUNT_SUFFIX; + + if (queryTimingType.equals(QueryTimingType.CREATE_WEIGHT)) { + final long createWeightTime = queryBreakDownMap.get(timingTypeKey); + assertEquals(createWeightTimer.getApproximateTiming(), createWeightTime); + assertEquals(1, (long) queryBreakDownMap.get(timingTypeCountKey)); + // verify there is no min/max/avg for weight type stats + assertFalse( + queryBreakDownMap.containsKey(ConcurrentQueryProfileBreakdown.MAX_PREFIX + timingTypeKey) + || queryBreakDownMap.containsKey(MIN_PREFIX + timingTypeKey) + || queryBreakDownMap.containsKey(ConcurrentQueryProfileBreakdown.AVG_PREFIX + timingTypeKey) + || queryBreakDownMap.containsKey(ConcurrentQueryProfileBreakdown.MAX_PREFIX + timingTypeCountKey) + || queryBreakDownMap.containsKey(MIN_PREFIX + timingTypeCountKey) + || queryBreakDownMap.containsKey(ConcurrentQueryProfileBreakdown.AVG_PREFIX + timingTypeCountKey) + ); + continue; + } + assertNotNull(queryBreakDownMap.get(timingTypeKey)); + assertNotNull(queryBreakDownMap.get(timingTypeCountKey)); + // verify there is no min/max/avg for current breakdown type stats + assertFalse( + queryBreakDownMap.containsKey(ConcurrentQueryProfileBreakdown.MAX_PREFIX + timingTypeKey) + || queryBreakDownMap.containsKey(MIN_PREFIX + timingTypeKey) + || queryBreakDownMap.containsKey(ConcurrentQueryProfileBreakdown.AVG_PREFIX + timingTypeKey) + || queryBreakDownMap.containsKey(ConcurrentQueryProfileBreakdown.MAX_PREFIX + timingTypeCountKey) + || queryBreakDownMap.containsKey(MIN_PREFIX + timingTypeCountKey) + || queryBreakDownMap.containsKey(ConcurrentQueryProfileBreakdown.AVG_PREFIX + timingTypeCountKey) + ); + } + assertEquals(0, testQueryProfileBreakdown.getMaxSliceNodeTime()); + assertEquals(0, testQueryProfileBreakdown.getMinSliceNodeTime()); + assertEquals(0, testQueryProfileBreakdown.getAvgSliceNodeTime()); + directoryReader.close(); + directory.close(); + } + private Map getLeafBreakdownMap(long startTime, long timeTaken, long count) { Map leafBreakDownMap = new HashMap<>(); for (QueryTimingType timingType : QueryTimingType.values()) { diff --git a/server/src/test/java/org/opensearch/search/profile/query/ConcurrentQueryProfilerTests.java b/server/src/test/java/org/opensearch/search/profile/query/ConcurrentQueryProfilerTests.java new file mode 100644 index 0000000000000..4d78632de06c9 --- /dev/null +++ b/server/src/test/java/org/opensearch/search/profile/query/ConcurrentQueryProfilerTests.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.profile.query; + +import org.opensearch.search.profile.Timer; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.LinkedList; +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; + +public class ConcurrentQueryProfilerTests extends OpenSearchTestCase { + + public void testMergeRewriteTimeIntervals() { + ConcurrentQueryProfiler profiler = new ConcurrentQueryProfiler(); + List timers = new LinkedList<>(); + timers.add(new Timer(217134L, 1L, 1L, 0L, 553074511206907L)); + timers.add(new Timer(228954L, 1L, 1L, 0L, 553074509287335L)); + timers.add(new Timer(228954L, 1L, 1L, 0L, 553074509287336L)); + LinkedList mergedIntervals = profiler.mergeRewriteTimeIntervals(timers); + assertThat(mergedIntervals.size(), equalTo(2)); + long[] interval = mergedIntervals.get(0); + assertThat(interval[0], equalTo(553074509287335L)); + assertThat(interval[1], equalTo(553074509516290L)); + interval = mergedIntervals.get(1); + assertThat(interval[0], equalTo(553074511206907L)); + assertThat(interval[1], equalTo(553074511424041L)); + } +} diff --git a/server/src/test/java/org/opensearch/search/profile/query/QueryProfilerTests.java b/server/src/test/java/org/opensearch/search/profile/query/QueryProfilerTests.java index 23bb99b8282b3..199cfea8bfcba 100644 --- a/server/src/test/java/org/opensearch/search/profile/query/QueryProfilerTests.java +++ b/server/src/test/java/org/opensearch/search/profile/query/QueryProfilerTests.java @@ -65,7 +65,6 @@ import org.opensearch.search.internal.ContextIndexSearcher; import org.opensearch.search.internal.SearchContext; import org.opensearch.search.profile.ProfileResult; -import org.opensearch.search.profile.Timer; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.ThreadPool; import org.junit.After; @@ -74,7 +73,6 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -163,7 +161,7 @@ public void tearDown() throws Exception { } public void testBasic() throws IOException { - QueryProfiler profiler = new QueryProfiler(executor != null); + QueryProfiler profiler = executor != null ? new ConcurrentQueryProfiler() : new QueryProfiler(new InternalQueryProfileTree()); searcher.setProfiler(profiler); Query query = new TermQuery(new Term("foo", "bar")); searcher.search(query, 1); @@ -230,7 +228,7 @@ public void testBasic() throws IOException { } public void testNoScoring() throws IOException { - QueryProfiler profiler = new QueryProfiler(executor != null); + QueryProfiler profiler = executor != null ? new ConcurrentQueryProfiler() : new QueryProfiler(new InternalQueryProfileTree()); searcher.setProfiler(profiler); Query query = new TermQuery(new Term("foo", "bar")); searcher.search(query, 1, Sort.INDEXORDER); // scores are not needed @@ -297,7 +295,7 @@ public void testNoScoring() throws IOException { } public void testUseIndexStats() throws IOException { - QueryProfiler profiler = new QueryProfiler(executor != null); + QueryProfiler profiler = executor != null ? new ConcurrentQueryProfiler() : new QueryProfiler(new InternalQueryProfileTree()); searcher.setProfiler(profiler); Query query = new TermQuery(new Term("foo", "bar")); searcher.count(query); // will use index stats @@ -311,7 +309,7 @@ public void testUseIndexStats() throws IOException { } public void testApproximations() throws IOException { - QueryProfiler profiler = new QueryProfiler(executor != null); + QueryProfiler profiler = executor != null ? new ConcurrentQueryProfiler() : new QueryProfiler(new InternalQueryProfileTree()); searcher.setProfiler(profiler); Query query = new RandomApproximationQuery(new TermQuery(new Term("foo", "bar")), random()); searcher.count(query); @@ -484,20 +482,4 @@ public boolean shouldCache(Query query) throws IOException { } }; - - public void testMergeRewriteTimeIntervals() { - QueryProfiler profiler = new QueryProfiler(executor != null); - List timers = new LinkedList<>(); - timers.add(new Timer(217134L, 1L, 1L, 0L, 553074511206907L)); - timers.add(new Timer(228954L, 1L, 1L, 0L, 553074509287335L)); - timers.add(new Timer(228954L, 1L, 1L, 0L, 553074509287336L)); - LinkedList mergedIntervals = profiler.mergeRewriteTimeIntervals(timers); - assertThat(mergedIntervals.size(), equalTo(2)); - long[] interval = mergedIntervals.get(0); - assertThat(interval[0], equalTo(553074509287335L)); - assertThat(interval[1], equalTo(553074509516290L)); - interval = mergedIntervals.get(1); - assertThat(interval[0], equalTo(553074511206907L)); - assertThat(interval[1], equalTo(553074511424041L)); - } }