Commit ea83f8e

Introduce new setting search.concurrent.max_slice to control the slice computation for concurrent segment search (#8884)

* Introduce new setting search.concurrent.max_slice to control the slice computation for concurrent
segment search. It uses the default Lucene mechanism if the setting value is <= 0; otherwise it uses the custom max target slice mechanism

Signed-off-by: Sorabh Hamirwasia <[email protected]>

* Address review comments

Signed-off-by: Sorabh Hamirwasia <[email protected]>

* Address review comments

Signed-off-by: Sorabh Hamirwasia <[email protected]>

---------

Signed-off-by: Sorabh Hamirwasia <[email protected]>
sohami authored Jul 28, 2023
1 parent 044ea25 commit ea83f8e
Showing 12 changed files with 383 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Added
- Add server version as REST response header [#6583](https://github.com/opensearch-project/OpenSearch/issues/6583)
- Start replication checkpointTimers on primary before segments upload to remote store. ([#8221](https://github.com/opensearch-project/OpenSearch/pull/8221))
- Introduce new static cluster setting to control slice computation for concurrent segment search. ([#8884](https://github.com/opensearch-project/OpenSearch/pull/8884))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))
server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
@@ -45,6 +45,7 @@
import org.opensearch.index.ShardIndexingPressureMemoryManager;
import org.opensearch.index.ShardIndexingPressureSettings;
import org.opensearch.index.ShardIndexingPressureStore;
import org.opensearch.search.SearchBootstrapSettings;
import org.opensearch.search.backpressure.settings.NodeDuressSettings;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.search.backpressure.settings.SearchShardTaskSettings;
@@ -677,7 +678,10 @@ public void apply(Settings value, Settings current, Settings previous) {
IndicesService.CLUSTER_REMOTE_TRANSLOG_REPOSITORY_SETTING
),
List.of(FeatureFlags.CONCURRENT_SEGMENT_SEARCH),
List.of(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING),
List.of(
SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING,
SearchBootstrapSettings.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING
),
List.of(FeatureFlags.TELEMETRY),
List.of(TelemetrySettings.TRACER_ENABLED_SETTING)
);
2 changes: 2 additions & 0 deletions server/src/main/java/org/opensearch/node/Node.java
@@ -58,6 +58,7 @@
import org.opensearch.monitor.fs.FsProbe;
import org.opensearch.plugins.ExtensionAwarePlugin;
import org.opensearch.plugins.SearchPipelinePlugin;
import org.opensearch.search.SearchBootstrapSettings;
import org.opensearch.telemetry.tracing.NoopTracerFactory;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.TracerFactory;
@@ -468,6 +469,7 @@ protected Node(

// Ensure to initialize Feature Flags via the settings from opensearch.yml
FeatureFlags.initializeFeatureFlags(settings);
SearchBootstrapSettings.initialize(settings);

final List<IdentityPlugin> identityPlugins = new ArrayList<>();
if (FeatureFlags.isEnabled(FeatureFlags.IDENTITY)) {
47 changes: 47 additions & 0 deletions server/src/main/java/org/opensearch/search/SearchBootstrapSettings.java
@@ -0,0 +1,47 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search;

import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;

/**
* Keeps track of all search-related node-level settings, which can be accessed via static methods
*
* @opensearch.internal
*/
public class SearchBootstrapSettings {
// Setting to configure the maximum number of slices created per search request using the OpenSearch custom slice computation
// mechanism. The default Lucene mechanism will not be used if this setting has a value > 0
public static final String CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_KEY = "search.concurrent.max_slice";
public static final int CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_DEFAULT_VALUE = 0;

// value == 0 means the Lucene slice computation will be used
// this setting will be converted to a dynamic setting as part of https://github.com/opensearch-project/OpenSearch/issues/8870
public static final Setting<Integer> CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING = Setting.intSetting(
CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_KEY,
CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_DEFAULT_VALUE,
CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_DEFAULT_VALUE,
Setting.Property.NodeScope
);
private static Settings settings;

public static void initialize(Settings openSearchSettings) {
settings = openSearchSettings;
}

public static int getTargetMaxSlice() {
return (settings != null)
? settings.getAsInt(
CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_KEY,
CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_DEFAULT_VALUE
)
: CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_DEFAULT_VALUE;
}
}
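For illustration, a minimal sketch of the bootstrap flow defined above (the value 4 is arbitrary; initialize() is invoked from Node.java as shown earlier in this diff):

Settings nodeSettings = Settings.builder()
    .put(SearchBootstrapSettings.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_KEY, 4)
    .build();
SearchBootstrapSettings.initialize(nodeSettings);
// getTargetMaxSlice() now returns 4; with the default of 0, the Lucene slice computation is used instead
int targetMaxSlice = SearchBootstrapSettings.getTargetMaxSlice();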
server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java
@@ -32,6 +32,8 @@

package org.opensearch.search.internal;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
@@ -66,6 +68,7 @@
import org.opensearch.common.lucene.search.TopDocsAndMaxScore;
import org.opensearch.common.lease.Releasable;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.SearchBootstrapSettings;
import org.opensearch.search.SearchService;
import org.opensearch.search.dfs.AggregatedDfs;
import org.opensearch.search.profile.ContextualProfileBreakdown;
@@ -93,11 +96,13 @@
* @opensearch.internal
*/
public class ContextIndexSearcher extends IndexSearcher implements Releasable {

private static final Logger logger = LogManager.getLogger(ContextIndexSearcher.class);
/**
* The interval at which we check for search cancellation when we cannot use
* a {@link CancellableBulkScorer}. See {@link #intersectScorerAndBitSet}.
*/
private static int CHECK_CANCELLED_SCORER_INTERVAL = 1 << 11;
private static final int CHECK_CANCELLED_SCORER_INTERVAL = 1 << 11;

private AggregatedDfs aggregatedDfs;
private QueryProfiler profiler;
@@ -439,6 +444,18 @@ public CollectionStatistics collectionStatistics(String field) throws IOException
return collectionStatistics;
}

/**
* Compute the leaf slices that will be used by concurrent segment search to spread work across threads
* @param leaves all the segments
* @return the leaf slice groups, each to be executed by a different thread
*/
@Override
public LeafSlice[] slices(List<LeafReaderContext> leaves) {
// For now, the static setting is used to get the targetMaxSlice value. It will be updated to a dynamic mechanism as part of
// https://github.com/opensearch-project/OpenSearch/issues/8870 once the Lucene changes are available
return slicesInternal(leaves, SearchBootstrapSettings.getTargetMaxSlice());
}

public DirectoryReader getDirectoryReader() {
final IndexReader reader = getIndexReader();
assert reader instanceof DirectoryReader : "expected an instance of DirectoryReader, got " + reader.getClass();
@@ -518,4 +535,19 @@ private boolean shouldReverseLeafReaderContexts() {
}
return false;
}

// package-private for testing
LeafSlice[] slicesInternal(List<LeafReaderContext> leaves, int targetMaxSlice) {
LeafSlice[] leafSlices;
if (targetMaxSlice == 0) {
// use the default lucene slice calculation
leafSlices = super.slices(leaves);
logger.debug("Slice count using lucene default [{}]", leafSlices.length);
} else {
// use the custom slice calculation based on targetMaxSlice
leafSlices = MaxTargetSliceSupplier.getSlices(leaves, targetMaxSlice);
logger.debug("Slice count using max target slice supplier [{}]", leafSlices.length);
}
return leafSlices;
}
}
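To make the slicesInternal dispatch concrete, a sketch under the setup used by ContextIndexSearcherTests further below (a searcher and leaves over ten single-document segments are assumed):

// Sketch only: `searcher` and `leaves` are assumed to be built as in testSlicesInternal
IndexSearcher.LeafSlice[] defaultSlices = searcher.slicesInternal(leaves, 0); // Lucene default: at most 5 segments per slice -> 2 slices
IndexSearcher.LeafSlice[] boundedSlices = searcher.slicesInternal(leaves, 4); // MaxTargetSliceSupplier -> 4 slices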
55 changes: 55 additions & 0 deletions server/src/main/java/org/opensearch/search/internal/MaxTargetSliceSupplier.java
@@ -0,0 +1,55 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.internal;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.IndexSearcher;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/**
* Supplier to compute leaf slices based on the passed-in leaves and a max target slice count that limits the number of computed
* slices. It sorts all the leaves by document count and then assigns each leaf in round-robin fashion to the target number of
* slices. Based on experimental results shared in <a href=https://github.com/opensearch-project/OpenSearch/issues/7358>issue-7358</a>,
* this mechanism achieves better tail/median latency than the default Lucene slice computation.
*
* @opensearch.internal
*/
final class MaxTargetSliceSupplier {

static IndexSearcher.LeafSlice[] getSlices(List<LeafReaderContext> leaves, int targetMaxSlice) {
if (targetMaxSlice <= 0) {
throw new IllegalArgumentException("MaxTargetSliceSupplier called with unexpected slice count of " + targetMaxSlice);
}

// slice count should not exceed the segment count
int targetSliceCount = Math.min(targetMaxSlice, leaves.size());

// Make a copy so we can sort:
List<LeafReaderContext> sortedLeaves = new ArrayList<>(leaves);

// Sort by maxDoc, descending:
sortedLeaves.sort(Collections.reverseOrder(Comparator.comparingInt(l -> l.reader().maxDoc())));

final List<List<LeafReaderContext>> groupedLeaves = new ArrayList<>();
for (int i = 0; i < targetSliceCount; ++i) {
groupedLeaves.add(new ArrayList<>());
}
// distribute the slices in round-robin fashion
for (int idx = 0; idx < sortedLeaves.size(); ++idx) {
int currentGroup = idx % targetSliceCount;
groupedLeaves.get(currentGroup).add(sortedLeaves.get(idx));
}

return groupedLeaves.stream().map(IndexSearcher.LeafSlice::new).toArray(IndexSearcher.LeafSlice[]::new);
}
}
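The round-robin distribution above can be illustrated standalone (a sketch, not part of this diff): ten leaves spread across a target of four slices yield group sizes of 3, 3, 2 and 2, which is exactly what testSlicesInternal below asserts.

int leafCount = 10;
int targetSliceCount = 4;
int[] sliceSizes = new int[targetSliceCount];
for (int idx = 0; idx < leafCount; ++idx) {
    sliceSizes[idx % targetSliceCount]++; // same round-robin assignment as getSlices above
}
// sliceSizes == {3, 3, 2, 2}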
server/src/test/java/org/opensearch/common/settings/SettingsModuleTests.java
@@ -37,6 +37,7 @@
import org.hamcrest.Matchers;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexSettings;
import org.opensearch.search.SearchBootstrapSettings;
import org.opensearch.search.SearchService;
import org.opensearch.test.FeatureFlagSetter;

Expand Down Expand Up @@ -335,4 +336,36 @@ public void testConcurrentSegmentSearchIndexSettings() {
"node"
);
}

public void testMaxSliceCountClusterSettingsForConcurrentSearch() {
// Test that we throw an exception without the feature flag
Settings settings = Settings.builder()
.put(SearchBootstrapSettings.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey(), 2)
.build();
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> new SettingsModule(settings));
assertTrue(
ex.getMessage()
.contains("unknown setting [" + SearchBootstrapSettings.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey())
);

// Test that the settings updates correctly with the feature flag
FeatureFlagSetter.set(FeatureFlags.CONCURRENT_SEGMENT_SEARCH);
int settingValue = randomIntBetween(0, 10);
Settings settingsWithFeatureFlag = Settings.builder()
.put(SearchBootstrapSettings.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey(), settingValue)
.build();
SettingsModule settingsModule = new SettingsModule(settingsWithFeatureFlag);
assertEquals(
settingValue,
(int) SearchBootstrapSettings.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.get(settingsModule.getSettings())
);

// Test that negative value is not allowed
settingValue = -1;
final Settings settingsWithFeatureFlag_2 = Settings.builder()
.put(SearchBootstrapSettings.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey(), settingValue)
.build();
ex = expectThrows(IllegalArgumentException.class, () -> new SettingsModule(settingsWithFeatureFlag_2));
assertTrue(ex.getMessage().contains(SearchBootstrapSettings.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import org.opensearch.index.cache.bitset.BitsetFilterCache;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.search.SearchBootstrapSettings;
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.IndexSettingsModule;
Expand All @@ -90,6 +91,7 @@
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

import static org.mockito.Mockito.mock;
Expand All @@ -100,6 +102,7 @@
import static org.opensearch.search.internal.ExitableDirectoryReader.ExitableTerms;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.opensearch.search.internal.IndexReaderUtils.getLeaves;

public class ContextIndexSearcherTests extends OpenSearchTestCase {
public void testIntersectScorerAndRoleBits() throws Exception {
Expand Down Expand Up @@ -304,6 +307,59 @@ public void onRemoval(ShardId shardId, Accountable accountable) {
IOUtils.close(reader, w, dir);
}

public void testSlicesInternal() throws Exception {
final List<LeafReaderContext> leaves = getLeaves(10);

final Directory directory = newDirectory();
IndexWriter iw = new IndexWriter(directory, new IndexWriterConfig(new StandardAnalyzer()).setMergePolicy(NoMergePolicy.INSTANCE));
Document document = new Document();
document.add(new StringField("field1", "value1", Field.Store.NO));
document.add(new StringField("field2", "value1", Field.Store.NO));
iw.addDocument(document);
iw.commit();
DirectoryReader directoryReader = DirectoryReader.open(directory);

SearchContext searchContext = mock(SearchContext.class);
IndexShard indexShard = mock(IndexShard.class);
when(searchContext.indexShard()).thenReturn(indexShard);
when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR);
ContextIndexSearcher searcher = new ContextIndexSearcher(
directoryReader,
IndexSearcher.getDefaultSimilarity(),
IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(),
true,
null,
searchContext
);
// Case 1: Verify the slice count when lucene default slice computation is used
IndexSearcher.LeafSlice[] slices = searcher.slicesInternal(
leaves,
SearchBootstrapSettings.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_DEFAULT_VALUE
);
int expectedSliceCount = 2;
// 2 slices will be created since the maximum of 5 segments per slice will be reached
assertEquals(expectedSliceCount, slices.length);
for (int i = 0; i < expectedSliceCount; ++i) {
assertEquals(5, slices[i].leaves.length);
}

// Case 2: Verify the slice count when custom max slice computation is used
expectedSliceCount = 4;
slices = searcher.slicesInternal(leaves, expectedSliceCount);

// 4 slices will be created, with 3 leaves in the first 2 slices and 2 leaves in the other 2
assertEquals(expectedSliceCount, slices.length);
for (int i = 0; i < expectedSliceCount; ++i) {
if (i < 2) {
assertEquals(3, slices[i].leaves.length);
} else {
assertEquals(2, slices[i].leaves.length);
}
}
IOUtils.close(directoryReader, iw, directory);
}

private SparseFixedBitSet query(LeafReaderContext leaf, String field, String value) throws IOException {
SparseFixedBitSet sparseFixedBitSet = new SparseFixedBitSet(leaf.reader().maxDoc());
TermsEnum tenum = leaf.reader().terms(field).iterator();
51 changes: 51 additions & 0 deletions server/src/test/java/org/opensearch/search/internal/IndexReaderUtils.java
@@ -0,0 +1,51 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.internal;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.store.Directory;

import java.util.List;

import static org.apache.lucene.tests.util.LuceneTestCase.newDirectory;

public class IndexReaderUtils {

/**
* Utility to create {@code leafCount} {@link LeafReaderContext} instances
* @param leafCount count of leaves to create
* @return created leaves
*/
public static List<LeafReaderContext> getLeaves(int leafCount) throws Exception {
final Directory directory = newDirectory();
IndexWriter iw = new IndexWriter(directory, new IndexWriterConfig(new StandardAnalyzer()).setMergePolicy(NoMergePolicy.INSTANCE));
for (int i = 0; i < leafCount; ++i) {
Document document = new Document();
final String fieldValue = "value" + i;
document.add(new StringField("field1", fieldValue, Field.Store.NO));
document.add(new StringField("field2", fieldValue, Field.Store.NO));
iw.addDocument(document);
iw.commit();
}
iw.close();
DirectoryReader directoryReader = DirectoryReader.open(directory);
List<LeafReaderContext> leaves = directoryReader.leaves();
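// Note: the reader and directory are closed before returning; the contexts are only used for slice
// computation in tests, which reads maxDoc and should not require an open reader.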
directoryReader.close();
directory.close();
return leaves;
}
}
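A quick usage sketch of the helper above, mirroring testSlicesInternal (the assertion is illustrative):

// 10 commits with merging disabled produce 10 single-document segments, hence 10 leaves
List<LeafReaderContext> leaves = IndexReaderUtils.getLeaves(10);
assert leaves.size() == 10;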
