From c2e0a645ebe2eeebc2e8b753f9aa884a8fa28040 Mon Sep 17 00:00:00 2001 From: Craig Perkins Date: Fri, 12 Jul 2024 13:46:41 -0400 Subject: [PATCH] Initialize clients Signed-off-by: Craig Perkins --- .../main/java/org/opensearch/node/Node.java | 56 +++++++++++-------- 1 file changed, 33 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 37267f1d726fb..063d3741812c3 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -931,30 +931,32 @@ protected Node( final ViewService viewService = new ViewService(clusterService, client, null); - Collection pluginComponents = pluginsService.filterPlugins(Plugin.class) - .stream() - .flatMap( - p -> p.createComponents( - new PluginAwareNodeClient(settings, threadPool, p), - clusterService, - threadPool, - resourceWatcherService, - scriptService, - xContentRegistry, - environment, - nodeEnvironment, - namedWriteableRegistry, - clusterModule.getIndexNameExpressionResolver(), - repositoriesServiceReference::get - ).stream() - ) - .collect(Collectors.toList()); + List pluginNodeClients = new ArrayList<>(); + Collection pluginComponents = pluginsService.filterPlugins(Plugin.class).stream().flatMap(p -> { + PluginAwareNodeClient pluginClient = new PluginAwareNodeClient(settings, threadPool, p); + pluginNodeClients.add(pluginClient); + return p.createComponents( + pluginClient, + clusterService, + threadPool, + resourceWatcherService, + scriptService, + xContentRegistry, + environment, + nodeEnvironment, + namedWriteableRegistry, + clusterModule.getIndexNameExpressionResolver(), + repositoriesServiceReference::get + ).stream(); + }).collect(Collectors.toList()); Collection telemetryAwarePluginComponents = pluginsService.filterPlugins(TelemetryAwarePlugin.class) .stream() - .flatMap( - p -> p.createComponents( - new PluginAwareNodeClient(settings, threadPool, (Plugin) p), + .flatMap(p -> { + PluginAwareNodeClient pluginClient = new PluginAwareNodeClient(settings, threadPool, (Plugin) p); + pluginNodeClients.add(pluginClient); + return p.createComponents( + pluginClient, clusterService, threadPool, resourceWatcherService, @@ -967,8 +969,8 @@ protected Node( repositoriesServiceReference::get, tracer, metricsRegistry - ).stream() - ) + ).stream(); + }) .collect(Collectors.toList()); // Add the telemetryAwarePlugin components to the existing pluginComponents collection. @@ -1430,6 +1432,14 @@ protected Node( transportService.getRemoteClusterService(), namedWriteableRegistry ); + for (PluginAwareNodeClient pluginClient : pluginNodeClients) { + pluginClient.initialize( + dynamicActionRegistry, + () -> clusterService.localNode().getId(), + transportService.getRemoteClusterService(), + namedWriteableRegistry + ); + } this.namedWriteableRegistry = namedWriteableRegistry; logger.debug("initializing HTTP handlers ...");