From 8822d953e1acf9cbdbf359b5a092f228dc590e69 Mon Sep 17 00:00:00 2001 From: Frank Lou <107960841+mloufra@users.noreply.github.com> Date: Tue, 4 Oct 2022 11:40:46 -0700 Subject: [PATCH] Refactor Netty4 method (#166) * issue #28 Signed-off-by: mloufra * Update the lastest coomit Signed-off-by: mloufra * Rename the method and fix the conflict Signed-off-by: mloufra * fix merge conflict Signed-off-by: mloufra * Add code coverage report Signed-off-by: mloufra * Rebase the lastest commit Signed-off-by: mloufra * update the lastest commit Signed-off-by: mloufra * refactor netty4 and initialozaExtensionTransport method Signed-off-by: mloufra * resolve comment Signed-off-by: mloufra Signed-off-by: mloufra --- .../org/opensearch/sdk/ExtensionsRunner.java | 105 +-------------- .../org/opensearch/sdk/NettyTransport.java | 120 ++++++++++++++++++ .../opensearch/sdk/TestNetty4Transport.java | 11 +- .../sdk/TransportCommunicationIT.java | 6 +- 4 files changed, 131 insertions(+), 111 deletions(-) create mode 100644 src/main/java/org/opensearch/sdk/NettyTransport.java diff --git a/src/main/java/org/opensearch/sdk/ExtensionsRunner.java b/src/main/java/org/opensearch/sdk/ExtensionsRunner.java index 062deef2..0627ccfd 100644 --- a/src/main/java/org/opensearch/sdk/ExtensionsRunner.java +++ b/src/main/java/org/opensearch/sdk/ExtensionsRunner.java @@ -12,20 +12,14 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.Version; -import org.opensearch.cluster.ClusterModule; import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.io.stream.NamedWriteableRegistryParseRequest; import org.opensearch.extensions.OpenSearchRequest; import org.opensearch.extensions.rest.ExtensionRestRequest; import org.opensearch.extensions.rest.RegisterRestActionsRequest; import org.opensearch.extensions.settings.RegisterCustomSettingsRequest; -import org.opensearch.common.network.NetworkModule; -import org.opensearch.common.network.NetworkService; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.PageCacheRecycler; import org.opensearch.discovery.InitializeExtensionsRequest; import org.opensearch.extensions.ExtensionActionListenerOnFailureRequest; import org.opensearch.extensions.DiscoveryExtension; @@ -35,12 +29,7 @@ import org.opensearch.extensions.ExtensionRequest; import org.opensearch.extensions.ExtensionsOrchestrator; import org.opensearch.index.IndicesModuleRequest; -import org.opensearch.indices.IndicesModule; -import org.opensearch.indices.breaker.CircuitBreakerService; -import org.opensearch.indices.breaker.NoneCircuitBreakerService; import org.opensearch.rest.RestHandler.Route; -import org.opensearch.transport.netty4.Netty4Transport; -import org.opensearch.transport.SharedGroupFactory; import org.opensearch.sdk.handlers.ActionListenerOnFailureResponseHandler; import org.opensearch.sdk.handlers.ClusterSettingsResponseHandler; import org.opensearch.sdk.handlers.ClusterStateResponseHandler; @@ -54,11 +43,7 @@ import org.opensearch.sdk.handlers.UpdateSettingsRequestHandler; import org.opensearch.sdk.handlers.ExtensionStringResponseHandler; import org.opensearch.sdk.handlers.OpensearchRequestHandler; -import org.opensearch.search.SearchModule; import org.opensearch.threadpool.ThreadPool; -import org.opensearch.transport.ClusterConnectionManager; -import org.opensearch.transport.ConnectionManager; -import org.opensearch.transport.TransportInterceptor; import org.opensearch.transport.TransportService; import org.opensearch.transport.TransportSettings; @@ -69,14 +54,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.function.Function; -import java.util.stream.Collectors; -import java.util.stream.Stream; import java.util.function.Consumer; -import static java.util.Collections.emptySet; -import static org.opensearch.common.UUIDs.randomBase64UUID; - /** * The primary class to run an extension. *

@@ -109,8 +88,6 @@ public class ExtensionsRunner { * This field is initialized by a call from {@link ExtensionsInitRequestHandler}. */ public final Settings settings; - private final TransportInterceptor NOOP_TRANSPORT_INTERCEPTOR = new TransportInterceptor() { - }; private ExtensionNamedWriteableRegistry namedWriteableRegistryApi = new ExtensionNamedWriteableRegistry(); private ExtensionsInitRequestHandler extensionsInitRequestHandler = new ExtensionsInitRequestHandler(); private OpensearchRequestHandler opensearchRequestHandler = new OpensearchRequestHandler(); @@ -118,6 +95,7 @@ public class ExtensionsRunner { private ExtensionsIndicesModuleNameRequestHandler extensionsIndicesModuleNameRequestHandler = new ExtensionsIndicesModuleNameRequestHandler(); private ExtensionsRestRequestHandler extensionsRestRequestHandler = new ExtensionsRestRequestHandler(); + private NettyTransport nettyTransport = new NettyTransport(); /* * TODO: expose an interface for extension to register actions @@ -170,7 +148,7 @@ private ExtensionsRunner(Extension extension) throws IOException { // save custom settings this.customSettings = extension.getSettings(); // initialize the transport service - this.initializeExtensionTransportService(this.getSettings()); + nettyTransport.initializeExtensionTransportService(this.getSettings(), this); // start listening on configured port and wait for connection from OpenSearch this.startActionListener(0); } @@ -213,83 +191,6 @@ DiscoveryNode getOpensearchNode() { return opensearchNode; } - /** - * Initializes a Netty4Transport object. This object will be wrapped in a {@link TransportService} object. - * - * @param settings The transport settings to configure. - * @param threadPool A thread pool to use. - * @return The configured Netty4Transport object. - */ - public Netty4Transport getNetty4Transport(Settings settings, ThreadPool threadPool) { - NetworkService networkService = new NetworkService(Collections.emptyList()); - PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings); - IndicesModule indicesModule = new IndicesModule(Collections.emptyList()); - SearchModule searchModule = new SearchModule(settings, Collections.emptyList()); - - List namedWriteables = Stream.of( - NetworkModule.getNamedWriteables().stream(), - indicesModule.getNamedWriteables().stream(), - searchModule.getNamedWriteables().stream(), - null, - ClusterModule.getNamedWriteables().stream() - ).flatMap(Function.identity()).collect(Collectors.toList()); - - final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(namedWriteables); - - final CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService(); - - Netty4Transport transport = new Netty4Transport( - settings, - Version.CURRENT, - threadPool, - networkService, - pageCacheRecycler, - namedWriteableRegistry, - circuitBreakerService, - new SharedGroupFactory(settings) - ); - - return transport; - } - - /** - * Initializes the TransportService object for this extension. This object will control communication between the extension and OpenSearch. - * - * @param settings The transport settings to configure. - * @return The initialized TransportService object. - */ - public TransportService initializeExtensionTransportService(Settings settings) { - - ThreadPool threadPool = new ThreadPool(settings); - - Netty4Transport transport = getNetty4Transport(settings, threadPool); - - final ConnectionManager connectionManager = new ClusterConnectionManager(settings, transport); - - // Stop any existing transport service - if (extensionTransportService != null) { - extensionTransportService.stop(); - } - - // create transport service - extensionTransportService = new TransportService( - settings, - transport, - threadPool, - NOOP_TRANSPORT_INTERCEPTOR, - boundAddress -> DiscoveryNode.createLocal( - Settings.builder().put(NODE_NAME_SETTING, settings.get(NODE_NAME_SETTING)).build(), - boundAddress.publishAddress(), - randomBase64UUID() - ), - null, - emptySet(), - connectionManager - ); - startTransportService(extensionTransportService); - return extensionTransportService; - } - /** * Starts a TransportService. * @@ -591,7 +492,7 @@ public static void main(String[] args) throws IOException { ExtensionsRunner extensionsRunner = new ExtensionsRunner(); // initialize the transport service - extensionsRunner.initializeExtensionTransportService(extensionsRunner.getSettings()); + extensionsRunner.nettyTransport.initializeExtensionTransportService(extensionsRunner.getSettings(), extensionsRunner); // start listening on configured port and wait for connection from OpenSearch extensionsRunner.startActionListener(0); } diff --git a/src/main/java/org/opensearch/sdk/NettyTransport.java b/src/main/java/org/opensearch/sdk/NettyTransport.java new file mode 100644 index 00000000..634dc5e7 --- /dev/null +++ b/src/main/java/org/opensearch/sdk/NettyTransport.java @@ -0,0 +1,120 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.sdk; + +import java.util.Collections; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.opensearch.Version; +import org.opensearch.cluster.ClusterModule; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.io.stream.NamedWriteableRegistry; +import org.opensearch.common.network.NetworkModule; +import org.opensearch.common.network.NetworkService; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.PageCacheRecycler; +import org.opensearch.indices.IndicesModule; +import org.opensearch.indices.breaker.CircuitBreakerService; +import org.opensearch.indices.breaker.NoneCircuitBreakerService; +import org.opensearch.search.SearchModule; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.SharedGroupFactory; +import org.opensearch.transport.TransportInterceptor; +import org.opensearch.transport.TransportService; +import org.opensearch.transport.netty4.Netty4Transport; + +import static java.util.Collections.emptySet; +import static org.opensearch.common.UUIDs.randomBase64UUID; + +/** + * This class initializes a Netty4Transport object and control communication between the extension and OpenSearch. + */ + +public class NettyTransport { + private static final String NODE_NAME_SETTING = "node.name"; + private final TransportInterceptor NOOP_TRANSPORT_INTERCEPTOR = new TransportInterceptor() { + }; + + /** + * Initializes a Netty4Transport object. This object will be wrapped in a {@link TransportService} object. + * + * @param settings The transport settings to configure. + * @param threadPool A thread pool to use. + * @return The configured Netty4Transport object. + */ + public Netty4Transport getNetty4Transport(Settings settings, ThreadPool threadPool) { + NetworkService networkService = new NetworkService(Collections.emptyList()); + PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings); + IndicesModule indicesModule = new IndicesModule(Collections.emptyList()); + SearchModule searchModule = new SearchModule(settings, Collections.emptyList()); + + List namedWriteables = Stream.of( + NetworkModule.getNamedWriteables().stream(), + indicesModule.getNamedWriteables().stream(), + searchModule.getNamedWriteables().stream(), + ClusterModule.getNamedWriteables().stream() + ).flatMap(Function.identity()).collect(Collectors.toList()); + + final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(namedWriteables); + + final CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService(); + + Netty4Transport transport = new Netty4Transport( + settings, + Version.CURRENT, + threadPool, + networkService, + pageCacheRecycler, + namedWriteableRegistry, + circuitBreakerService, + new SharedGroupFactory(settings) + ); + + return transport; + } + + /** + * Initializes the TransportService object for this extension. This object will control communication between the extension and OpenSearch. + * + * @param settings The transport settings to configure. + * @param extensionsRunner method to call + * @return The initialized TransportService object. + */ + public TransportService initializeExtensionTransportService(Settings settings, ExtensionsRunner extensionsRunner) { + + ThreadPool threadPool = new ThreadPool(settings); + + Netty4Transport transport = getNetty4Transport(settings, threadPool); + + // Stop any existing transport service + if (extensionsRunner.extensionTransportService != null) { + extensionsRunner.extensionTransportService.stop(); + } + + // create transport service + extensionsRunner.extensionTransportService = new TransportService( + settings, + transport, + threadPool, + NOOP_TRANSPORT_INTERCEPTOR, + boundAddress -> DiscoveryNode.createLocal( + Settings.builder().put(NODE_NAME_SETTING, settings.get(NODE_NAME_SETTING)).build(), + boundAddress.publishAddress(), + randomBase64UUID() + ), + null, + emptySet() + ); + extensionsRunner.startTransportService(extensionsRunner.extensionTransportService); + return extensionsRunner.extensionTransportService; + } + +} diff --git a/src/test/java/org/opensearch/sdk/TestNetty4Transport.java b/src/test/java/org/opensearch/sdk/TestNetty4Transport.java index c3ef71ae..7782348b 100644 --- a/src/test/java/org/opensearch/sdk/TestNetty4Transport.java +++ b/src/test/java/org/opensearch/sdk/TestNetty4Transport.java @@ -24,12 +24,11 @@ public class TestNetty4Transport extends OpenSearchTestCase { - private ExtensionsRunner extensionsRunner; private ThreadPool threadPool; + private NettyTransport nettyTransport = new NettyTransport(); @BeforeEach public void setUp() throws IOException { - this.extensionsRunner = new ExtensionsRunner(); this.threadPool = new TestThreadPool("test"); } @@ -44,7 +43,7 @@ public void testNettyCanBindToMultiplePorts() throws IOException { .put("transport.profiles.client1.port", 0) .build(); - Netty4Transport transport = extensionsRunner.getNetty4Transport(settings, threadPool); + Netty4Transport transport = nettyTransport.getNetty4Transport(settings, threadPool); try { startNetty4Transport(transport); @@ -67,7 +66,7 @@ public void testDefaultProfileInheritsFomStandardSettings() throws IOException { .put("transport.profiles.client1.port", 0) .build(); - Netty4Transport transport = extensionsRunner.getNetty4Transport(settings, threadPool); + Netty4Transport transport = nettyTransport.getNetty4Transport(settings, threadPool); try { startNetty4Transport(transport); @@ -94,7 +93,7 @@ public void testThatProfileWithoutPortFails() throws IOException { // attempt creating netty object with invalid settings IllegalStateException ex = expectThrows( IllegalStateException.class, - () -> extensionsRunner.getNetty4Transport(settings, threadPool) + () -> nettyTransport.getNetty4Transport(settings, threadPool) ); assertEquals("profile [no_port] has no port configured", ex.getMessage()); } finally { @@ -112,7 +111,7 @@ public void testDefaultProfilePortOverridesGeneralConfiguration() throws IOExcep .put("transport.profiles.default.port", 0) // default port configuration will overwrite attempt .build(); - Netty4Transport transport = extensionsRunner.getNetty4Transport(settings, threadPool); + Netty4Transport transport = nettyTransport.getNetty4Transport(settings, threadPool); try { startNetty4Transport(transport); diff --git a/src/test/java/org/opensearch/sdk/TransportCommunicationIT.java b/src/test/java/org/opensearch/sdk/TransportCommunicationIT.java index 787eaae9..8cbfe284 100644 --- a/src/test/java/org/opensearch/sdk/TransportCommunicationIT.java +++ b/src/test/java/org/opensearch/sdk/TransportCommunicationIT.java @@ -35,6 +35,7 @@ public class TransportCommunicationIT extends OpenSearchIntegTestCase { private final int port = 7777; private final String host = "127.0.0.1"; private volatile String clientResult; + private NettyTransport nettyTransport = new NettyTransport(); @Override @BeforeEach @@ -51,9 +52,8 @@ public void setUp() { @Test public void testSocketSetup() throws IOException { - ExtensionsRunner extensionsRunner = new ExtensionsRunner(); ThreadPool threadPool = new TestThreadPool("test"); - Netty4Transport transport = extensionsRunner.getNetty4Transport(settings, threadPool); + Netty4Transport transport = nettyTransport.getNetty4Transport(settings, threadPool); // start netty transport and ensure that address info is exposed try { @@ -147,7 +147,7 @@ private void startTransportandClient(Settings settings, Thread client) throws IO // retrieve transport service ExtensionsRunner extensionsRunner = new ExtensionsRunner(); // start transport service - TransportService transportService = extensionsRunner.initializeExtensionTransportService(settings); + TransportService transportService = nettyTransport.initializeExtensionTransportService(settings, extensionsRunner); assertEquals(Lifecycle.State.STARTED, transportService.lifecycleState());