Skip to content

Commit

Permalink
Misc speedups to query execution (elastic#112836)
Browse files Browse the repository at this point in the history
* use array for compound listeners
* Fix capturing lambda for timestamp update
* misc cleanups
  • Loading branch information
original-brownbear authored Sep 13, 2024
1 parent c2bd0de commit 7f83eab
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -738,14 +738,15 @@ public SearchExecutionContext newSearchExecutionContext(
clusterService,
expressionResolver
);
var mapperService = mapperService();
return new SearchExecutionContext(
shardId,
shardRequestIndex,
indexSettings,
indexCache.bitsetFilterCache(),
this::loadFielddata,
mapperService(),
mapperService().mappingLookup(),
mapperService,
mapperService.mappingLookup(),
similarityService(),
scriptService,
parserConfiguration,
Expand Down Expand Up @@ -781,7 +782,7 @@ public QueryRewriteContext newQueryRewriteContext(
expressionResolver
);
final MapperService mapperService = mapperService();
final MappingLookup mappingLookup = mapperService().mappingLookup();
final MappingLookup mappingLookup = mapperService.mappingLookup();
return new QueryRewriteContext(
parserConfiguration,
client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3982,12 +3982,11 @@ && isSearchIdle()
engine.maybePruneDeletes(); // try to prune the deletes in the engine if we accumulated some
setRefreshPending(engine);
l.onResponse(false);
return;
} else {
logger.trace("scheduledRefresh: refresh with source [schedule]");
engine.maybeRefresh("schedule", l.map(Engine.RefreshResult::refreshed));
return;
}
return;
}
logger.trace("scheduledRefresh: no refresh needed");
engine.maybePruneDeletes(); // try to prune the deletes in the engine if we accumulated some
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,11 @@ default void validateReaderContext(ReaderContext readerContext, TransportRequest
* A Composite listener that multiplexes calls to each of the listeners methods.
*/
final class CompositeListener implements SearchOperationListener {
private final List<SearchOperationListener> listeners;
private final SearchOperationListener[] listeners;
private final Logger logger;

CompositeListener(List<SearchOperationListener> listeners, Logger logger) {
this.listeners = listeners;
this.listeners = listeners.toArray(new SearchOperationListener[0]);
this.logger = logger;
}

Expand Down
50 changes: 19 additions & 31 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -462,19 +462,22 @@ public void beforeIndexShardCreated(ShardRouting routing, Settings indexSettings
}

protected void putReaderContext(ReaderContext context) {
final ReaderContext previous = activeReaders.put(context.id().getId(), context);
final long id = context.id().getId();
final ReaderContext previous = activeReaders.put(id, context);
assert previous == null;
// ensure that if we race against afterIndexRemoved, we remove the context from the active list.
// this is important to ensure store can be cleaned up, in particular if the search is a scroll with a long timeout.
final Index index = context.indexShard().shardId().getIndex();
if (indicesService.hasIndex(index) == false) {
removeReaderContext(context.id().getId());
removeReaderContext(id);
throw new IndexNotFoundException(index);
}
}

protected ReaderContext removeReaderContext(long id) {
logger.trace("removing reader context [{}]", id);
if (logger.isTraceEnabled()) {
logger.trace("removing reader context [{}]", id);
}
return activeReaders.remove(id);
}

Expand Down Expand Up @@ -651,12 +654,10 @@ private void searchReady() {

private IndexShard getShard(ShardSearchRequest request) {
final ShardSearchContextId contextId = request.readerId();
if (contextId != null) {
if (sessionId.equals(contextId.getSessionId())) {
final ReaderContext readerContext = activeReaders.get(contextId.getId());
if (readerContext != null) {
return readerContext.indexShard();
}
if (contextId != null && sessionId.equals(contextId.getSessionId())) {
final ReaderContext readerContext = activeReaders.get(contextId.getId());
if (readerContext != null) {
return readerContext.indexShard();
}
}
return indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id());
Expand Down Expand Up @@ -970,13 +971,11 @@ final ReaderContext createOrGetReaderContext(ShardSearchRequest request) {
}
return createAndPutReaderContext(request, indexService, shard, searcherSupplier, defaultKeepAlive);
}
} else {
final long keepAliveInMillis = getKeepAlive(request);
final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
final IndexShard shard = indexService.getShard(request.shardId().id());
final Engine.SearcherSupplier searcherSupplier = shard.acquireSearcherSupplier();
return createAndPutReaderContext(request, indexService, shard, searcherSupplier, keepAliveInMillis);
}
final long keepAliveInMillis = getKeepAlive(request);
final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
final IndexShard shard = indexService.getShard(request.shardId().id());
return createAndPutReaderContext(request, indexService, shard, shard.acquireSearcherSupplier(), keepAliveInMillis);
}

final ReaderContext createAndPutReaderContext(
Expand All @@ -989,19 +988,15 @@ final ReaderContext createAndPutReaderContext(
ReaderContext readerContext = null;
Releasable decreaseScrollContexts = null;
try {
final ShardSearchContextId id = new ShardSearchContextId(sessionId, idGenerator.incrementAndGet());
if (request.scroll() != null) {
decreaseScrollContexts = openScrollContexts::decrementAndGet;
if (openScrollContexts.incrementAndGet() > maxOpenScrollContext) {
throw new TooManyScrollContextsException(maxOpenScrollContext, MAX_OPEN_SCROLL_CONTEXT.getKey());
}
}
final ShardSearchContextId id = new ShardSearchContextId(sessionId, idGenerator.incrementAndGet());
if (request.scroll() != null) {
readerContext = new LegacyReaderContext(id, indexService, shard, reader, request, keepAliveInMillis);
if (request.scroll() != null) {
readerContext.addOnClose(decreaseScrollContexts);
decreaseScrollContexts = null;
}
readerContext.addOnClose(decreaseScrollContexts);
decreaseScrollContexts = null;
} else {
readerContext = new ReaderContext(id, indexService, shard, reader, keepAliveInMillis, true);
}
Expand All @@ -1011,16 +1006,9 @@ final ReaderContext createAndPutReaderContext(
searchOperationListener.onNewReaderContext(finalReaderContext);
if (finalReaderContext.scrollContext() != null) {
searchOperationListener.onNewScrollContext(finalReaderContext);
readerContext.addOnClose(() -> searchOperationListener.onFreeScrollContext(finalReaderContext));
}
readerContext.addOnClose(() -> {
try {
if (finalReaderContext.scrollContext() != null) {
searchOperationListener.onFreeScrollContext(finalReaderContext);
}
} finally {
searchOperationListener.onFreeReaderContext(finalReaderContext);
}
});
readerContext.addOnClose(() -> searchOperationListener.onFreeReaderContext(finalReaderContext));
putReaderContext(finalReaderContext);
readerContext = null;
return finalReaderContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public Releasable markAsUsed(long keepAliveInMillis) {
refCounted.incRef();
tryUpdateKeepAlive(keepAliveInMillis);
return Releasables.releaseOnce(() -> {
this.lastAccessTime.updateAndGet(curr -> Math.max(curr, nowInMillis()));
this.lastAccessTime.accumulateAndGet(nowInMillis(), Math::max);
refCounted.decRef();
});
}
Expand Down

0 comments on commit 7f83eab

Please sign in to comment.