Skip to content

Commit

Permalink
Add terminate_after threshold for concurrent vs non-concurrent segmen…
Browse files Browse the repository at this point in the history
…t search

Signed-off-by: Jay Deng <[email protected]>
  • Loading branch information
jed326 authored and Jay Deng committed Sep 27, 2023
1 parent e617da3 commit bcae67c
Show file tree
Hide file tree
Showing 15 changed files with 680 additions and 624 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Telemetry-Otel] Added support for OtlpGrpcSpanExporter exporter ([#9666](https://github.com/opensearch-project/OpenSearch/pull/9666))
- Async blob read support for encrypted containers ([#10131](https://github.com/opensearch-project/OpenSearch/pull/10131))
- Add capability to restrict async durability mode for remote indexes ([#10189](https://github.com/opensearch-project/OpenSearch/pull/10189))
- Add threshold setting to determine when concurrent segment search should be used with terminate_after parameter ([#10200](https://github.com/opensearch-project/OpenSearch/pull/10200))

### Dependencies
- Bump `peter-evans/create-or-update-comment` from 2 to 3 ([#9575](https://github.com/opensearch-project/OpenSearch/pull/9575))
Expand Down

This file was deleted.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,8 @@ public void apply(Settings value, Settings current, Settings previous) {
List.of(FeatureFlags.CONCURRENT_SEGMENT_SEARCH),
List.of(
SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING,
SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING
SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING,
SearchService.TERMINATE_AFTER_CONCURRENT_SEARCH_THRESHOLD
),
List.of(FeatureFlags.TELEMETRY),
List.of(TelemetrySettings.TRACER_ENABLED_SETTING, TelemetrySettings.TRACER_SAMPLER_PROBABILITY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
import java.util.function.LongSupplier;

import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.SearchService.TERMINATE_AFTER_CONCURRENT_SEARCH_THRESHOLD;

/**
* The main search context used during search phase
Expand Down Expand Up @@ -896,6 +897,8 @@ public void evaluateRequestShouldUseConcurrentSearch() {
&& aggregations().factories() != null
&& !aggregations().factories().allFactoriesSupportConcurrentSearch()) {
requestShouldUseConcurrentSearch.set(false);
} else if (terminateAfter < clusterService.getClusterSettings().get(TERMINATE_AFTER_CONCURRENT_SEARCH_THRESHOLD)) {
requestShouldUseConcurrentSearch.set(false);
} else {
requestShouldUseConcurrentSearch.set(true);
}
Expand Down
11 changes: 11 additions & 0 deletions server/src/main/java/org/opensearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,17 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
Property.Dynamic
);

/**
* Threshold for terminate_after search parameter below which non-concurrent search will always be used.
*/
public static final Setting<Integer> TERMINATE_AFTER_CONCURRENT_SEARCH_THRESHOLD = Setting.intSetting(
"search.terminate_after_concurrent_search_threshold",
100000,
0,
Property.NodeScope,
Property.Dynamic
);

/**
* Enables low-level, frequent search cancellation checks. Enabling low-level checks will make long running searches to react
* to the cancellation request faster. It will produce more cancellation checks but benchmarking has shown these did not
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.opensearch.search.profile.query.ProfileWeight;
import org.opensearch.search.profile.query.QueryProfiler;
import org.opensearch.search.profile.query.QueryTimingType;
import org.opensearch.search.query.EarlyTerminatingCollector;
import org.opensearch.search.query.QueryPhase;
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.search.sort.FieldSortBuilder;
Expand Down Expand Up @@ -292,7 +293,7 @@ protected void search(List<LeafReaderContext> leaves, Weight weight, Collector c
private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector) throws IOException {

// Check if at all we need to call this leaf for collecting results.
if (canMatch(ctx) == false) {
if (canMatch(ctx) == false || searchContext.isTerminatedEarly()) {
return;
}

Expand All @@ -310,6 +311,9 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
// there is no doc of interest in this reader context
// continue with the following leaf
return;
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
searchContext.setTerminatedEarly(true);
return;
} catch (QueryPhase.TimeExceededException e) {
searchContext.setSearchTimedOut(true);
return;
Expand All @@ -325,6 +329,9 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
} catch (CollectionTerminatedException e) {
// collection was terminated prematurely
// continue with the following leaf
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
searchContext.setTerminatedEarly(true);
return;
} catch (QueryPhase.TimeExceededException e) {
searchContext.setSearchTimedOut(true);
return;
Expand All @@ -344,6 +351,9 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
} catch (CollectionTerminatedException e) {
// collection was terminated prematurely
// continue with the following leaf
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
searchContext.setTerminatedEarly(true);
return;
} catch (QueryPhase.TimeExceededException e) {
searchContext.setSearchTimedOut(true);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ public List<Aggregator> toAggregators(Collection<Collector> collectors) {
private InnerHitsContext innerHitsContext;

private volatile boolean searchTimedOut;
private volatile boolean terminatedEarly;

protected SearchContext() {}

Expand All @@ -136,6 +137,14 @@ public void setSearchTimedOut(boolean searchTimedOut) {
this.searchTimedOut = searchTimedOut;
}

public boolean isTerminatedEarly() {
return this.terminatedEarly;
}

public void setTerminatedEarly(boolean terminatedEarly) {
this.terminatedEarly = terminatedEarly;
}

@Override
public final void close() {
if (closed.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ private static boolean searchWithCollectorManager(
if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER && queryResult.terminatedEarly() == null) {
queryResult.terminatedEarly(false);
}
if (searchContext.isTerminatedEarly()) {
queryResult.terminatedEarly(true);
}

return topDocsFactory.shouldRescore();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,27 @@
import org.apache.lucene.search.LeafCollector;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

/**
* A {@link Collector} that early terminates collection after <code>maxCountHits</code> docs have been collected.
*
* @opensearch.internal
*/
public class EarlyTerminatingCollector extends FilterCollector {
static final class EarlyTerminationException extends RuntimeException {

/**
* Force termination exception
*/
public static final class EarlyTerminationException extends RuntimeException {
EarlyTerminationException(String msg) {
super(msg);
}
}

private final int maxCountHits;
private int numCollected;
private boolean forceTermination;
private final AtomicLong numCollected;
private final boolean forceTermination;
private boolean earlyTerminated;

/**
Expand All @@ -69,11 +74,19 @@ static final class EarlyTerminationException extends RuntimeException {
super(delegate);
this.maxCountHits = maxCountHits;
this.forceTermination = forceTermination;
this.numCollected = new AtomicLong();
}

EarlyTerminatingCollector(final Collector delegate, int maxCountHits, boolean forceTermination, AtomicLong numCollected) {
super(delegate);
this.maxCountHits = maxCountHits;
this.forceTermination = forceTermination;
this.numCollected = numCollected;
}

@Override
public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
if (numCollected >= maxCountHits) {
if (numCollected.get() >= maxCountHits) {
earlyTerminated = true;
if (forceTermination) {
throw new EarlyTerminationException("early termination [CountBased]");
Expand All @@ -84,7 +97,7 @@ public LeafCollector getLeafCollector(LeafReaderContext context) throws IOExcept
return new FilterLeafCollector(super.getLeafCollector(context)) {
@Override
public void collect(int doc) throws IOException {
if (++numCollected > maxCountHits) {
if (numCollected.incrementAndGet() > maxCountHits) {
earlyTerminated = true;
if (forceTermination) {
throw new EarlyTerminationException("early termination [CountBased]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/**
* Manager for the EarlyTerminatingCollector
Expand All @@ -29,16 +30,23 @@ public class EarlyTerminatingCollectorManager<C extends Collector>
private final CollectorManager<C, ReduceableSearchResult> manager;
private final int maxCountHits;
private boolean forceTermination;
private final AtomicLong numCollected;

EarlyTerminatingCollectorManager(CollectorManager<C, ReduceableSearchResult> manager, int maxCountHits, boolean forceTermination) {
this.manager = manager;
this.maxCountHits = maxCountHits;
this.forceTermination = forceTermination;
this.numCollected = new AtomicLong();
}

@Override
public EarlyTerminatingCollector newCollector() throws IOException {
return new EarlyTerminatingCollector(manager.newCollector(), maxCountHits, false /* forced termination is not supported */);
return new EarlyTerminatingCollector(
manager.newCollector(),
maxCountHits,
forceTermination /* forced termination is not supported */,
numCollected
);
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,9 +350,8 @@ private static boolean searchWithCollector(
queryCollector = QueryCollectorContext.createQueryCollector(collectors);
}
QuerySearchResult queryResult = searchContext.queryResult();
try {
searcher.search(query, queryCollector);
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
searcher.search(query, queryCollector);
if (searchContext.isTerminatedEarly()) {
queryResult.terminatedEarly(true);
}
if (searchContext.isSearchTimedOut()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,18 @@ public void testTerminateAfterEarlyTermination() throws Exception {
}
w.close();
final IndexReader reader = DirectoryReader.open(dir);
TestSearchContext context = new TestSearchContext(null, indexShard, newContextSearcher(reader, executor));
TestSearchContext context = new TestSearchContext(null, indexShard);
context.setSearcher(
new ContextIndexSearcher(
reader,
IndexSearcher.getDefaultSimilarity(),
IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(),
true,
executor,
context
)
);
context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()));
context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery()));
if (this.executor != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,19 @@ public void testTerminateAfterEarlyTermination() throws Exception {
}
w.close();
final IndexReader reader = DirectoryReader.open(dir);
TestSearchContext context = new TestSearchContext(null, indexShard, newContextSearcher(reader, executor));
TestSearchContext context = new TestSearchContext(null, indexShard);
context.setSearcher(
new ContextIndexSearcher(
reader,
IndexSearcher.getDefaultSimilarity(),
IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(),
true,
executor,
context
)
);
context.setConcurrentSegmentSearchEnabled(executor != null);
context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()));
context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,7 @@ public boolean shouldUseTimeSeriesDescSortOptimization() {
public TestSearchContext withCleanQueryResult() {
queryResult.consumeAll();
profilers = null;
setTerminatedEarly(false);
return this;
}

Expand Down

0 comments on commit bcae67c

Please sign in to comment.