diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index fb3c49d83cb93..52d4542faaf77 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -24,9 +24,12 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner; import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Releasable; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.dfs.DfsSearchResult; @@ -60,6 +63,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.concurrent.Executor; import java.util.function.BiFunction; import static org.elasticsearch.action.search.SearchTransportAPMMetrics.ACTION_ATTRIBUTE_NAME; @@ -455,9 +459,10 @@ public static void registerRequestHandler( boolean freed = searchService.freeReaderContext(request.id()); channel.sendResponse(SearchFreeContextResponse.of(freed)); }; + final Executor freeContextExecutor = buildFreeContextExecutor(transportService); transportService.registerRequestHandler( FREE_CONTEXT_SCROLL_ACTION_NAME, - transportService.getThreadPool().generic(), + freeContextExecutor, ScrollFreeContextRequest::new, instrumentedHandler(FREE_CONTEXT_SCROLL_ACTION_METRIC, transportService, searchTransportMetrics, freeContextHandler) ); @@ -470,7 +475,7 @@ public static void registerRequestHandler( transportService.registerRequestHandler( FREE_CONTEXT_ACTION_NAME, - transportService.getThreadPool().generic(), + freeContextExecutor, SearchFreeContextRequest::new, instrumentedHandler(FREE_CONTEXT_ACTION_METRIC, transportService, searchTransportMetrics, freeContextHandler) ); @@ -478,7 +483,7 @@ public static void registerRequestHandler( transportService.registerRequestHandler( CLEAR_SCROLL_CONTEXTS_ACTION_NAME, - transportService.getThreadPool().generic(), + freeContextExecutor, ClearScrollContextsRequest::new, instrumentedHandler(CLEAR_SCROLL_CONTEXTS_ACTION_METRIC, transportService, searchTransportMetrics, (request, channel, task) -> { searchService.freeAllScrollContexts(); @@ -626,6 +631,32 @@ public static void registerRequestHandler( TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NODE_NAME, true, CanMatchNodeResponse::new); } + private static Executor buildFreeContextExecutor(TransportService transportService) { + final ThrottledTaskRunner throttledTaskRunner = new ThrottledTaskRunner( + "free_context", + 1, + transportService.getThreadPool().generic() + ); + return r -> throttledTaskRunner.enqueueTask(new ActionListener<>() { + @Override + public void onResponse(Releasable releasable) { + try (releasable) { + r.run(); + } + } + + @Override + public void onFailure(Exception e) { + if (r instanceof AbstractRunnable abstractRunnable) { + abstractRunnable.onFailure(e); + } + // should be impossible, GENERIC pool doesn't reject anything + logger.error("unexpected failure running " + r, e); + assert false : new AssertionError("unexpected failure running " + r, e); + } + }); + } + private static TransportRequestHandler instrumentedHandler( String actionQualifier, TransportService transportService,