Skip to content

Commit

Permalink
stashing terminate_after threshold changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Deng committed Oct 2, 2023
1 parent 797def6 commit 082fcfb
Show file tree
Hide file tree
Showing 17 changed files with 755 additions and 642 deletions.

This file was deleted.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,8 @@ public void apply(Settings value, Settings current, Settings previous) {
List.of(FeatureFlags.CONCURRENT_SEGMENT_SEARCH),
List.of(
SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING,
SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING
SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING,
SearchService.TERMINATE_AFTER_CONCURRENT_SEARCH_THRESHOLD
),
List.of(FeatureFlags.TELEMETRY),
List.of(TelemetrySettings.TRACER_ENABLED_SETTING, TelemetrySettings.TRACER_SAMPLER_PROBABILITY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
import java.util.function.LongSupplier;

import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.SearchService.TERMINATE_AFTER_CONCURRENT_SEARCH_THRESHOLD;

/**
* The main search context used during search phase
Expand Down Expand Up @@ -186,6 +187,7 @@ final class DefaultSearchContext extends SearchContext {
private final Function<SearchSourceBuilder, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder;
private final boolean concurrentSearchSettingsEnabled;
private final SetOnce<Boolean> requestShouldUseConcurrentSearch = new SetOnce<>();
private final int terminateAfterThreshold;

DefaultSearchContext(
ReaderContext readerContext,
Expand Down Expand Up @@ -217,6 +219,7 @@ final class DefaultSearchContext extends SearchContext {
this.clusterService = clusterService;
this.engineSearcher = readerContext.acquireSearcher("search");
this.concurrentSearchSettingsEnabled = evaluateConcurrentSegmentSearchSettings(executor);
this.terminateAfterThreshold = getTerminateAfterThreshold();
this.searcher = new ContextIndexSearcher(
engineSearcher.getIndexReader(),
engineSearcher.getSimilarity(),
Expand Down Expand Up @@ -896,11 +899,24 @@ public void evaluateRequestShouldUseConcurrentSearch() {
&& aggregations().factories() != null
&& !aggregations().factories().allFactoriesSupportConcurrentSearch()) {
requestShouldUseConcurrentSearch.set(false);
} else if (terminateAfter != 0 && terminateAfter < terminateAfterThreshold) {
requestShouldUseConcurrentSearch.set(false);
} else {
requestShouldUseConcurrentSearch.set(true);
}
}

/**
* Get the terminate_after threshold below which we should revert to non-concurrent search
*/
private int getTerminateAfterThreshold() {
if (FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH) && (clusterService != null)) {
return clusterService.getClusterSettings().get(TERMINATE_AFTER_CONCURRENT_SEARCH_THRESHOLD);
} else {
return Integer.MAX_VALUE;
}
}

public void setProfilers(Profilers profilers) {
this.profilers = profilers;
}
Expand Down
11 changes: 11 additions & 0 deletions server/src/main/java/org/opensearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,17 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
Property.Dynamic
);

/**
* Threshold for terminate_after search parameter below which non-concurrent search will always be used.
*/
public static final Setting<Integer> TERMINATE_AFTER_CONCURRENT_SEARCH_THRESHOLD = Setting.intSetting(
"search.terminate_after_concurrent_search_threshold",
100000,
0,
Property.NodeScope,
Property.Dynamic
);

