Skip to content

Commit

Permalink
Add terminate_after threshold for concurrent vs non-concurrent segmen…
Browse files Browse the repository at this point in the history
…t search

Signed-off-by: Jay Deng <[email protected]>
  • Loading branch information
jed326 authored and Jay Deng committed Sep 27, 2023
1 parent e617da3 commit 0c7d06c
Show file tree
Hide file tree
Showing 16 changed files with 714 additions and 636 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Telemetry-Otel] Added support for OtlpGrpcSpanExporter exporter ([#9666](https://github.com/opensearch-project/OpenSearch/pull/9666))
- Async blob read support for encrypted containers ([#10131](https://github.com/opensearch-project/OpenSearch/pull/10131))
- Add capability to restrict async durability mode for remote indexes ([#10189](https://github.com/opensearch-project/OpenSearch/pull/10189))
- Add threshold setting to determine when concurrent segment search should be used with terminate_after parameter ([#10200](https://github.com/opensearch-project/OpenSearch/pull/10200))

### Dependencies
- Bump `peter-evans/create-or-update-comment` from 2 to 3 ([#9575](https://github.com/opensearch-project/OpenSearch/pull/9575))
Expand Down

This file was deleted.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,8 @@ public void apply(Settings value, Settings current, Settings previous) {
List.of(FeatureFlags.CONCURRENT_SEGMENT_SEARCH),
List.of(
SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING,
SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING
SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING,
SearchService.TERMINATE_AFTER_CONCURRENT_SEARCH_THRESHOLD
),
List.of(FeatureFlags.TELEMETRY),
List.of(TelemetrySettings.TRACER_ENABLED_SETTING, TelemetrySettings.TRACER_SAMPLER_PROBABILITY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ final class DefaultSearchContext extends SearchContext {
private final Function<SearchSourceBuilder, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder;
private final boolean concurrentSearchSettingsEnabled;
private final SetOnce<Boolean> requestShouldUseConcurrentSearch = new SetOnce<>();
private final int terminateAfterThreshold;

DefaultSearchContext(
ReaderContext readerContext,
Expand All @@ -200,7 +201,8 @@ final class DefaultSearchContext extends SearchContext {
Version minNodeVersion,
boolean validate,
Executor executor,
Function<SearchSourceBuilder, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder
Function<SearchSourceBuilder, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder,
int terminateAfterThreshold
) throws IOException {
this.readerContext = readerContext;
this.request = request;
Expand Down Expand Up @@ -239,6 +241,7 @@ final class DefaultSearchContext extends SearchContext {
queryBoost = request.indexBoost();
this.lowLevelCancellation = lowLevelCancellation;
this.requestToAggReduceContextBuilder = requestToAggReduceContextBuilder;
this.terminateAfterThreshold = terminateAfterThreshold;
}

@Override
Expand Down Expand Up @@ -896,6 +899,8 @@ public void evaluateRequestShouldUseConcurrentSearch() {
&& aggregations().factories() != null
&& !aggregations().factories().allFactoriesSupportConcurrentSearch()) {
requestShouldUseConcurrentSearch.set(false);
} else if (terminateAfter < terminateAfterThreshold) {
requestShouldUseConcurrentSearch.set(false);
} else {
requestShouldUseConcurrentSearch.set(true);
}
Expand Down
23 changes: 22 additions & 1 deletion server/src/main/java/org/opensearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,17 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
Property.Dynamic
);

/**
* Threshold for terminate_after search parameter below which non-concurrent search will always be used.
*/
public static final Setting<Integer> TERMINATE_AFTER_CONCURRENT_SEARCH_THRESHOLD = Setting.intSetting(
"search.terminate_after_concurrent_search_threshold",
100000,
0,
Property.NodeScope,
Property.Dynamic
);

/**
* Enables low-level, frequent search cancellation checks. Enabling low-level checks will make long running searches to react
* to the cancellation request faster. It will produce more cancellation checks but benchmarking has shown these did not
Expand Down Expand Up @@ -318,6 +329,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
private final AtomicInteger openPitContexts = new AtomicInteger();
private final String sessionId = UUIDs.randomBase64UUID();
private final Executor indexSearcherExecutor;
private volatile int terminateAfterThreshold;

public SearchService(
ClusterService clusterService,
Expand Down Expand Up @@ -377,6 +389,10 @@ public SearchService(

lowLevelCancellation = LOW_LEVEL_CANCELLATION_SETTING.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(LOW_LEVEL_CANCELLATION_SETTING, this::setLowLevelCancellation);

terminateAfterThreshold = TERMINATE_AFTER_CONCURRENT_SEARCH_THRESHOLD.get(settings);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(TERMINATE_AFTER_CONCURRENT_SEARCH_THRESHOLD, this::setTerminateAfterConcurrentSearchThreshold);
}

private void validateKeepAlives(TimeValue defaultKeepAlive, TimeValue maxKeepAlive) {
Expand Down Expand Up @@ -453,6 +469,10 @@ private void setLowLevelCancellation(Boolean lowLevelCancellation) {
this.lowLevelCancellation = lowLevelCancellation;
}

private void setTerminateAfterConcurrentSearchThreshold(int terminateAfterConcurrentSearchThreshold) {
this.terminateAfterThreshold = terminateAfterConcurrentSearchThreshold;
}

@Override
public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRemovalReason reason) {
// once an index is removed due to deletion or closing, we can just clean up all the pending search context information
Expand Down Expand Up @@ -1060,7 +1080,8 @@ private DefaultSearchContext createSearchContext(ReaderContext reader, ShardSear
clusterService.state().nodes().getMinNodeVersion(),
validate,
indexSearcherExecutor,
this::aggReduceContextBuilder
this::aggReduceContextBuilder,
terminateAfterThreshold
);
// we clone the query shard context here just for rewriting otherwise we
// might end up with incorrect state since we are using now() or script services
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.opensearch.search.profile.query.ProfileWeight;
import org.opensearch.search.profile.query.QueryProfiler;
import org.opensearch.search.profile.query.QueryTimingType;
import org.opensearch.search.query.EarlyTerminatingCollector;
import org.opensearch.search.query.QueryPhase;
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.search.sort.FieldSortBuilder;
Expand Down Expand Up @@ -292,7 +293,7 @@ protected void search(List<LeafReaderContext> leaves, Weight weight, Collector c
private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector) throws IOException {

// Check if at all we need to call this leaf for collecting results.
if (canMatch(ctx) == false) {
if (canMatch(ctx) == false || searchContext.isTerminatedEarly()) {
return;
}

Expand All @@ -310,6 +311,9 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
// there is no doc of interest in this reader context
// continue with the following leaf
return;
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
searchContext.setTerminatedEarly(true);
return;
} catch (QueryPhase.TimeExceededException e) {
searchContext.setSearchTimedOut(true);
return;
Expand All @@ -325,6 +329,9 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
} catch (CollectionTerminatedException e) {
// collection was terminated prematurely
// continue with the following leaf
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
searchContext.setTerminatedEarly(true);
return;
} catch (QueryPhase.TimeExceededException e) {
searchContext.setSearchTimedOut(true);
return;
Expand All @@ -344,6 +351,9 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
} catch (CollectionTerminatedException e) {
// collection was terminated prematurely
// continue with the following leaf
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
searchContext.setTerminatedEarly(true);
return;
} catch (QueryPhase.TimeExceededException e) {
searchContext.setSearchTimedOut(true);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ public List<Aggregator> toAggregators(Collection<Collector> collectors) {
private InnerHitsContext innerHitsContext;

private volatile boolean searchTimedOut;
private volatile boolean terminatedEarly;

protected SearchContext() {}

Expand All @@ -136,6 +137,14 @@ public void setSearchTimedOut(boolean searchTimedOut) {
this.searchTimedOut = searchTimedOut;
}

public boolean isTerminatedEarly() {
return this.terminatedEarly;
}

public void setTerminatedEarly(boolean terminatedEarly) {
this.terminatedEarly = terminatedEarly;
}

@Override
public final void close() {
if (closed.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ private static boolean searchWithCollectorManager(
if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER && queryResult.terminatedEarly() == null) {
queryResult.terminatedEarly(false);
}
if (searchContext.isTerminatedEarly()) {
queryResult.terminatedEarly(true);
}

return topDocsFactory.shouldRescore();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,27 @@
import org.apache.lucene.search.LeafCollector;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

/**
* A {@link Collector} that early terminates collection after <code>maxCountHits</code> docs have been collected.
*
* @opensearch.internal
*/
public class EarlyTerminatingCollector extends FilterCollector {
static final class EarlyTerminationException extends RuntimeException {

/**
* Force termination exception
*/
public static final class EarlyTerminationException extends RuntimeException {
EarlyTerminationException(String msg) {
super(msg);
}
}

private final int maxCountHits;
private int numCollected;
private boolean forceTermination;
private final AtomicLong numCollected;
private final boolean forceTermination;
private boolean earlyTerminated;

/**
Expand All @@ -69,11 +74,19 @@ static final class EarlyTerminationException extends RuntimeException {
super(delegate);
this.maxCountHits = maxCountHits;
this.forceTermination = forceTermination;
this.numCollected = new AtomicLong();
}

EarlyTerminatingCollector(final Collector delegate, int maxCountHits, boolean forceTermination, AtomicLong numCollected) {
super(delegate);
this.maxCountHits = maxCountHits;
this.forceTermination = forceTermination;
this.numCollected = numCollected;
}

@Override
public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
if (numCollected >= maxCountHits) {
if (numCollected.get() >= maxCountHits) {
earlyTerminated = true;
if (forceTermination) {
throw new EarlyTerminationException("early termination [CountBased]");
Expand All @@ -84,7 +97,7 @@ public LeafCollector getLeafCollector(LeafReaderContext context) throws IOExcept
return new FilterLeafCollector(super.getLeafCollector(context)) {
@Override
public void collect(int doc) throws IOException {
if (++numCollected > maxCountHits) {
if (numCollected.incrementAndGet() > maxCountHits) {
earlyTerminated = true;
if (forceTermination) {
throw new EarlyTerminationException("early termination [CountBased]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/**
* Manager for the EarlyTerminatingCollector
Expand All @@ -29,16 +30,23 @@ public class EarlyTerminatingCollectorManager<C extends Collector>
private final CollectorManager<C, ReduceableSearchResult> manager;
private final int maxCountHits;
private boolean forceTermination;
private final AtomicLong numCollected;

EarlyTerminatingCollectorManager(CollectorManager<C, ReduceableSearchResult> manager, int maxCountHits, boolean forceTermination) {
this.manager = manager;
this.maxCountHits = maxCountHits;
this.forceTermination = forceTermination;
this.numCollected = new AtomicLong();
}

@Override
public EarlyTerminatingCollector newCollector() throws IOException {
return new EarlyTerminatingCollector(manager.newCollector(), maxCountHits, false /* forced termination is not supported */);
return new EarlyTerminatingCollector(
manager.newCollector(),
maxCountHits,
forceTermination /* forced termination is not supported */,
numCollected
);
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,9 +350,8 @@ private static boolean searchWithCollector(
queryCollector = QueryCollectorContext.createQueryCollector(collectors);
}
QuerySearchResult queryResult = searchContext.queryResult();
try {
searcher.search(query, queryCollector);
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
searcher.search(query, queryCollector);
if (searchContext.isTerminatedEarly()) {
queryResult.terminatedEarly(true);
}
if (searchContext.isSearchTimedOut()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
Version.CURRENT,
false,
executor,
null
null,
100000
);
contextWithoutScroll.from(300);
contextWithoutScroll.close();
Expand Down Expand Up @@ -263,7 +264,8 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
Version.CURRENT,
false,
executor,
null
null,
100000
);
context1.from(300);
exception = expectThrows(IllegalArgumentException.class, () -> context1.preProcess(false));
Expand Down Expand Up @@ -334,7 +336,8 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
Version.CURRENT,
false,
executor,
null
null,
100000
);

SliceBuilder sliceBuilder = mock(SliceBuilder.class);
Expand Down Expand Up @@ -374,7 +377,8 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
Version.CURRENT,
false,
executor,
null
null,
100000
);
ParsedQuery parsedQuery = ParsedQuery.parsedMatchAllQuery();
context3.sliceBuilder(null).parsedQuery(parsedQuery).preProcess(false);
Expand Down Expand Up @@ -410,7 +414,8 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
Version.CURRENT,
false,
executor,
null
null,
100000
);
context4.sliceBuilder(new SliceBuilder(1, 2)).parsedQuery(parsedQuery).preProcess(false);
Query query1 = context4.query();
Expand Down Expand Up @@ -441,7 +446,8 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
Version.CURRENT,
false,
executor,
null
null,
100000
);
int numSlicesForPit = maxSlicesPerPit + randomIntBetween(1, 100);
when(sliceBuilder.getMax()).thenReturn(numSlicesForPit);
Expand Down Expand Up @@ -539,7 +545,8 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
Version.CURRENT,
false,
executor,
null
null,
100000
);
assertThat(context.searcher().hasCancellations(), is(false));
context.searcher().addQueryCancellation(() -> {});
Expand Down Expand Up @@ -639,7 +646,8 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
Version.CURRENT,
false,
executor,
null
null,
100000
);

// Case1: if sort is on timestamp field, non-concurrent path is used
Expand All @@ -664,7 +672,8 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
Version.CURRENT,
false,
executor,
null
null,
100000
);
context.sort(
new SortAndFormats(new Sort(new SortField("test2", SortField.Type.INT)), new DocValueFormat[] { DocValueFormat.RAW })
Expand All @@ -691,7 +700,8 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
Version.CURRENT,
false,
executor,
null
null,
100000
);
context.evaluateRequestShouldUseConcurrentSearch();
if (executor == null) {
Expand Down
Loading

0 comments on commit 0c7d06c

Please sign in to comment.