Skip to content

Commit

Permalink
Fix potential leaks in search execution (elastic#108391)
Browse files Browse the repository at this point in the history
Cleaning up some potentially leaky spots or at the very least making
them easier to read.
  • Loading branch information
original-brownbear authored May 10, 2024
1 parent 79032ec commit 8d19849
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1303,8 +1303,8 @@ public SearchPhase newSearchPhase(
task,
true,
searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis),
listener.delegateFailureAndWrap((l, iters) -> {
SearchPhase action = newSearchPhase(
listener.delegateFailureAndWrap(
(l, iters) -> newSearchPhase(
task,
searchRequest,
executor,
Expand All @@ -1317,30 +1317,32 @@ public SearchPhase newSearchPhase(
false,
threadPool,
clusters
);
action.start();
})
);
} else {
// for synchronous CCS minimize_roundtrips=false, use the CCSSingleCoordinatorSearchProgressListener
// (AsyncSearchTask will not return SearchProgressListener.NOOP, since it uses its own progress listener
// which delegates to CCSSingleCoordinatorSearchProgressListener when minimizing roundtrips)
if (clusters.isCcsMinimizeRoundtrips() == false
&& clusters.hasRemoteClusters()
&& task.getProgressListener() == SearchProgressListener.NOOP) {
task.setProgressListener(new CCSSingleCoordinatorSearchProgressListener());
}
final SearchPhaseResults<SearchPhaseResult> queryResultConsumer = searchPhaseController.newSearchPhaseResults(
executor,
circuitBreaker,
task::isCancelled,
task.getProgressListener(),
searchRequest,
shardIterators.size(),
exc -> searchTransportService.cancelSearchTask(task, "failed to merge result [" + exc.getMessage() + "]")
).start()
)
);
}
// for synchronous CCS minimize_roundtrips=false, use the CCSSingleCoordinatorSearchProgressListener
// (AsyncSearchTask will not return SearchProgressListener.NOOP, since it uses its own progress listener
// which delegates to CCSSingleCoordinatorSearchProgressListener when minimizing roundtrips)
if (clusters.isCcsMinimizeRoundtrips() == false
&& clusters.hasRemoteClusters()
&& task.getProgressListener() == SearchProgressListener.NOOP) {
task.setProgressListener(new CCSSingleCoordinatorSearchProgressListener());
}
final SearchPhaseResults<SearchPhaseResult> queryResultConsumer = searchPhaseController.newSearchPhaseResults(
executor,
circuitBreaker,
task::isCancelled,
task.getProgressListener(),
searchRequest,
shardIterators.size(),
exc -> searchTransportService.cancelSearchTask(task, "failed to merge result [" + exc.getMessage() + "]")
);
boolean success = false;
try {
final SearchPhase searchPhase;
if (searchRequest.searchType() == DFS_QUERY_THEN_FETCH) {
return new SearchDfsQueryThenFetchAsyncAction(
searchPhase = new SearchDfsQueryThenFetchAsyncAction(
logger,
namedWriteableRegistry,
searchTransportService,
Expand All @@ -1359,7 +1361,7 @@ public SearchPhase newSearchPhase(
);
} else {
assert searchRequest.searchType() == QUERY_THEN_FETCH : searchRequest.searchType();
return new SearchQueryThenFetchAsyncAction(
searchPhase = new SearchQueryThenFetchAsyncAction(
logger,
namedWriteableRegistry,
searchTransportService,
Expand All @@ -1377,6 +1379,12 @@ public SearchPhase newSearchPhase(
clusters
);
}
success = true;
return searchPhase;
} finally {
if (success == false) {
queryResultConsumer.close();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ public Query rewrittenQuery() {
* Adds a releasable that will be freed when this context is closed.
*/
public void addReleasable(Releasable releasable) { // TODO most Releasables are managed by their callers. We probably don't need this.
assert closed.get() == false;
releasables.add(releasable);
}

Expand Down

0 comments on commit 8d19849

Please sign in to comment.