Skip to content

Commit

Permalink
Start peeling off methods from the top of the NodeConstructor constru…
Browse files Browse the repository at this point in the history
…ct method (elastic#101119)

Create separate methods for initial object creation
  • Loading branch information
thecoop authored Nov 3, 2023
1 parent e851b30 commit 4fc58af
Showing 1 changed file with 71 additions and 43 deletions.
114 changes: 71 additions & 43 deletions server/src/main/java/org/elasticsearch/node/NodeConstruction.java
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,12 @@ static NodeConstruction prepareConstruction(
List<Closeable> closeables = new ArrayList<>();
try {
NodeConstruction constructor = new NodeConstruction(closeables);
constructor.construct(initialEnvironment, serviceProvider, forbidPrivateIndexSettings);
Settings settings = constructor.createEnvironment(initialEnvironment, serviceProvider);
ThreadPool threadPool = constructor.createThreadPool(settings);
SettingsModule settingsModule = constructor.validateSettings(initialEnvironment.settings(), settings, threadPool);

constructor.construct(threadPool, settingsModule, serviceProvider, forbidPrivateIndexSettings);

return constructor;
} catch (IOException e) {
IOUtils.closeWhileHandlingException(closeables);
Expand Down Expand Up @@ -345,13 +350,12 @@ private <T> Optional<T> getSinglePlugin(Stream<T> plugins, Class<T> pluginClass)
return Optional.of(plugin);
}

private void construct(Environment initialEnvironment, NodeServiceProvider serviceProvider, boolean forbidPrivateIndexSettings)
throws IOException {
private Settings createEnvironment(Environment initialEnvironment, NodeServiceProvider serviceProvider) {
// Pass the node settings to the DeprecationLogger class so that it can have the deprecation.skip_deprecated_settings setting:
DeprecationLogger.initialize(initialEnvironment.settings());
Settings environmentSettings = initialEnvironment.settings();
Settings envSettings = initialEnvironment.settings();
DeprecationLogger.initialize(envSettings);

final JvmInfo jvmInfo = JvmInfo.jvmInfo();
JvmInfo jvmInfo = JvmInfo.jvmInfo();
logger.info(
"version[{}], pid[{}], build[{}/{}/{}], OS[{}/{}/{}], JVM[{}/{}/{}/{}]",
Build.current().qualifiedVersion(),
Expand All @@ -375,7 +379,7 @@ private void construct(Environment initialEnvironment, NodeServiceProvider servi
Build.current().qualifiedVersion()
);
}
if (Environment.PATH_SHARED_DATA_SETTING.exists(environmentSettings)) {
if (Environment.PATH_SHARED_DATA_SETTING.exists(envSettings)) {
// NOTE: this must be done with an explicit check here because the deprecation property on a path setting will
// cause ES to fail to start since logging is not yet initialized on first read of the setting
deprecationLogger.warn(
Expand All @@ -394,7 +398,7 @@ private void construct(Environment initialEnvironment, NodeServiceProvider servi
+ "multiple disks. This feature will be removed in a future release."
);
}
if (Environment.dataPathUsesList(environmentSettings)) {
if (Environment.dataPathUsesList(envSettings)) {
// already checked for multiple values above, so if this is a list it is a single valued list
deprecationLogger.warn(
DeprecationCategory.SETTINGS,
Expand All @@ -418,8 +422,8 @@ private void construct(Environment initialEnvironment, NodeServiceProvider servi
(e, apmConfig) -> logger.error("failed to delete temporary APM config file [{}], reason: [{}]", apmConfig, e.getMessage())
);

pluginsService = serviceProvider.newPluginService(initialEnvironment, environmentSettings);
final Settings settings = Node.mergePluginSettings(pluginsService.pluginMap(), environmentSettings);
pluginsService = serviceProvider.newPluginService(initialEnvironment, envSettings);
Settings settings = Node.mergePluginSettings(pluginsService.pluginMap(), envSettings);

/*
* Create the environment based on the finalized view of the settings. This is to ensure that components get the same setting
Expand All @@ -428,16 +432,69 @@ private void construct(Environment initialEnvironment, NodeServiceProvider servi
environment = new Environment(settings, initialEnvironment.configFile());
Environment.assertEquivalent(initialEnvironment, environment);

final List<ExecutorBuilder<?>> executorBuilders = pluginsService.flatMap(p -> p.getExecutorBuilders(settings)).toList();
return settings;
}

final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder<?>[0]));
private ThreadPool createThreadPool(Settings settings) throws IOException {
ThreadPool threadPool = new ThreadPool(
settings,
pluginsService.flatMap(p -> p.getExecutorBuilders(settings)).toArray(ExecutorBuilder<?>[]::new)
);
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
resourcesToClose.add(resourceWatcherService);

// adds the context to the DeprecationLogger so that it does not need to be injected everywhere
HeaderWarning.setThreadContext(threadPool.getThreadContext());
resourcesToClose.add(() -> HeaderWarning.removeThreadContext(threadPool.getThreadContext()));

return threadPool;
}

private SettingsModule validateSettings(Settings envSettings, Settings settings, ThreadPool threadPool) throws IOException {
// register the node.data, node.ingest, node.master, node.remote_cluster_client settings here so we can mark them private
List<Setting<?>> additionalSettings = new ArrayList<>(pluginsService.flatMap(Plugin::getSettings).toList());
for (final ExecutorBuilder<?> builder : threadPool.builders()) {
additionalSettings.addAll(builder.getRegisteredSettings());
}
SettingsExtension.load().forEach(e -> additionalSettings.addAll(e.getSettings()));

// this is as early as we can validate settings at this point. we already pass them to ScriptModule as well as ThreadPool
// so we might be late here already
SettingsModule settingsModule = new SettingsModule(
settings,
additionalSettings,
pluginsService.flatMap(Plugin::getSettingsFilter).toList()
);

// creating `NodeEnvironment` breaks the ability to rollback to 7.x on an 8.0 upgrade (`upgradeLegacyNodeFolders`) so do this
// after settings validation.
nodeEnvironment = new NodeEnvironment(envSettings, environment);
logger.info(
"node name [{}], node ID [{}], cluster name [{}], roles {}",
Node.NODE_NAME_SETTING.get(envSettings),
nodeEnvironment.nodeId(),
ClusterName.CLUSTER_NAME_SETTING.get(envSettings).value(),
DiscoveryNode.getRolesFromSettings(settings)
.stream()
.map(DiscoveryNodeRole::roleName)
.collect(Collectors.toCollection(LinkedHashSet::new))
);
resourcesToClose.add(nodeEnvironment);

return settingsModule;
}

private void construct(
ThreadPool threadPool,
SettingsModule settingsModule,
NodeServiceProvider serviceProvider,
boolean forbidPrivateIndexSettings
) throws IOException {

Settings settings = settingsModule.getSettings();

final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
resourcesToClose.add(resourceWatcherService);

final Set<String> taskHeaders = Stream.concat(
pluginsService.filterPlugins(ActionPlugin.class).flatMap(p -> p.getTaskHeaders().stream()),
Task.HEADERS_TO_COPY.stream()
Expand All @@ -450,12 +507,6 @@ private void construct(Environment initialEnvironment, NodeServiceProvider servi

final TaskManager taskManager = new TaskManager(settings, threadPool, taskHeaders, tracer);

// register the node.data, node.ingest, node.master, node.remote_cluster_client settings here so we can mark them private
final List<Setting<?>> additionalSettings = new ArrayList<>(pluginsService.flatMap(Plugin::getSettings).toList());
for (final ExecutorBuilder<?> builder : threadPool.builders()) {
additionalSettings.addAll(builder.getRegisteredSettings());
}
SettingsExtension.load().forEach(e -> additionalSettings.addAll(e.getSettings()));
client = new NodeClient(settings, threadPool);

final ScriptModule scriptModule = new ScriptModule(settings, pluginsService.filterPlugins(ScriptPlugin.class).toList());
Expand All @@ -471,29 +522,6 @@ private void construct(Environment initialEnvironment, NodeServiceProvider servi
pluginsService.filterPlugins(AnalysisPlugin.class).toList(),
pluginsService.getStablePluginRegistry()
);
// this is as early as we can validate settings at this point. we already pass them to ScriptModule as well as ThreadPool
// so we might be late here already

final SettingsModule settingsModule = new SettingsModule(
settings,
additionalSettings,
pluginsService.flatMap(Plugin::getSettingsFilter).toList()
);

// creating `NodeEnvironment` breaks the ability to rollback to 7.x on an 8.0 upgrade (`upgradeLegacyNodeFolders`) so do this
// after settings validation.
nodeEnvironment = new NodeEnvironment(environmentSettings, environment);
logger.info(
"node name [{}], node ID [{}], cluster name [{}], roles {}",
Node.NODE_NAME_SETTING.get(environmentSettings),
nodeEnvironment.nodeId(),
ClusterName.CLUSTER_NAME_SETTING.get(environmentSettings).value(),
DiscoveryNode.getRolesFromSettings(settings)
.stream()
.map(DiscoveryNodeRole::roleName)
.collect(Collectors.toCollection(LinkedHashSet::new))
);
resourcesToClose.add(nodeEnvironment);
localNodeFactory = new Node.LocalNodeFactory(settings, nodeEnvironment.nodeId());

ScriptModule.registerClusterSettingsListeners(scriptService, settingsModule.getClusterSettings());
Expand Down

0 comments on commit 4fc58af

Please sign in to comment.