Skip to content

Commit

Permalink
Add terminate_after threshold for concurrent vs non-concurrent segmen…
Browse files Browse the repository at this point in the history
…t search

Signed-off-by: Jay Deng <[email protected]>
  • Loading branch information
jed326 authored and Jay Deng committed Sep 27, 2023
1 parent e617da3 commit ad70a60
Show file tree
Hide file tree
Showing 18 changed files with 716 additions and 637 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Telemetry-Otel] Added support for OtlpGrpcSpanExporter exporter ([#9666](https://github.com/opensearch-project/OpenSearch/pull/9666))
- Async blob read support for encrypted containers ([#10131](https://github.com/opensearch-project/OpenSearch/pull/10131))
- Add capability to restrict async durability mode for remote indexes ([#10189](https://github.com/opensearch-project/OpenSearch/pull/10189))
- Add threshold setting to determine when concurrent segment search should be used with terminate_after parameter ([#10200](https://github.com/opensearch-project/OpenSearch/pull/10200))

### Dependencies
- Bump `peter-evans/create-or-update-comment` from 2 to 3 ([#9575](https://github.com/opensearch-project/OpenSearch/pull/9575))
Expand Down

This file was deleted.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,8 @@ public void apply(Settings value, Settings current, Settings previous) {
List.of(FeatureFlags.CONCURRENT_SEGMENT_SEARCH),
List.of(
SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING,
SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING
SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING,
SearchService.TERMINATE_AFTER_CONCURRENT_SEARCH_THRESHOLD
),
List.of(FeatureFlags.TELEMETRY),
List.of(TelemetrySettings.TRACER_ENABLED_SETTING, TelemetrySettings.TRACER_SAMPLER_PROBABILITY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
import java.util.function.LongSupplier;

import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.SearchService.TERMINATE_AFTER_CONCURRENT_SEARCH_THRESHOLD;

/**
* The main search context used during search phase
Expand Down Expand Up @@ -186,6 +187,7 @@ final class DefaultSearchContext extends SearchContext {
private final Function<SearchSourceBuilder, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder;
private final boolean concurrentSearchSettingsEnabled;
private final SetOnce<Boolean> requestShouldUseConcurrentSearch = new SetOnce<>();
private final int terminateAfterThreshold;

DefaultSearchContext(
ReaderContext readerContext,
Expand Down Expand Up @@ -217,6 +219,7 @@ final class DefaultSearchContext extends SearchContext {
this.clusterService = clusterService;
this.engineSearcher = readerContext.acquireSearcher("search");
this.concurrentSearchSettingsEnabled = evaluateConcurrentSegmentSearchSettings(executor);
this.terminateAfterThreshold = getTerminateAfterThreshold();
this.searcher = new ContextIndexSearcher(
engineSearcher.getIndexReader(),
engineSearcher.getSimilarity(),
Expand Down Expand Up @@ -896,11 +899,24 @@ public void evaluateRequestShouldUseConcurrentSearch() {
&& aggregations().factories() != null
&& !aggregations().factories().allFactoriesSupportConcurrentSearch()) {
requestShouldUseConcurrentSearch.set(false);
} else if (terminateAfter != 0 && terminateAfter < terminateAfterThreshold) {
requestShouldUseConcurrentSearch.set(false);
} else {
requestShouldUseConcurrentSearch.set(true);
}
}

/**
* Get the terminate_after threshold below which we should revert to non-concurrent search
*/
private int getTerminateAfterThreshold() {
if (FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH) && (clusterService != null)) {
return clusterService.getClusterSettings().get(TERMINATE_AFTER_CONCURRENT_SEARCH_THRESHOLD);
} else {
return Integer.MAX_VALUE;
}
}

public void setProfilers(Profilers profilers) {
this.profilers = profilers;
}
Expand Down
11 changes: 11 additions & 0 deletions server/src/main/java/org/opensearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,17 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
Property.Dynamic
);

/**
* Threshold for terminate_after search parameter below which non-concurrent search will always be used.
*/
public static final Setting<Integer> TERMINATE_AFTER_CONCURRENT_SEARCH_THRESHOLD = Setting.intSetting(
"search.terminate_after_concurrent_search_threshold",
100000,
0,
Property.NodeScope,
Property.Dynamic
);

/**
* Enables low-level, frequent search cancellation checks. Enabling low-level checks will make long running searches to react
* to the cancellation request faster. It will produce more cancellation checks but benchmarking has shown these did not
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.opensearch.search.profile.query.ProfileWeight;
import org.opensearch.search.profile.query.QueryProfiler;
import org.opensearch.search.profile.query.QueryTimingType;
import org.opensearch.search.query.EarlyTerminatingCollector;
import org.opensearch.search.query.QueryPhase;
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.search.sort.FieldSortBuilder;
Expand Down Expand Up @@ -292,7 +293,7 @@ protected void search(List<LeafReaderContext> leaves, Weight weight, Collector c
private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector) throws IOException {

// Check if at all we need to call this leaf for collecting results.
if (canMatch(ctx) == false) {
if (canMatch(ctx) == false || searchContext.isTerminatedEarly()) {
return;
}

Expand All @@ -310,6 +311,9 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
// there is no doc of interest in this reader context
// continue with the following leaf
return;
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
searchContext.setTerminatedEarly(true);
return;
} catch (QueryPhase.TimeExceededException e) {
searchContext.setSearchTimedOut(true);
return;
Expand All @@ -325,6 +329,9 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
} catch (CollectionTerminatedException e) {
// collection was terminated prematurely
// continue with the following leaf
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
searchContext.setTerminatedEarly(true);
return;
} catch (QueryPhase.TimeExceededException e) {
searchContext.setSearchTimedOut(true);
return;
Expand All @@ -344,6 +351,9 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
} catch (CollectionTerminatedException e) {
// collection was terminated prematurely
// continue with the following leaf
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
searchContext.setTerminatedEarly(true);
return;
} catch (QueryPhase.TimeExceededException e) {
searchContext.setSearchTimedOut(true);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ public List<Aggregator> toAggregators(Collection<Collector> collectors) {
private InnerHitsContext innerHitsContext;

private volatile boolean searchTimedOut;
private volatile boolean terminatedEarly;

protected SearchContext() {}

Expand All @@ -136,6 +137,14 @@ public void setSearchTimedOut(boolean searchTimedOut) {
this.searchTimedOut = searchTimedOut;
}

public boolean isTerminatedEarly() {
return this.terminatedEarly;
}

public void setTerminatedEarly(boolean terminatedEarly) {
this.terminatedEarly = terminatedEarly;
}

@Override
public final void close() {
if (closed.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ private static boolean searchWithCollectorManager(
if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER && queryResult.terminatedEarly() == null) {
queryResult.terminatedEarly(false);
}
if (searchContext.isTerminatedEarly()) {
queryResult.terminatedEarly(true);
}

return topDocsFactory.shouldRescore();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,27 @@
import org.apache.lucene.search.LeafCollector;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

/**
* A {@link Collector} that early terminates collection after <code>maxCountHits</code> docs have been collected.
*
* @opensearch.internal
*/
public class EarlyTerminatingCollector extends FilterCollector {
static final class EarlyTerminationException extends RuntimeException {

/**
* Force termination exception
*/
public static final class EarlyTerminationException extends RuntimeException {
EarlyTerminationException(String msg) {
super(msg);
}
}

private final int maxCountHits;
private int numCollected;
private boolean forceTermination;
private final AtomicLong numCollected;
private final boolean forceTermination;
private boolean earlyTerminated;

/**
Expand All @@ -69,11 +74,19 @@ static final class EarlyTerminationException extends RuntimeException {
super(delegate);
this.maxCountHits = maxCountHits;
this.forceTermination = forceTermination;
this.numCollected = new AtomicLong();
}

EarlyTerminatingCollector(final Collector delegate, int maxCountHits, boolean forceTermination, AtomicLong numCollected) {
super(delegate);
this.maxCountHits = maxCountHits;
this.forceTermination = forceTermination;
this.numCollected = numCollected;
}

@Override
public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
if (numCollected >= maxCountHits) {
if (numCollected.get() >= maxCountHits) {
earlyTerminated = true;
if (forceTermination) {
throw new EarlyTerminationException("early termination [CountBased]");
Expand All @@ -84,7 +97,7 @@ public LeafCollector getLeafCollector(LeafReaderContext context) throws IOExcept
return new FilterLeafCollector(super.getLeafCollector(context)) {
@Override
public void collect(int doc) throws IOException {
if (++numCollected > maxCountHits) {
if (numCollected.incrementAndGet() > maxCountHits) {
earlyTerminated = true;
if (forceTermination) {
throw new EarlyTerminationException("early termination [CountBased]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/**
* Manager for the EarlyTerminatingCollector
Expand All @@ -29,16 +30,23 @@ public class EarlyTerminatingCollectorManager<C extends Collector>
private final CollectorManager<C, ReduceableSearchResult> manager;
private final int maxCountHits;
private boolean forceTermination;
private final AtomicLong numCollected;

EarlyTerminatingCollectorManager(CollectorManager<C, ReduceableSearchResult> manager, int maxCountHits, boolean forceTermination) {
this.manager = manager;
this.maxCountHits = maxCountHits;
this.forceTermination = forceTermination;
this.numCollected = new AtomicLong();
}

@Override
public EarlyTerminatingCollector newCollector() throws IOException {
return new EarlyTerminatingCollector(manager.newCollector(), maxCountHits, false /* forced termination is not supported */);
return new EarlyTerminatingCollector(
manager.newCollector(),
maxCountHits,
forceTermination /* forced termination is not supported */,
numCollected
);
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,9 +350,8 @@ private static boolean searchWithCollector(
queryCollector = QueryCollectorContext.createQueryCollector(collectors);
}
QuerySearchResult queryResult = searchContext.queryResult();
try {
searcher.search(query, queryCollector);
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
searcher.search(query, queryCollector);
if (searchContext.isTerminatedEarly()) {
queryResult.terminatedEarly(true);
}
if (searchContext.isSearchTimedOut()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Objects;
import java.util.function.Supplier;
Expand Down Expand Up @@ -577,19 +576,7 @@ TopDocsAndMaxScore newTopDocs(final TopDocs topDocs, final float maxScore, final
}
}

// Since we cannot support early forced termination, we have to simulate it by
// artificially reducing the number of total hits and doc scores.
ScoreDoc[] scoreDocs = topDocs.scoreDocs;
if (terminatedAfter != null) {
if (totalHits.value > terminatedAfter) {
totalHits = new TotalHits(terminatedAfter, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO);
}

if (scoreDocs != null && scoreDocs.length > terminatedAfter) {
scoreDocs = Arrays.copyOf(scoreDocs, terminatedAfter);
}
}

final TopDocs newTopDocs;
if (topDocs instanceof TopFieldDocs) {
TopFieldDocs fieldDocs = (TopFieldDocs) topDocs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public TotalHitCountCollector newCollector() throws IOException {

@Override
public ReduceableSearchResult reduce(Collection<TotalHitCountCollector> collectors) throws IOException {
// TODO: This does not work for terminate_after < track_total_hits
return (QuerySearchResult result) -> {
final TotalHits.Relation relation = (terminatedAfter != null)
? TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,7 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
clusterSettings.applySettings(
Settings.builder().put(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build()
);
clusterSettings.registerSetting(SearchService.TERMINATE_AFTER_CONCURRENT_SEARCH_THRESHOLD);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
DefaultSearchContext context = new DefaultSearchContext(
readerContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,18 @@ public void testTerminateAfterEarlyTermination() throws Exception {
}
w.close();
final IndexReader reader = DirectoryReader.open(dir);
TestSearchContext context = new TestSearchContext(null, indexShard, newContextSearcher(reader, executor));
TestSearchContext context = new TestSearchContext(null, indexShard);
context.setSearcher(
new ContextIndexSearcher(
reader,
IndexSearcher.getDefaultSimilarity(),
IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(),
true,
executor,
context
)
);
context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()));
context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery()));
if (this.executor != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,19 @@ public void testTerminateAfterEarlyTermination() throws Exception {
}
w.close();
final IndexReader reader = DirectoryReader.open(dir);
TestSearchContext context = new TestSearchContext(null, indexShard, newContextSearcher(reader, executor));
TestSearchContext context = new TestSearchContext(null, indexShard);
context.setSearcher(
new ContextIndexSearcher(
reader,
IndexSearcher.getDefaultSimilarity(),
IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(),
true,
executor,
context
)
);
context.setConcurrentSegmentSearchEnabled(executor != null);
context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()));
context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,7 @@ public boolean shouldUseTimeSeriesDescSortOptimization() {
public TestSearchContext withCleanQueryResult() {
queryResult.consumeAll();
profilers = null;
setTerminatedEarly(false);
return this;
}

Expand Down

0 comments on commit ad70a60

Please sign in to comment.