/**
* Enables low-level, frequent search cancellation checks. Enabling low-level checks will make long running searches to react
* to the cancellation request faster. It will produce more cancellation checks but benchmarking has shown these did not
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.opensearch.search.profile.query.ProfileWeight;
import org.opensearch.search.profile.query.QueryProfiler;
import org.opensearch.search.profile.query.QueryTimingType;
import org.opensearch.search.query.EarlyTerminatingCollector;
import org.opensearch.search.query.QueryPhase;
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.search.sort.FieldSortBuilder;
Expand Down Expand Up @@ -292,7 +293,7 @@ protected void search(List<LeafReaderContext> leaves, Weight weight, Collector c
private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector) throws IOException {

// Check if at all we need to call this leaf for collecting results.
if (canMatch(ctx) == false) {
if (canMatch(ctx) == false || searchContext.isTerminatedEarly()) {
return;
}

Expand All @@ -310,6 +311,9 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
// there is no doc of interest in this reader context
// continue with the following leaf
return;
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
searchContext.setTerminatedEarly(true);
return;
} catch (QueryPhase.TimeExceededException e) {
searchContext.setSearchTimedOut(true);
return;
Expand All @@ -325,6 +329,9 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
} catch (CollectionTerminatedException e) {
// collection was terminated prematurely
// continue with the following leaf
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
searchContext.setTerminatedEarly(true);
return;
} catch (QueryPhase.TimeExceededException e) {
searchContext.setSearchTimedOut(true);
return;
Expand All @@ -344,6 +351,9 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
} catch (CollectionTerminatedException e) {
// collection was terminated prematurely
// continue with the following leaf
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
searchContext.setTerminatedEarly(true);
return;
} catch (QueryPhase.TimeExceededException e) {
searchContext.setSearchTimedOut(true);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ public List<Aggregator> toAggregators(Collection<Collector> collectors) {
private InnerHitsContext innerHitsContext;

private volatile boolean searchTimedOut;
private volatile boolean terminatedEarly;

protected SearchContext() {}

Expand All @@ -136,6 +137,14 @@ public void setSearchTimedOut(boolean searchTimedOut) {
this.searchTimedOut = searchTimedOut;
}

public boolean isTerminatedEarly() {
return this.terminatedEarly;
}

public void setTerminatedEarly(boolean terminatedEarly) {
this.terminatedEarly = terminatedEarly;
}

@Override
public final void close() {
if (closed.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ private static boolean searchWithCollectorManager(
if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER && queryResult.terminatedEarly() == null) {
queryResult.terminatedEarly(false);
}
if (searchContext.isTerminatedEarly()) {
queryResult.terminatedEarly(true);
}

return topDocsFactory.shouldRescore();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,27 @@
import org.apache.lucene.search.LeafCollector;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

/**
* A {@link Collector} that early terminates collection after <code>maxCountHits</code> docs have been collected.
*
* @opensearch.internal
*/
public class EarlyTerminatingCollector extends FilterCollector {
static final class EarlyTerminationException extends RuntimeException {

/**
* Force termination exception
*/
public static final class EarlyTerminationException extends RuntimeException {
EarlyTerminationException(String msg) {
super(msg);
}
}

private final int maxCountHits;
private int numCollected;
private boolean forceTermination;
private final AtomicLong numCollected;
private final boolean forceTermination;
private boolean earlyTerminated;

/**
Expand All @@ -69,11 +74,19 @@ static final class EarlyTerminationException extends RuntimeException {
super(delegate);
this.maxCountHits = maxCountHits;
this.forceTermination = forceTermination;
this.numCollected = new AtomicLong();
}

EarlyTerminatingCollector(final Collector delegate, int maxCountHits, boolean forceTermination, AtomicLong numCollected) {
super(delegate);
this.maxCountHits = maxCountHits;
this.forceTermination = forceTermination;
this.numCollected = numCollected;
}

@Override
public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
if (numCollected >= maxCountHits) {
if (numCollected.get() >= maxCountHits) {
earlyTerminated = true;
if (forceTermination) {
throw new EarlyTerminationException("early termination [CountBased]");
Expand All @@ -84,7 +97,7 @@ public LeafCollector getLeafCollector(LeafReaderContext context) throws IOExcept
return new FilterLeafCollector(super.getLeafCollector(context)) {
@Override
public void collect(int doc) throws IOException {
if (++numCollected > maxCountHits) {
if (numCollected.incrementAndGet() > maxCountHits) {
earlyTerminated = true;
if (forceTermination) {
throw new EarlyTerminationException("early termination [CountBased]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/**
* Manager for the EarlyTerminatingCollector
Expand All @@ -29,16 +30,23 @@ public class EarlyTerminatingCollectorManager<C extends Collector>
private final CollectorManager<C, ReduceableSearchResult> manager;
private final int maxCountHits;
private boolean forceTermination;
private final AtomicLong numCollected;

EarlyTerminatingCollectorManager(CollectorManager<C, ReduceableSearchResult> manager, int maxCountHits, boolean forceTermination) {
this.manager = manager;
this.maxCountHits = maxCountHits;
this.forceTermination = forceTermination;
this.numCollected = new AtomicLong();
}

@Override
public EarlyTerminatingCollector newCollector() throws IOException {
return new EarlyTerminatingCollector(manager.newCollector(), maxCountHits, false /* forced termination is not supported */);
return new EarlyTerminatingCollector(
manager.newCollector(),
maxCountHits,
forceTermination /* forced termination is not supported */,
numCollected
);
}

