Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move parallel fetch config into index live settings #740

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions clientlib/src/main/proto/yelp/nrtsearch/luceneserver.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1103,6 +1103,10 @@ message IndexLiveSettings {
google.protobuf.UInt64Value maxMergePreCopyDurationSec = 14;
// Collect and publish additional index metrics, which may be more expensive in terms of volume, memory and/or compute, default: false
google.protobuf.BoolValue verboseMetrics = 15;
// Whether fetch parallelism should be done by groups of fields instead of documents, default: false
google.protobuf.BoolValue parallelFetchByField = 16;
// The number of documents/fields per parallel fetch task, default: 50
google.protobuf.Int32Value parallelFetchChunkSize = 17;
}

message IndexStateInfo {
Expand Down
19 changes: 18 additions & 1 deletion docs/index_live_settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,21 @@ Specifies the maximum time to wait for replicas to precopy merged segment files.

Must be >= 0

Default: 0
Default: 0

parallelFetchByField
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

When using parallelism to fetch field values, this setting determines whether the work should be divided by fields or by documents.

Default: false (divide by documents)


parallelFetchChunkSize
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

When using parallelism to fetch field values, this setting determines the maximum number of fields/documents to process in a single task.

Must be > 0

Default: 50
1,061 changes: 546 additions & 515 deletions grpc-gateway/luceneserver.pb.go

Large diffs are not rendered by default.

24 changes: 24 additions & 0 deletions grpc-gateway/luceneserver.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1275,6 +1275,21 @@
"required": false,
"type": "boolean"
},
{
"name": "liveSettings.parallelFetchByField",
"description": "Whether fetch parallelism should be done by groups of fields instead of documents, default: false",
"in": "query",
"required": false,
"type": "boolean"
},
{
"name": "liveSettings.parallelFetchChunkSize",
"description": "The number of documents/fields per parallel fetch task, default: 50",
"in": "query",
"required": false,
"type": "integer",
"format": "int32"
},
{
"name": "local",
"description": "When set to true, live settings changes are only applied to the local node. These changes are ephemeral, so will not persist through a restart. Also, the live settings returned in the response will contain the local overrides only when this flag is true.",
Expand Down Expand Up @@ -3531,6 +3546,15 @@
"verboseMetrics": {
"type": "boolean",
"title": "Collect and publish additional index metrics, which may be more expensive in terms of volume, memory and/or compute, default: false"
},
"parallelFetchByField": {
"type": "boolean",
"title": "Whether fetch parallelism should be done by groups of fields instead of documents, default: false"
},
"parallelFetchChunkSize": {
"type": "integer",
"format": "int32",
"title": "The number of documents/fields per parallel fetch task, default: 50"
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ public class ThreadPoolConfiguration {

public static final int DEFAULT_FETCH_THREADS = 1;
public static final int DEFAULT_FETCH_BUFFERED_ITEMS = DEFAULT_SEARCH_BUFFERED_ITEMS;
public static final int DEFAULT_MIN_PARALLEL_FETCH_NUM_FIELDS = 20;
public static final int DEFAULT_MIN_PARALLEL_FETCH_NUM_HITS = 50;

public static final int DEFAULT_GRPC_THREADS = AVAILABLE_PROCESSORS * 2;
public static final int DEFAULT_GRPC_BUFFERED_ITEMS = 8;
Expand All @@ -57,10 +55,6 @@ public class ThreadPoolConfiguration {
public static final int DEFAULT_VECTOR_MERGE_BUFFERED_ITEMS =
Math.max(100, 2 * DEFAULT_VECTOR_MERGE_THREADS);

private final int minParallelFetchNumFields;
private final int minParallelFetchNumHits;
private final boolean parallelFetchByField;

/**
* Settings for a {@link com.yelp.nrtsearch.server.utils.ThreadPoolExecutorFactory.ExecutorType}.
*
Expand Down Expand Up @@ -126,17 +120,6 @@ public ThreadPoolConfiguration(YamlConfigReader configReader) {
threadPoolSettings.put(
executorType, new ThreadPoolSettings(maxThreads, maxBufferedItems, threadNamePrefix));
}

// TODO: Move these setting somewhere else. They might be better as index live settings.
minParallelFetchNumFields =
configReader.getInteger(
"threadPoolConfiguration.minParallelFetchNumFields",
DEFAULT_MIN_PARALLEL_FETCH_NUM_FIELDS);
minParallelFetchNumHits =
configReader.getInteger(
"threadPoolConfiguration.minParallelFetchNumHits", DEFAULT_MIN_PARALLEL_FETCH_NUM_HITS);
parallelFetchByField =
configReader.getBoolean("threadPoolConfiguration.parallelFetchByField", true);
}

@JsonIgnoreProperties(ignoreUnknown = true)
Expand Down Expand Up @@ -196,16 +179,4 @@ public ThreadPoolSettings getThreadPoolSettings(
ThreadPoolExecutorFactory.ExecutorType executorType) {
return threadPoolSettings.get(executorType);
}

public int getMinParallelFetchNumFields() {
return minParallelFetchNumFields;
}

public int getMinParallelFetchNumHits() {
return minParallelFetchNumHits;
}

public boolean getParallelFetchByField() {
return parallelFetchByField;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ public abstract class IndexState implements Closeable {

private static final Pattern reSimpleName = Pattern.compile("^[a-zA-Z_][a-zA-Z_0-9]*$");
private final ThreadPoolExecutor searchThreadPoolExecutor;
private final ExecutorService fetchThreadPoolExecutor;
private Warmer warmer = null;

/** The meta field definitions */
Expand All @@ -97,6 +96,20 @@ public abstract class IndexState implements Closeable {
*/
public final DocLookup docLookup = new DocLookup(this, this::getField);

/**
 * Immutable holder for the configuration of parallel fetch operations, resolved once per index
 * state from the merged live settings.
 *
 * @param maxParallelism maximum number of parallel fetch tasks; values &lt;= 1 effectively disable
 *     parallel fetch
 * @param parallelFetchByField if true, fetches are parallelized by field instead of by document
 * @param parallelFetchChunkSize number of documents/fields in each parallel fetch operation
 * @param fetchExecutor executor service used to submit parallel fetch tasks; presumably shared
 *     across indices (obtained from global state) — confirm before per-index shutdown
 */
public record ParallelFetchConfig(
int maxParallelism,
boolean parallelFetchByField,
int parallelFetchChunkSize,
ExecutorService fetchExecutor) {}

/** Search-time analyzer. */
public final Analyzer searchAnalyzer =
new AnalyzerWrapper(Analyzer.PER_FIELD_REUSE_STRATEGY) {
Expand Down Expand Up @@ -223,7 +236,6 @@ public IndexState(GlobalState globalState, String name, Path rootDir) throws IOE
}

searchThreadPoolExecutor = globalState.getSearchThreadPoolExecutor();
fetchThreadPoolExecutor = globalState.getFetchService();
}

/** Get index name. */
Expand Down Expand Up @@ -274,11 +286,6 @@ public ThreadPoolExecutor getSearchThreadPoolExecutor() {
return searchThreadPoolExecutor;
}

/** Get thread pool to use for parallel fetch operations. */
public ExecutorService getFetchThreadPoolExecutor() {
return fetchThreadPoolExecutor;
}

public ThreadPoolConfiguration getThreadPoolConfiguration() {
return globalState.getThreadPoolConfiguration();
}
Expand Down Expand Up @@ -374,6 +381,13 @@ public abstract void start(

public abstract FacetsConfig getFacetsConfig();

/**
 * Get configuration for parallel fetch for this index. Implementations derive this from the
 * index live settings (e.g. parallelFetchByField, parallelFetchChunkSize) together with the
 * globally configured fetch thread pool.
 *
 * @return configuration for parallel fetch
 */
public abstract ParallelFetchConfig getParallelFetchConfig();

/**
* Get shard state.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import com.yelp.nrtsearch.server.luceneserver.search.SearcherResult;
import com.yelp.nrtsearch.server.monitoring.SearchResponseCollector;
import com.yelp.nrtsearch.server.utils.ObjectToCompositeFieldTransformer;
import com.yelp.nrtsearch.server.utils.ThreadPoolExecutorFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
Expand Down Expand Up @@ -323,23 +322,13 @@ private void fetchFields(SearchContext searchContext)
new ArrayList<>(searchContext.getResponseBuilder().getHitsBuilderList());
hitBuilders.sort(Comparator.comparing(Hit.Builder::getLuceneDocId));

IndexState indexState = searchContext.getIndexState();
int fetchThreadPoolSize =
indexState
.getThreadPoolConfiguration()
.getThreadPoolSettings(ThreadPoolExecutorFactory.ExecutorType.FETCH)
.maxThreads();
int minParallelFetchNumFields =
indexState.getThreadPoolConfiguration().getMinParallelFetchNumFields();
int minParallelFetchNumHits =
indexState.getThreadPoolConfiguration().getMinParallelFetchNumHits();
boolean parallelFetchByField =
indexState.getThreadPoolConfiguration().getParallelFetchByField();

if (parallelFetchByField
&& fetchThreadPoolSize > 1
&& searchContext.getRetrieveFields().keySet().size() > minParallelFetchNumFields
&& hitBuilders.size() > minParallelFetchNumHits) {
IndexState.ParallelFetchConfig parallelFetchConfig =
searchContext.getIndexState().getParallelFetchConfig();

if (parallelFetchConfig.parallelFetchByField()
&& parallelFetchConfig.maxParallelism() > 1
&& searchContext.getRetrieveFields().keySet().size()
> parallelFetchConfig.parallelFetchChunkSize()) {
// Fetch fields in parallel

List<LeafReaderContext> leaves =
Expand All @@ -353,12 +342,13 @@ private void fetchFields(SearchContext searchContext)
}
List<String> fields = new ArrayList<>(searchContext.getRetrieveFields().keySet());

// parallelism is min of fetchThreadPoolSize and fields.size() / MIN_PARALLEL_NUM_FIELDS
// parallelism is min of maxParallelism and fields.size() / parallelFetchChunkSize
// round up
int parallelism =
Math.min(
fetchThreadPoolSize,
(fields.size() + minParallelFetchNumFields - 1) / minParallelFetchNumFields);
parallelFetchConfig.maxParallelism(),
(fields.size() + parallelFetchConfig.parallelFetchChunkSize() - 1)
/ parallelFetchConfig.parallelFetchChunkSize());
List<List<String>> fieldsChunks =
Lists.partition(fields, (fields.size() + parallelism - 1) / parallelism);
List<Future<List<Map<String, CompositeFieldValue>>>> futures = new ArrayList<>();
Expand All @@ -368,11 +358,10 @@ private void fetchFields(SearchContext searchContext)
// Stored fields are not widely used for NRTSearch (not recommended for memory usage)
for (List<String> fieldsChunk : fieldsChunks) {
futures.add(
indexState
.getFetchThreadPoolExecutor()
parallelFetchConfig
.fetchExecutor()
.submit(
new FillFieldsTask(
indexState,
searchContext.getSearcherAndTaxonomy().searcher,
hitIdToLeaves,
hitBuilders,
Expand Down Expand Up @@ -401,26 +390,27 @@ private void fetchFields(SearchContext searchContext)
}
searchContext.getFetchTasks().processHit(searchContext, leaf, hitResponse);
}
} else if (!parallelFetchByField
&& fetchThreadPoolSize > 1
&& hitBuilders.size() > minParallelFetchNumHits) {
} else if (!parallelFetchConfig.parallelFetchByField()
&& parallelFetchConfig.maxParallelism() > 1
&& hitBuilders.size() > parallelFetchConfig.parallelFetchChunkSize()) {
// Fetch docs in parallel

// parallelism is min of fetchThreadPoolSize and hitsBuilder.size() / MIN_PARALLEL_NUM_HITS
// parallelism is min of maxParallelism and hitsBuilder.size() / parallelFetchChunkSize
// round up
int parallelism =
Math.min(
fetchThreadPoolSize,
(hitBuilders.size() + minParallelFetchNumHits - 1) / minParallelFetchNumHits);
parallelFetchConfig.maxParallelism(),
(hitBuilders.size() + parallelFetchConfig.parallelFetchChunkSize() - 1)
/ parallelFetchConfig.parallelFetchChunkSize());
List<List<Hit.Builder>> docChunks =
Lists.partition(hitBuilders, (hitBuilders.size() + parallelism - 1) / parallelism);

// process each document chunk in parallel
List<Future<?>> futures = new ArrayList<>();
for (List<Hit.Builder> docChunk : docChunks) {
futures.add(
indexState
.getFetchThreadPoolExecutor()
parallelFetchConfig
.fetchExecutor()
.submit(new FillDocsTask(searchContext, docChunk, searchContext.getQuery())));
}
for (Future<?> future : futures) {
Expand Down Expand Up @@ -709,21 +699,18 @@ private static SearcherTaxonomyManager.SearcherAndTaxonomy openSnapshotReader(

public static class FillFieldsTask implements Callable<List<Map<String, CompositeFieldValue>>> {

private IndexState state;
private IndexSearcher s;
private List<LeafReaderContext> hitIdToleaves;
private List<Hit.Builder> hitBuilders;
private List<String> fields;
private SearchContext searchContext;

public FillFieldsTask(
IndexState indexState,
IndexSearcher indexSearcher,
List<LeafReaderContext> hitIdToleaves,
List<Hit.Builder> hitBuilders,
List<String> fields,
SearchContext searchContext) {
this.state = indexState;
this.s = indexSearcher;
this.fields = fields;
this.searchContext = searchContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.yelp.nrtsearch.server.luceneserver.nrt.NrtDataManager;
import com.yelp.nrtsearch.server.luceneserver.search.sort.SortParser;
import com.yelp.nrtsearch.server.remote.RemoteBackend;
import com.yelp.nrtsearch.server.utils.ThreadPoolExecutorFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.EnumSet;
Expand Down Expand Up @@ -127,6 +128,7 @@ public class ImmutableIndexState extends IndexState {
public static final int DEFAULT_VIRTUAL_SHARDS = 1;
public static final int DEFAULT_SEGMENTS_PER_TIER = 10;
public static final int DEFAULT_MAX_MERGED_SEGMENT_MB = 5 * 1024;
public static final int DEFAULT_PARALLEL_FETCH_CHUNK_SIZE = 50;

// default live settings as message, so they can be merged with saved settings
public static final IndexLiveSettings DEFAULT_INDEX_LIVE_SETTINGS =
Expand All @@ -150,6 +152,9 @@ public class ImmutableIndexState extends IndexState {
.setDefaultTerminateAfter(Int32Value.newBuilder().setValue(0).build())
.setMaxMergePreCopyDurationSec(UInt64Value.newBuilder().setValue(0))
.setVerboseMetrics(BoolValue.newBuilder().setValue(false).build())
.setParallelFetchByField(BoolValue.newBuilder().setValue(false).build())
.setParallelFetchChunkSize(
Int32Value.newBuilder().setValue(DEFAULT_PARALLEL_FETCH_CHUNK_SIZE).build())
.build();

// Live Settings
Expand All @@ -168,6 +173,7 @@ public class ImmutableIndexState extends IndexState {
private final int defaultTerminateAfter;
private final long maxMergePreCopyDurationSec;
private final boolean verboseMetrics;
private final ParallelFetchConfig parallelFetchConfig;

private final IndexStateManager indexStateManager;
private final String uniqueName;
Expand Down Expand Up @@ -261,6 +267,20 @@ public ImmutableIndexState(
maxMergePreCopyDurationSec =
mergedLiveSettingsWithLocal.getMaxMergePreCopyDurationSec().getValue();
verboseMetrics = mergedLiveSettingsWithLocal.getVerboseMetrics().getValue();
// Parallel fetch config
int maxParallelism =
globalState
.getThreadPoolConfiguration()
.getThreadPoolSettings(ThreadPoolExecutorFactory.ExecutorType.FETCH)
.maxThreads();
boolean parallelFetchByField = mergedLiveSettingsWithLocal.getParallelFetchByField().getValue();
int parallelFetchChunkSize = mergedLiveSettingsWithLocal.getParallelFetchChunkSize().getValue();
parallelFetchConfig =
new ParallelFetchConfig(
maxParallelism,
parallelFetchByField,
parallelFetchChunkSize,
globalState.getFetchService());

// If there is previous shard state, use it. Otherwise, initialize the shard.
if (previousShardState != null) {
Expand Down Expand Up @@ -494,6 +514,11 @@ public FacetsConfig getFacetsConfig() {
return fieldAndFacetState.getFacetsConfig();
}

/** Returns the parallel fetch configuration built from the merged live settings at construction. */
@Override
public ParallelFetchConfig getParallelFetchConfig() {
return parallelFetchConfig;
}

@Override
public ShardState getShard(int shardOrd) {
ShardState shardState = shards.get(shardOrd);
Expand Down Expand Up @@ -799,6 +824,9 @@ static void validateLiveSettings(IndexLiveSettings liveSettings) {
if (liveSettings.getMaxMergePreCopyDurationSec().getValue() < 0) {
throw new IllegalArgumentException("maxMergePreCopyDurationSec must be >= 0");
}
if (liveSettings.getParallelFetchChunkSize().getValue() <= 0) {
throw new IllegalArgumentException("parallelFetchChunkSize must be > 0");
}
}

private static void validateIndexSort(Sort sort) {
Expand Down
Loading
Loading