Skip to content

Commit

Permalink
Make sure listener runs before plugin listeners
Browse files Browse the repository at this point in the history
Signed-off-by: Siddhant Deshmukh <[email protected]>
  • Loading branch information
deshsidd committed Jul 23, 2024
1 parent 90ac7e5 commit 974ee3e
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
* upon request completion.
*
*/
final class SearchTaskRequestOperationsListener extends SearchRequestOperationsListener {
public final class SearchTaskRequestOperationsListener extends SearchRequestOperationsListener {
private final TaskResourceTrackingService taskResourceTrackingService;

SearchTaskRequestOperationsListener(TaskResourceTrackingService taskResourceTrackingService) {
public SearchTaskRequestOperationsListener(TaskResourceTrackingService taskResourceTrackingService) {
this.taskResourceTrackingService = taskResourceTrackingService;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
private final SearchPipelineService searchPipelineService;
private final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory;
private final Tracer tracer;
private final SearchTaskRequestOperationsListener searchTaskRequestOperationsListener;

private final MetricsRegistry metricsRegistry;

Expand Down Expand Up @@ -216,7 +215,6 @@ public TransportSearchAction(
this.searchRequestOperationsCompositeListenerFactory = searchRequestOperationsCompositeListenerFactory;
this.tracer = tracer;
this.taskResourceTrackingService = taskResourceTrackingService;
this.searchTaskRequestOperationsListener = new SearchTaskRequestOperationsListener(taskResourceTrackingService);
}

private Map<String, AliasFilter> buildPerIndexAliasFilter(
Expand Down Expand Up @@ -435,8 +433,7 @@ private void executeRequest(
requestOperationsListeners = searchRequestOperationsCompositeListenerFactory.buildCompositeListener(
originalSearchRequest,
logger,
TraceableSearchRequestOperationsListener.create(tracer, requestSpan),
searchTaskRequestOperationsListener
TraceableSearchRequestOperationsListener.create(tracer, requestSpan)
);
SearchRequestContext searchRequestContext = new SearchRequestContext(
requestOperationsListeners,
Expand Down
16 changes: 9 additions & 7 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.action.search.SearchRequestOperationsListener;
import org.opensearch.action.search.SearchRequestSlowLog;
import org.opensearch.action.search.SearchRequestStats;
import org.opensearch.action.search.SearchTaskRequestOperationsListener;
import org.opensearch.action.search.SearchTransportService;
import org.opensearch.action.support.TransportAction;
import org.opensearch.action.update.UpdateHelper;
Expand Down Expand Up @@ -855,8 +856,15 @@ protected Node(
threadPool
);

final TaskResourceTrackingService taskResourceTrackingService = new TaskResourceTrackingService(
settings,
clusterService.getClusterSettings(),
threadPool
);

final SearchRequestStats searchRequestStats = new SearchRequestStats(clusterService.getClusterSettings());
final SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService);
final SearchTaskRequestOperationsListener searchTaskRequestOperationsListener = new SearchTaskRequestOperationsListener(taskResourceTrackingService);

remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, settings);
CacheModule cacheModule = new CacheModule(pluginsService.filterPlugins(CachePlugin.class), settings);
Expand Down Expand Up @@ -988,7 +996,7 @@ protected Node(
final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory =
new SearchRequestOperationsCompositeListenerFactory(
Stream.concat(
Stream.of(searchRequestStats, searchRequestSlowLog),
Stream.of(searchRequestStats, searchRequestSlowLog, searchTaskRequestOperationsListener),
pluginComponents.stream()
.filter(p -> p instanceof SearchRequestOperationsListener)
.map(p -> (SearchRequestOperationsListener) p)
Expand Down Expand Up @@ -1117,12 +1125,6 @@ protected Node(
// development. Then we can deprecate Getter and Setter for IndexingPressureService in ClusterService (#478).
clusterService.setIndexingPressureService(indexingPressureService);

final TaskResourceTrackingService taskResourceTrackingService = new TaskResourceTrackingService(
settings,
clusterService.getClusterSettings(),
threadPool
);

final SearchBackpressureSettings searchBackpressureSettings = new SearchBackpressureSettings(
settings,
clusterService.getClusterSettings()
Expand Down

0 comments on commit 974ee3e

Please sign in to comment.