@SuppressWarnings("unchecked")
Expand All @@ -57,6 +65,7 @@ public ReduceableSearchResult reduce(Collection<EarlyTerminatingCollector> colle
if (didTerminateEarly) {
onEarlyTermination(maxCountHits, forceTermination);

// this reduce is how we get the TopDocs of the inner collectors
final ReduceableSearchResult result = manager.reduce(innerCollectors);
return new ReduceableSearchResult() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,9 +350,8 @@ private static boolean searchWithCollector(
queryCollector = QueryCollectorContext.createQueryCollector(collectors);
}
QuerySearchResult queryResult = searchContext.queryResult();
try {
searcher.search(query, queryCollector);
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
searcher.search(query, queryCollector);
if (searchContext.isTerminatedEarly()) {
queryResult.terminatedEarly(true);
}
if (searchContext.isSearchTimedOut()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Objects;
import java.util.function.Supplier;
Expand Down Expand Up @@ -173,7 +172,7 @@ private EmptyTopDocsCollectorContext(
CollectorManager<?, ReduceableSearchResult> createManager(CollectorManager<?, ReduceableSearchResult> in) throws IOException {
assert in == null;

CollectorManager<?, ReduceableSearchResult> manager = null;
CollectorManager<?, ReduceableSearchResult> manager;

if (trackTotalHitsUpTo == SearchContext.TRACK_TOTAL_HITS_DISABLED) {
manager = new EarlyTerminatingCollectorManager<>(
Expand All @@ -184,10 +183,10 @@ CollectorManager<?, ReduceableSearchResult> createManager(CollectorManager<?, Re
} else {
if (hitCount == -1) {
if (trackTotalHitsUpTo == SearchContext.TRACK_TOTAL_HITS_ACCURATE) {
manager = new TotalHitCountCollectorManager(sort);
manager = new TotalHitCountCollectorManager(sort, trackTotalHitsUpTo);
} else {
manager = new EarlyTerminatingCollectorManager<>(
new TotalHitCountCollectorManager(sort),
new TotalHitCountCollectorManager(sort, trackTotalHitsUpTo),
trackTotalHitsUpTo,
false
);
Expand Down Expand Up @@ -577,19 +576,7 @@ TopDocsAndMaxScore newTopDocs(final TopDocs topDocs, final float maxScore, final
}
}

// Since we cannot support early forced termination, we have to simulate it by
// artificially reducing the number of total hits and doc scores.
ScoreDoc[] scoreDocs = topDocs.scoreDocs;
if (terminatedAfter != null) {
if (totalHits.value > terminatedAfter) {
totalHits = new TotalHits(terminatedAfter, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO);
}

if (scoreDocs != null && scoreDocs.length > terminatedAfter) {
scoreDocs = Arrays.copyOf(scoreDocs, terminatedAfter);
}
}

final TopDocs newTopDocs;
if (topDocs instanceof TopFieldDocs) {
TopFieldDocs fieldDocs = (TopFieldDocs) topDocs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,16 @@ public ScoreMode scoreMode() {

private final Sort sort;
private Integer terminatedAfter;
private final int trackTotalHitsUpTo;

public TotalHitCountCollectorManager(final Sort sort) {
this.sort = sort;
trackTotalHitsUpTo = -1;
}

public TotalHitCountCollectorManager(final Sort sort, final int trackTotalHitsUpTo) {
this.sort = sort;
this.trackTotalHitsUpTo = trackTotalHitsUpTo;
}

@Override
Expand All @@ -72,11 +79,11 @@ public TotalHitCountCollector newCollector() throws IOException {
@Override
public ReduceableSearchResult reduce(Collection<TotalHitCountCollector> collectors) throws IOException {
return (QuerySearchResult result) -> {
final TotalHits.Relation relation = (terminatedAfter != null)
int totalHits = collectors.stream().mapToInt(TotalHitCountCollector::getTotalHits).sum();
final TotalHits.Relation relation = (trackTotalHitsUpTo <= totalHits)
? TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO
: TotalHits.Relation.EQUAL_TO;

int totalHits = collectors.stream().mapToInt(TotalHitCountCollector::getTotalHits).sum();
if (terminatedAfter != null && totalHits > terminatedAfter) {
totalHits = terminatedAfter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,7 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
clusterSettings.applySettings(
Settings.builder().put(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build()
);
clusterSettings.registerSetting(SearchService.TERMINATE_AFTER_CONCURRENT_SEARCH_THRESHOLD);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
DefaultSearchContext context = new DefaultSearchContext(
readerContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,18 @@ public void testTerminateAfterEarlyTermination() throws Exception {
}
w.close();
final IndexReader reader = DirectoryReader.open(dir);
TestSearchContext context = new TestSearchContext(null, indexShard, newContextSearcher(reader, executor));
TestSearchContext context = new TestSearchContext(null, indexShard);
context.setSearcher(
new ContextIndexSearcher(
reader,
IndexSearcher.getDefaultSimilarity(),
IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(),
true,
executor,
context
)
);
context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()));
context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery()));
if (this.executor != null) {
Expand Down
Loading

0 comments on commit 082fcfb

Please sign in to comment.