From 3ac47a60f1d66673041e2e6daa77710ebf8344b3 Mon Sep 17 00:00:00 2001 From: Finn Carroll Date: Tue, 17 Dec 2024 10:34:06 -0800 Subject: [PATCH] Refactor auxiliary transport port settings framework. The motivation of this change is to allow for multiple configured auxiliary transports. As such newAuxTransports() now returns a list of enabled transports and each AuxTransport implements it's own 'aux.transport.type' and 'aux.transport..ports' setting. Since Security.java initializes previous to Node.java during bootstrap socket binding permissions are granted based on 'aux.transport..ports' for each enabled 'aux.transport.type', falling back to a default if no ports are specified. Signed-off-by: Finn Carroll --- .../transport/grpc/GrpcModulePlugin.java | 33 ++++----- .../grpc/Netty4GrpcServerTransport.java | 62 +++++++++++++---- .../grpc/Netty4GrpcServerTransportTests.java | 3 +- .../auxiliary/AuxTransportSettings.java | 55 --------------- .../opensearch/auxiliary/package-info.java | 12 ---- .../org/opensearch/bootstrap/Security.java | 27 +++++--- .../common/network/NetworkModule.java | 61 +++++----------- .../common/settings/ClusterSettings.java | 8 +-- .../main/java/org/opensearch/node/Node.java | 7 +- .../org/opensearch/plugins/NetworkPlugin.java | 69 ++++++++++++++----- 10 files changed, 162 insertions(+), 175 deletions(-) delete mode 100644 server/src/main/java/org/opensearch/auxiliary/AuxTransportSettings.java delete mode 100644 server/src/main/java/org/opensearch/auxiliary/package-info.java diff --git a/plugins/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcModulePlugin.java b/plugins/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcModulePlugin.java index d736e6e851983..639182c891866 100644 --- a/plugins/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcModulePlugin.java +++ b/plugins/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcModulePlugin.java @@ -7,13 +7,11 @@ */ package org.opensearch.transport.grpc; -import org.opensearch.common.lifecycle.LifecycleComponent; import org.opensearch.common.network.NetworkService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; -import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.core.indices.breaker.CircuitBreakerService; import org.opensearch.plugins.NetworkPlugin; import org.opensearch.plugins.Plugin; @@ -25,25 +23,21 @@ import java.util.Map; import java.util.function.Supplier; -import static org.opensearch.auxiliary.AuxTransportSettings.SETTING_AUX_BIND_HOST; -import static org.opensearch.auxiliary.AuxTransportSettings.SETTING_AUX_HOST; -import static org.opensearch.auxiliary.AuxTransportSettings.SETTING_AUX_PUBLISH_HOST; +import static org.opensearch.transport.grpc.Netty4GrpcServerTransport.GRPC_TRANSPORT_SETTING_KEY; +import static org.opensearch.transport.grpc.Netty4GrpcServerTransport.SETTING_GRPC_BIND_HOST; +import static org.opensearch.transport.grpc.Netty4GrpcServerTransport.SETTING_GRPC_HOST; +import static org.opensearch.transport.grpc.Netty4GrpcServerTransport.SETTING_GRPC_PORTS; +import static org.opensearch.transport.grpc.Netty4GrpcServerTransport.SETTING_GRPC_PUBLISH_HOST; +import static org.opensearch.transport.grpc.Netty4GrpcServerTransport.SETTING_GRPC_PUBLISH_PORT; +import static org.opensearch.transport.grpc.Netty4GrpcServerTransport.SETTING_GRPC_WORKER_COUNT; /** * Main class for the gRPC plugin */ public final class GrpcModulePlugin extends Plugin implements NetworkPlugin { - public static final String GRPC_TRANSPORT_NAME = "grpc-transport"; - - public static final Setting SETTING_GRPC_WORKER_COUNT = new Setting<>( - "grpc.netty.worker_count", - (s) -> Integer.toString(OpenSearchExecutors.allocatedProcessors(s)), - (s) -> Setting.parseInt(s, 1, "grpc.netty.worker_count"), - Setting.Property.NodeScope - ); @Override - public Map> getAuxTransports( + public Map> getAuxTransports( Settings settings, ThreadPool threadPool, CircuitBreakerService circuitBreakerService, @@ -55,13 +49,20 @@ public Map> getAuxTransports( throw new IllegalArgumentException("transport-grpc is experimental and feature flag must be enabled before use"); } return Collections.singletonMap( - GRPC_TRANSPORT_NAME, + GRPC_TRANSPORT_SETTING_KEY, () -> new Netty4GrpcServerTransport(settings, Collections.emptyList(), networkService) ); } @Override public List> getSettings() { - return List.of(SETTING_AUX_HOST, SETTING_AUX_PUBLISH_HOST, SETTING_AUX_BIND_HOST, SETTING_GRPC_WORKER_COUNT); + return List.of( + SETTING_GRPC_PORTS, + SETTING_GRPC_HOST, + SETTING_GRPC_PUBLISH_HOST, + SETTING_GRPC_BIND_HOST, + SETTING_GRPC_WORKER_COUNT, + SETTING_GRPC_PUBLISH_PORT + ); } } diff --git a/plugins/transport-grpc/src/main/java/org/opensearch/transport/grpc/Netty4GrpcServerTransport.java b/plugins/transport-grpc/src/main/java/org/opensearch/transport/grpc/Netty4GrpcServerTransport.java index ce19949c37b12..f6927d80f906d 100644 --- a/plugins/transport-grpc/src/main/java/org/opensearch/transport/grpc/Netty4GrpcServerTransport.java +++ b/plugins/transport-grpc/src/main/java/org/opensearch/transport/grpc/Netty4GrpcServerTransport.java @@ -10,14 +10,15 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.common.lifecycle.AbstractLifecycleComponent; -import org.opensearch.common.lifecycle.LifecycleComponent; import org.opensearch.common.network.NetworkService; +import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.transport.PortsRange; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.core.common.Strings; import org.opensearch.core.common.transport.BoundTransportAddress; import org.opensearch.core.common.transport.TransportAddress; +import org.opensearch.plugins.NetworkPlugin; import org.opensearch.transport.BindTransportException; import java.io.IOException; @@ -30,6 +31,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import io.grpc.BindableService; import io.grpc.InsecureServerCredentials; @@ -41,16 +43,48 @@ import io.grpc.protobuf.services.HealthStatusManager; import io.grpc.protobuf.services.ProtoReflectionService; -import static org.opensearch.auxiliary.AuxTransportSettings.SETTING_AUX_BIND_HOST; -import static org.opensearch.auxiliary.AuxTransportSettings.SETTING_AUX_PORT; -import static org.opensearch.auxiliary.AuxTransportSettings.SETTING_AUX_PUBLISH_HOST; -import static org.opensearch.auxiliary.AuxTransportSettings.SETTING_AUX_PUBLISH_PORT; +import static java.util.Collections.emptyList; import static org.opensearch.common.network.NetworkService.resolvePublishPort; +import static org.opensearch.common.settings.Setting.intSetting; +import static org.opensearch.common.settings.Setting.listSetting; import static org.opensearch.common.util.concurrent.OpenSearchExecutors.daemonThreadFactory; -import static org.opensearch.transport.grpc.GrpcModulePlugin.SETTING_GRPC_WORKER_COUNT; -public class Netty4GrpcServerTransport extends AbstractLifecycleComponent implements LifecycleComponent { +public class Netty4GrpcServerTransport extends NetworkPlugin.AuxTransport { private static final Logger logger = LogManager.getLogger(Netty4GrpcServerTransport.class); + public static final String GRPC_TRANSPORT_SETTING_KEY = "grpc-transport"; + public static final Setting SETTING_GRPC_PORTS = AUX_TRANSPORT_PORTS.getConcreteSettingForNamespace( + GRPC_TRANSPORT_SETTING_KEY + ); + + public static final Setting SETTING_GRPC_WORKER_COUNT = new Setting<>( + "grpc.netty.worker_count", + (s) -> Integer.toString(OpenSearchExecutors.allocatedProcessors(s)), + (s) -> Setting.parseInt(s, 1, "grpc.netty.worker_count"), + Setting.Property.NodeScope + ); + + public static final Setting SETTING_GRPC_PUBLISH_PORT = intSetting("grpc.publish_port", -1, -1, Setting.Property.NodeScope); + + public static final Setting> SETTING_GRPC_HOST = listSetting( + "grpc.host", + emptyList(), + Function.identity(), + Setting.Property.NodeScope + ); + + public static final Setting> SETTING_GRPC_PUBLISH_HOST = listSetting( + "grpc.publish_host", + SETTING_GRPC_HOST, + Function.identity(), + Setting.Property.NodeScope + ); + + public static final Setting> SETTING_GRPC_BIND_HOST = listSetting( + "grpc.bind_host", + SETTING_GRPC_HOST, + Function.identity(), + Setting.Property.NodeScope + ); private final Settings settings; private final NetworkService networkService; @@ -69,16 +103,16 @@ public Netty4GrpcServerTransport(Settings settings, List servic this.services = Objects.requireNonNull(services); this.networkService = Objects.requireNonNull(networkService); - final List httpBindHost = SETTING_AUX_BIND_HOST.get(settings); + final List httpBindHost = SETTING_GRPC_BIND_HOST.get(settings); this.bindHosts = (httpBindHost.isEmpty() ? NetworkService.GLOBAL_NETWORK_BIND_HOST_SETTING.get(settings) : httpBindHost).toArray( Strings.EMPTY_ARRAY ); - final List httpPublishHost = SETTING_AUX_PUBLISH_HOST.get(settings); + final List httpPublishHost = SETTING_GRPC_PUBLISH_HOST.get(settings); this.publishHosts = (httpPublishHost.isEmpty() ? NetworkService.GLOBAL_NETWORK_PUBLISH_HOST_SETTING.get(settings) : httpPublishHost) .toArray(Strings.EMPTY_ARRAY); - this.port = SETTING_AUX_PORT.get(settings); + this.port = SETTING_GRPC_PORTS.get(settings); this.nettyEventLoopThreads = SETTING_GRPC_WORKER_COUNT.get(settings); } @@ -151,7 +185,7 @@ private void bindServer() { throw new BindTransportException("Failed to resolve publish address", e); } - final int publishPort = resolvePublishPort(SETTING_AUX_PUBLISH_PORT.get(settings), boundAddresses, publishInetAddress); + final int publishPort = resolvePublishPort(SETTING_GRPC_PUBLISH_PORT.get(settings), boundAddresses, publishInetAddress); if (publishPort < 0) { throw new BindTransportException( "Failed to auto-resolve grpc publish port, multiple bound addresses " @@ -160,9 +194,9 @@ private void bindServer() { + publishInetAddress + "). " + "Please specify a unique port by setting " - + SETTING_AUX_PORT.getKey() + + SETTING_GRPC_PORTS.getKey() + " or " - + SETTING_AUX_PUBLISH_PORT.getKey() + + SETTING_GRPC_PUBLISH_PORT.getKey() ); } diff --git a/plugins/transport-grpc/src/test/java/org/opensearch/transport/grpc/Netty4GrpcServerTransportTests.java b/plugins/transport-grpc/src/test/java/org/opensearch/transport/grpc/Netty4GrpcServerTransportTests.java index f29f597f90f10..ebeff62c2c23c 100644 --- a/plugins/transport-grpc/src/test/java/org/opensearch/transport/grpc/Netty4GrpcServerTransportTests.java +++ b/plugins/transport-grpc/src/test/java/org/opensearch/transport/grpc/Netty4GrpcServerTransportTests.java @@ -8,7 +8,6 @@ package org.opensearch.transport.grpc; -import org.opensearch.auxiliary.AuxTransportSettings; import org.opensearch.common.network.NetworkService; import org.opensearch.common.settings.Settings; import org.opensearch.test.OpenSearchTestCase; @@ -45,6 +44,6 @@ public void test() { } private static Settings createSettings() { - return Settings.builder().put(AuxTransportSettings.SETTING_AUX_PORT.getKey(), getPortRange()).build(); + return Settings.builder().put(Netty4GrpcServerTransport.SETTING_GRPC_PORTS.getKey(), getPortRange()).build(); } } diff --git a/server/src/main/java/org/opensearch/auxiliary/AuxTransportSettings.java b/server/src/main/java/org/opensearch/auxiliary/AuxTransportSettings.java deleted file mode 100644 index 5373b2d3dfe73..0000000000000 --- a/server/src/main/java/org/opensearch/auxiliary/AuxTransportSettings.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.auxiliary; - -import org.opensearch.common.settings.Setting; -import org.opensearch.common.settings.Setting.Property; -import org.opensearch.common.transport.PortsRange; - -import java.util.List; -import java.util.function.Function; - -import static java.util.Collections.emptyList; -import static org.opensearch.common.settings.Setting.intSetting; -import static org.opensearch.common.settings.Setting.listSetting; - -/** - * Auxiliary transport server settings. - * - * @opensearch.internal - */ -public final class AuxTransportSettings { - - public static final Setting SETTING_AUX_PORT = new Setting<>("aux.port", "9400-9500", PortsRange::new, Property.NodeScope); - - public static final Setting SETTING_AUX_PUBLISH_PORT = intSetting("aux.publish_port", -1, -1, Setting.Property.NodeScope); - - public static final Setting> SETTING_AUX_HOST = listSetting( - "aux.host", - emptyList(), - Function.identity(), - Setting.Property.NodeScope - ); - - public static final Setting> SETTING_AUX_PUBLISH_HOST = listSetting( - "aux.publish_host", - SETTING_AUX_HOST, - Function.identity(), - Setting.Property.NodeScope - ); - - public static final Setting> SETTING_AUX_BIND_HOST = listSetting( - "aux.bind_host", - SETTING_AUX_HOST, - Function.identity(), - Setting.Property.NodeScope - ); - - private AuxTransportSettings() {} -} diff --git a/server/src/main/java/org/opensearch/auxiliary/package-info.java b/server/src/main/java/org/opensearch/auxiliary/package-info.java deleted file mode 100644 index 453e44e94b7f4..0000000000000 --- a/server/src/main/java/org/opensearch/auxiliary/package-info.java +++ /dev/null @@ -1,12 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/** - * Settings and configuration helpers for optional auxiliary client/server transports provided by installed plugins. - */ -package org.opensearch.auxiliary; diff --git a/server/src/main/java/org/opensearch/bootstrap/Security.java b/server/src/main/java/org/opensearch/bootstrap/Security.java index 294899094b747..9f1dcbe8fb587 100644 --- a/server/src/main/java/org/opensearch/bootstrap/Security.java +++ b/server/src/main/java/org/opensearch/bootstrap/Security.java @@ -32,11 +32,12 @@ package org.opensearch.bootstrap; -import org.opensearch.auxiliary.AuxTransportSettings; import org.opensearch.cli.Command; import org.opensearch.common.SuppressForbidden; import org.opensearch.common.io.PathUtils; +import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import org.opensearch.common.transport.PortsRange; import org.opensearch.env.Environment; import org.opensearch.http.HttpTransportSettings; import org.opensearch.plugins.PluginInfo; @@ -72,7 +73,9 @@ import static org.opensearch.bootstrap.FilePermissionUtils.addDirectoryPath; import static org.opensearch.bootstrap.FilePermissionUtils.addSingleFilePath; -import static org.opensearch.common.network.NetworkModule.AUX_TRANSPORT_TYPE_SETTING; +import static org.opensearch.plugins.NetworkPlugin.AuxTransport.AUX_PORT_DEFAULTS; +import static org.opensearch.plugins.NetworkPlugin.AuxTransport.AUX_TRANSPORT_PORTS; +import static org.opensearch.plugins.NetworkPlugin.AuxTransport.AUX_TRANSPORT_TYPES_SETTING; /** * Initializes SecurityManager with necessary permissions. @@ -420,18 +423,26 @@ private static void addSocketPermissionForHttp(final Permissions policy, final S } /** - * Add dynamic {@link SocketPermission} based on auxiliary transport settings {@link AuxTransportSettings}. - * Socket permissions are not provided if no auxiliary transport is selected. + * Add dynamic {@link SocketPermission} based on AffixSetting AUX_TRANSPORT_PORTS. + * If an auxiliary transport type is enabled but has no corresponding port range setting fall back to AUX_PORT_DEFAULTS. * * @param policy the {@link Permissions} instance to apply the dynamic {@link SocketPermission}s to. * @param settings the {@link Settings} instance to read the gRPC settings from */ private static void addSocketPermissionForAux(final Permissions policy, final Settings settings) { - if (!AUX_TRANSPORT_TYPE_SETTING.exists(settings)) { - return; + Set portsRanges = new HashSet<>(); + for (String auxType : AUX_TRANSPORT_TYPES_SETTING.get(settings)) { + Setting auxTypePortSettings = AUX_TRANSPORT_PORTS.getConcreteSettingForNamespace(auxType); + if (auxTypePortSettings.exists(settings)) { + portsRanges.add(auxTypePortSettings.get(settings)); + } else { + portsRanges.add(new PortsRange(AUX_PORT_DEFAULTS)); + } + } + + for (PortsRange portRange : portsRanges) { + addSocketPermissionForPortRange(policy, portRange.getPortRangeString()); } - final String auxRange = AuxTransportSettings.SETTING_AUX_PORT.get(settings).getPortRangeString(); - addSocketPermissionForPortRange(policy, auxRange); } /** diff --git a/server/src/main/java/org/opensearch/common/network/NetworkModule.java b/server/src/main/java/org/opensearch/common/network/NetworkModule.java index 0c2a19e768428..f63c5b508b357 100644 --- a/server/src/main/java/org/opensearch/common/network/NetworkModule.java +++ b/server/src/main/java/org/opensearch/common/network/NetworkModule.java @@ -40,9 +40,6 @@ import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand; import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.opensearch.common.CheckedFunction; -import org.opensearch.common.lifecycle.Lifecycle; -import org.opensearch.common.lifecycle.LifecycleComponent; -import org.opensearch.common.lifecycle.LifecycleListener; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; @@ -83,6 +80,9 @@ import java.util.function.Supplier; import java.util.stream.Collectors; +import static org.opensearch.plugins.NetworkPlugin.AuxTransport.AUX_TRANSPORT_TYPES_KEY; +import static org.opensearch.plugins.NetworkPlugin.AuxTransport.AUX_TRANSPORT_TYPES_SETTING; + /** * A module to handle registering and binding all network related classes. * @@ -92,7 +92,6 @@ public final class NetworkModule { public static final String TRANSPORT_TYPE_KEY = "transport.type"; public static final String HTTP_TYPE_KEY = "http.type"; - public static final String AUX_TRANSPORT_TYPE_KEY = "aux_transport.type"; public static final String HTTP_TYPE_DEFAULT_KEY = "http.type.default"; public static final String TRANSPORT_TYPE_DEFAULT_KEY = "transport.type.default"; public static final String TRANSPORT_SSL_ENFORCE_HOSTNAME_VERIFICATION_KEY = "transport.ssl.enforce_hostname_verification"; @@ -105,7 +104,6 @@ public final class NetworkModule { ); public static final Setting HTTP_DEFAULT_TYPE_SETTING = Setting.simpleString(HTTP_TYPE_DEFAULT_KEY, Property.NodeScope); public static final Setting HTTP_TYPE_SETTING = Setting.simpleString(HTTP_TYPE_KEY, Property.NodeScope); - public static final Setting AUX_TRANSPORT_TYPE_SETTING = Setting.simpleString(AUX_TRANSPORT_TYPE_KEY, Property.NodeScope); public static final Setting TRANSPORT_TYPE_SETTING = Setting.simpleString(TRANSPORT_TYPE_KEY, Property.NodeScope); public static final Setting TRANSPORT_SSL_ENFORCE_HOSTNAME_VERIFICATION = Setting.boolSetting( @@ -162,7 +160,7 @@ public final class NetworkModule { private final Map> transportFactories = new HashMap<>(); private final Map> transportHttpFactories = new HashMap<>(); - private final Map> transportAuxFactories = new HashMap<>(); + private final Map> transportAuxFactories = new HashMap<>(); private final List transportInterceptors = new ArrayList<>(); @@ -229,7 +227,7 @@ public NetworkModule( registerHttpTransport(entry.getKey(), entry.getValue()); } - Map> auxTransportFactory = plugin.getAuxTransports( + Map> auxTransportFactory = plugin.getAuxTransports( settings, threadPool, circuitBreakerService, @@ -237,7 +235,7 @@ public NetworkModule( clusterSettings, tracer ); - for (Map.Entry> entry : auxTransportFactory.entrySet()) { + for (Map.Entry> entry : auxTransportFactory.entrySet()) { registerAuxTransport(entry.getKey(), entry.getValue()); } @@ -324,7 +322,7 @@ private void registerHttpTransport(String key, Supplier fac } } - private void registerAuxTransport(String key, Supplier factory) { + private void registerAuxTransport(String key, Supplier factory) { if (transportAuxFactories.putIfAbsent(key, factory) != null) { throw new IllegalArgumentException("transport for name: " + key + " is already registered"); } @@ -372,43 +370,22 @@ public Supplier getHttpServerTransportSupplier() { } /** - * Auxiliary transports are optional and may not be enabled. - * If AUX_TRANSPORT_TYPE_SETTING is not set return a no-op supplier. - * If AUX_TRANSPORT_TYPE_SETTING is set but no factory is registered for the given type an exception is thrown. + * Optional client/server transports that run in parallel to HttpServerTransport. + * Multiple transport types can be registered and enabled via AUX_TRANSPORT_TYPES_SETTING. + * An IllegalStateException is thrown if a transport type is enabled not registered. */ - public Supplier getAuxServerTransportSupplier() { - if (!AUX_TRANSPORT_TYPE_SETTING.exists(settings)) { - // Supplier for no-op server transport. - return () -> new LifecycleComponent() { - @Override - public Lifecycle.State lifecycleState() { - return null; - } - - @Override - public void addLifecycleListener(LifecycleListener listener) {} - - @Override - public void removeLifecycleListener(LifecycleListener listener) {} - - @Override - public void start() {} - - @Override - public void stop() {} + public List getAuxServerTransportSupplierList() { + List serverTransportSuppliers = new ArrayList<>(); - @Override - public void close() {} - }; - } - - final String name = AUX_TRANSPORT_TYPE_SETTING.get(settings); - final Supplier factory = transportAuxFactories.get(name); - if (factory == null) { - throw new IllegalStateException("Unsupported " + AUX_TRANSPORT_TYPE_KEY + " [" + name + "]"); + for (String transportType : AUX_TRANSPORT_TYPES_SETTING.get(settings)) { + final Supplier factory = transportAuxFactories.get(transportType); + if (factory == null) { + throw new IllegalStateException("Unsupported " + AUX_TRANSPORT_TYPES_KEY + " [" + transportType + "]"); + } + serverTransportSuppliers.add(factory.get()); } - return factory; + return serverTransportSuppliers; } public Supplier getTransportSupplier() { diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 625207c5c7917..1b00754e35192 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -41,7 +41,6 @@ import org.opensearch.action.support.AutoCreateIndex; import org.opensearch.action.support.DestructiveOperations; import org.opensearch.action.support.replication.TransportReplicationAction; -import org.opensearch.auxiliary.AuxTransportSettings; import org.opensearch.bootstrap.BootstrapSettings; import org.opensearch.client.Client; import org.opensearch.cluster.ClusterModule; @@ -150,6 +149,7 @@ import org.opensearch.node.resource.tracker.ResourceTrackerSettings; import org.opensearch.persistent.PersistentTasksClusterService; import org.opensearch.persistent.decider.EnableAssignmentDecider; +import org.opensearch.plugins.NetworkPlugin; import org.opensearch.plugins.PluginsService; import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings; import org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings; @@ -360,10 +360,10 @@ public void apply(Settings value, Settings current, Settings previous) { NetworkModule.TRANSPORT_DEFAULT_TYPE_SETTING, NetworkModule.HTTP_TYPE_SETTING, NetworkModule.TRANSPORT_TYPE_SETTING, - NetworkModule.AUX_TRANSPORT_TYPE_SETTING, NetworkModule.TRANSPORT_SSL_DUAL_MODE_ENABLED, NetworkModule.TRANSPORT_SSL_ENFORCE_HOSTNAME_VERIFICATION, NetworkModule.TRANSPORT_SSL_ENFORCE_HOSTNAME_VERIFICATION_RESOLVE_HOST_NAME, + NetworkPlugin.AuxTransport.AUX_TRANSPORT_TYPES_SETTING, HttpTransportSettings.SETTING_CORS_ALLOW_CREDENTIALS, HttpTransportSettings.SETTING_CORS_ENABLED, HttpTransportSettings.SETTING_CORS_MAX_AGE, @@ -845,8 +845,6 @@ public void apply(Settings value, Settings current, Settings previous) { ) ), List.of(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL), - List.of(SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING), - List.of(FeatureFlags.GRPC_EXPERIMENTAL), - List.of(AuxTransportSettings.SETTING_AUX_PORT) + List.of(SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING) ); } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 605594d1eb9b5..88895e15f5d12 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1216,7 +1216,8 @@ protected Node( SearchExecutionStatsCollector.makeWrapper(responseCollectorService) ); final HttpServerTransport httpServerTransport = newHttpTransport(networkModule); - pluginComponents.add(newAuxTransport(networkModule)); + + pluginComponents.addAll(newAuxTransports(networkModule)); final IndexingPressureService indexingPressureService = new IndexingPressureService(settings, clusterService); // Going forward, IndexingPressureService will have required constructs for exposing listeners/interfaces for plugin @@ -2115,8 +2116,8 @@ protected HttpServerTransport newHttpTransport(NetworkModule networkModule) { return networkModule.getHttpServerTransportSupplier().get(); } - protected LifecycleComponent newAuxTransport(NetworkModule networkModule) { - return networkModule.getAuxServerTransportSupplier().get(); + protected List newAuxTransports(NetworkModule networkModule) { + return networkModule.getAuxServerTransportSupplierList(); } private static class LocalNodeFactory implements Function { diff --git a/server/src/main/java/org/opensearch/plugins/NetworkPlugin.java b/server/src/main/java/org/opensearch/plugins/NetworkPlugin.java index 33af7eeaeb6b7..e1a0ad8f71025 100644 --- a/server/src/main/java/org/opensearch/plugins/NetworkPlugin.java +++ b/server/src/main/java/org/opensearch/plugins/NetworkPlugin.java @@ -32,10 +32,12 @@ package org.opensearch.plugins; import org.opensearch.common.annotation.ExperimentalApi; -import org.opensearch.common.lifecycle.LifecycleComponent; +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; import org.opensearch.common.network.NetworkService; import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import org.opensearch.common.transport.PortsRange; import org.opensearch.common.util.BigArrays; import org.opensearch.common.util.PageCacheRecycler; import org.opensearch.common.util.concurrent.ThreadContext; @@ -51,14 +53,62 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.function.Function; import java.util.function.Supplier; +import static java.util.Collections.emptyList; +import static org.opensearch.common.settings.Setting.affixKeySetting; + /** * Plugin for extending network and transport related classes * * @opensearch.api */ public interface NetworkPlugin { + + /** + * Auxiliary transports are lifecycle components with an associated port range. + * These pluggable client/server transport implementations have their lifecycle managed by Node. + * + * Auxiliary transports are additionally defined by a port range on which they bind. Opening permissions on these + * ports is awkward as {@link org.opensearch.bootstrap.Security} is configured previous to Node initialization during + * bootstrap. To allow pluggable AuxTransports access to configurable port ranges we require the port range be provided + * through an {@link org.opensearch.common.settings.Setting.AffixSetting} of the form 'AUX_SETTINGS_PREFIX.{aux-transport-key}.ports'. + */ + abstract class AuxTransport extends AbstractLifecycleComponent { + public static final String AUX_SETTINGS_PREFIX = "aux.transport."; + public static final String AUX_TRANSPORT_TYPES_KEY = AUX_SETTINGS_PREFIX + "type"; + public static final String AUX_PORT_DEFAULTS = "9400-9500"; + public static final Setting.AffixSetting AUX_TRANSPORT_PORTS = affixKeySetting( + AUX_SETTINGS_PREFIX, + "ports", + key -> new Setting<>(key, AUX_PORT_DEFAULTS, PortsRange::new, Setting.Property.NodeScope) + ); + + public static final Setting> AUX_TRANSPORT_TYPES_SETTING = Setting.listSetting( + AUX_TRANSPORT_TYPES_KEY, + emptyList(), + Function.identity(), + Setting.Property.NodeScope + ); + } + + /** + * Auxiliary transports are optional and run in parallel to the default HttpServerTransport. + * Returns a map of AuxTransport suppliers. + */ + @ExperimentalApi + default Map> getAuxTransports( + Settings settings, + ThreadPool threadPool, + CircuitBreakerService circuitBreakerService, + NetworkService networkService, + ClusterSettings clusterSettings, + Tracer tracer + ) { + return Collections.emptyMap(); + } + /** * Returns a list of {@link TransportInterceptor} instances that are used to intercept incoming and outgoing * transport (inter-node) requests. This must not return null @@ -109,23 +159,6 @@ default Map> getHttpTransports( return Collections.emptyMap(); } - /** - * Auxiliary transports are optional and run in parallel to the default HttpServerTransport. - * Returns a map of {@link LifecycleComponent} suppliers. - * See {@link org.opensearch.common.network.NetworkModule#AUX_TRANSPORT_TYPE_SETTING} to configure a specific implementation. - */ - @ExperimentalApi - default Map> getAuxTransports( - Settings settings, - ThreadPool threadPool, - CircuitBreakerService circuitBreakerService, - NetworkService networkService, - ClusterSettings clusterSettings, - Tracer tracer - ) { - return Collections.emptyMap(); - } - /** * Returns a map of secure {@link Transport} suppliers. * See {@link org.opensearch.common.network.NetworkModule#TRANSPORT_TYPE_KEY} to configure a specific implementation.