Skip to content

Commit

Permalink
Disable concurrent segment search for system indices and throttled se…
Browse files Browse the repository at this point in the history
…arch requests (opensearch-project#12954)

Signed-off-by: Jay Deng <[email protected]>
  • Loading branch information
jed326 authored and Jay Deng committed Mar 28, 2024
1 parent 48881de commit cb2b7a4
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Concurrent Segment Search] Perform buildAggregation concurrently and support Composite Aggregations ([#12697](https://github.com/opensearch-project/OpenSearch/pull/12697))
- Convert ingest processor supports ip type ([#12818](https://github.com/opensearch-project/OpenSearch/pull/12818))
- Allow setting KEYSTORE_PASSWORD through env variable ([#12865](https://github.com/opensearch-project/OpenSearch/pull/12865))
- [Concurrent Segment Search] Disable concurrent segment search for system indices and throttled requests ([#12954](https://github.com/opensearch-project/OpenSearch/pull/12954))

### Dependencies
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -962,16 +962,21 @@ public BucketCollectorProcessor bucketCollectorProcessor() {
* false: otherwise
*/
private boolean evaluateConcurrentSegmentSearchSettings(Executor concurrentSearchExecutor) {
// Do not use concurrent segment search for system indices or throttled requests. See:
// https://github.com/opensearch-project/OpenSearch/issues/12951
if (indexShard.isSystem() || indexShard.indexSettings().isSearchThrottled()) {
return false;
}

if ((clusterService != null) && (concurrentSearchExecutor != null)) {
return indexService.getIndexSettings()
.getSettings()
.getAsBoolean(
IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(),
clusterService.getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING)
);
} else {
return false;
}
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
import java.util.function.Function;
import java.util.function.Supplier;

import static org.opensearch.index.IndexSettings.INDEX_SEARCH_THROTTLED;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.any;
Expand Down Expand Up @@ -168,6 +169,7 @@ public void testPreProcess() throws Exception {
IndexSettings indexSettings = new IndexSettings(indexMetadata, Settings.EMPTY);
when(indexService.getIndexSettings()).thenReturn(indexSettings);
when(mapperService.getIndexSettings()).thenReturn(indexSettings);
when(indexShard.indexSettings()).thenReturn(indexSettings);

BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());

Expand Down Expand Up @@ -486,6 +488,14 @@ public void testClearQueryCancellationsOnClose() throws IOException {
when(indexService.newQueryShardContext(eq(shardId.id()), any(), any(), nullable(String.class), anyBoolean())).thenReturn(
queryShardContext
);
Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2)
.build();
IndexMetadata indexMetadata = IndexMetadata.builder("index").settings(settings).build();
IndexSettings indexSettings = new IndexSettings(indexMetadata, Settings.EMPTY);
when(indexShard.indexSettings()).thenReturn(indexSettings);

BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());

Expand Down Expand Up @@ -551,7 +561,7 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
}
}

public void testSearchPathEvaluationUsingSortField() throws Exception {
public void testSearchPathEvaluation() throws Exception {
ShardSearchRequest shardSearchRequest = mock(ShardSearchRequest.class);
when(shardSearchRequest.searchType()).thenReturn(SearchType.DEFAULT);
ShardId shardId = new ShardId("index", UUID.randomUUID().toString(), 1);
Expand All @@ -578,9 +588,24 @@ public void testSearchPathEvaluationUsingSortField() throws Exception {
IndexMetadata indexMetadata = IndexMetadata.builder("index").settings(settings).build();
IndexSettings indexSettings = new IndexSettings(indexMetadata, Settings.EMPTY);
when(indexService.getIndexSettings()).thenReturn(indexSettings);
when(indexShard.indexSettings()).thenReturn(indexSettings);

BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());

IndexShard systemIndexShard = mock(IndexShard.class);
when(systemIndexShard.getQueryCachingPolicy()).thenReturn(queryCachingPolicy);
when(systemIndexShard.getThreadPool()).thenReturn(threadPool);
when(systemIndexShard.isSystem()).thenReturn(true);

IndexShard throttledIndexShard = mock(IndexShard.class);
when(throttledIndexShard.getQueryCachingPolicy()).thenReturn(queryCachingPolicy);
when(throttledIndexShard.getThreadPool()).thenReturn(threadPool);
IndexSettings throttledIndexSettings = new IndexSettings(
indexMetadata,
Settings.builder().put(INDEX_SEARCH_THROTTLED.getKey(), true).build()
);
when(throttledIndexShard.indexSettings()).thenReturn(throttledIndexSettings);

try (Directory dir = newDirectory(); RandomIndexWriter w = new RandomIndexWriter(random(), dir)) {

final Supplier<Engine.SearcherSupplier> searcherSupplier = () -> new Engine.SearcherSupplier(Function.identity()) {
Expand Down Expand Up @@ -697,6 +722,62 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
}
assertThrows(SetOnce.AlreadySetException.class, context::evaluateRequestShouldUseConcurrentSearch);

// Case 4: With a system index concurrent segment search is not used
readerContext = new ReaderContext(
newContextId(),
indexService,
systemIndexShard,
searcherSupplier.get(),
randomNonNegativeLong(),
false
);
context = new DefaultSearchContext(
readerContext,
shardSearchRequest,
target,
null,
bigArrays,
null,
null,
null,
false,
Version.CURRENT,
false,
executor,
null
);
context.evaluateRequestShouldUseConcurrentSearch();
assertFalse(context.shouldUseConcurrentSearch());
assertThrows(SetOnce.AlreadySetException.class, context::evaluateRequestShouldUseConcurrentSearch);

// Case 5: When search is throttled concurrent segment search is not used
readerContext = new ReaderContext(
newContextId(),
indexService,
throttledIndexShard,
searcherSupplier.get(),
randomNonNegativeLong(),
false
);
context = new DefaultSearchContext(
readerContext,
shardSearchRequest,
target,
null,
bigArrays,
null,
null,
null,
false,
Version.CURRENT,
false,
executor,
null
);
context.evaluateRequestShouldUseConcurrentSearch();
assertFalse(context.shouldUseConcurrentSearch());
assertThrows(SetOnce.AlreadySetException.class, context::evaluateRequestShouldUseConcurrentSearch);

// shutdown the threadpool
threadPool.shutdown();
}
Expand Down

0 comments on commit cb2b7a4

Please sign in to comment.