forked from opensearch-project/opensearch-sdk-java
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Refactor Netty4 method (opensearch-project#166)
* issue opensearch-project#28 Signed-off-by: mloufra <[email protected]> * Update the lastest coomit Signed-off-by: mloufra <[email protected]> * Rename the method and fix the conflict Signed-off-by: mloufra <[email protected]> * fix merge conflict Signed-off-by: mloufra <[email protected]> * Add code coverage report Signed-off-by: mloufra <[email protected]> * Rebase the lastest commit Signed-off-by: mloufra <[email protected]> * update the lastest commit Signed-off-by: mloufra <[email protected]> * refactor netty4 and initialozaExtensionTransport method Signed-off-by: mloufra <[email protected]> * resolve comment Signed-off-by: mloufra <[email protected]> Signed-off-by: mloufra <[email protected]>
- Loading branch information
Showing
4 changed files
with
131 additions
and
111 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
package org.opensearch.sdk; | ||
|
||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.function.Function; | ||
import java.util.stream.Collectors; | ||
import java.util.stream.Stream; | ||
|
||
import org.opensearch.Version; | ||
import org.opensearch.cluster.ClusterModule; | ||
import org.opensearch.cluster.node.DiscoveryNode; | ||
import org.opensearch.common.io.stream.NamedWriteableRegistry; | ||
import org.opensearch.common.network.NetworkModule; | ||
import org.opensearch.common.network.NetworkService; | ||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.common.util.PageCacheRecycler; | ||
import org.opensearch.indices.IndicesModule; | ||
import org.opensearch.indices.breaker.CircuitBreakerService; | ||
import org.opensearch.indices.breaker.NoneCircuitBreakerService; | ||
import org.opensearch.search.SearchModule; | ||
import org.opensearch.threadpool.ThreadPool; | ||
import org.opensearch.transport.SharedGroupFactory; | ||
import org.opensearch.transport.TransportInterceptor; | ||
import org.opensearch.transport.TransportService; | ||
import org.opensearch.transport.netty4.Netty4Transport; | ||
|
||
import static java.util.Collections.emptySet; | ||
import static org.opensearch.common.UUIDs.randomBase64UUID; | ||
|
||
/** | ||
* This class initializes a Netty4Transport object and control communication between the extension and OpenSearch. | ||
*/ | ||
|
||
public class NettyTransport { | ||
private static final String NODE_NAME_SETTING = "node.name"; | ||
private final TransportInterceptor NOOP_TRANSPORT_INTERCEPTOR = new TransportInterceptor() { | ||
}; | ||
|
||
/** | ||
* Initializes a Netty4Transport object. This object will be wrapped in a {@link TransportService} object. | ||
* | ||
* @param settings The transport settings to configure. | ||
* @param threadPool A thread pool to use. | ||
* @return The configured Netty4Transport object. | ||
*/ | ||
public Netty4Transport getNetty4Transport(Settings settings, ThreadPool threadPool) { | ||
NetworkService networkService = new NetworkService(Collections.emptyList()); | ||
PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings); | ||
IndicesModule indicesModule = new IndicesModule(Collections.emptyList()); | ||
SearchModule searchModule = new SearchModule(settings, Collections.emptyList()); | ||
|
||
List<NamedWriteableRegistry.Entry> namedWriteables = Stream.of( | ||
NetworkModule.getNamedWriteables().stream(), | ||
indicesModule.getNamedWriteables().stream(), | ||
searchModule.getNamedWriteables().stream(), | ||
ClusterModule.getNamedWriteables().stream() | ||
).flatMap(Function.identity()).collect(Collectors.toList()); | ||
|
||
final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(namedWriteables); | ||
|
||
final CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService(); | ||
|
||
Netty4Transport transport = new Netty4Transport( | ||
settings, | ||
Version.CURRENT, | ||
threadPool, | ||
networkService, | ||
pageCacheRecycler, | ||
namedWriteableRegistry, | ||
circuitBreakerService, | ||
new SharedGroupFactory(settings) | ||
); | ||
|
||
return transport; | ||
} | ||
|
||
/** | ||
* Initializes the TransportService object for this extension. This object will control communication between the extension and OpenSearch. | ||
* | ||
* @param settings The transport settings to configure. | ||
* @param extensionsRunner method to call | ||
* @return The initialized TransportService object. | ||
*/ | ||
public TransportService initializeExtensionTransportService(Settings settings, ExtensionsRunner extensionsRunner) { | ||
|
||
ThreadPool threadPool = new ThreadPool(settings); | ||
|
||
Netty4Transport transport = getNetty4Transport(settings, threadPool); | ||
|
||
// Stop any existing transport service | ||
if (extensionsRunner.extensionTransportService != null) { | ||
extensionsRunner.extensionTransportService.stop(); | ||
} | ||
|
||
// create transport service | ||
extensionsRunner.extensionTransportService = new TransportService( | ||
settings, | ||
transport, | ||
threadPool, | ||
NOOP_TRANSPORT_INTERCEPTOR, | ||
boundAddress -> DiscoveryNode.createLocal( | ||
Settings.builder().put(NODE_NAME_SETTING, settings.get(NODE_NAME_SETTING)).build(), | ||
boundAddress.publishAddress(), | ||
randomBase64UUID() | ||
), | ||
null, | ||
emptySet() | ||
); | ||
extensionsRunner.startTransportService(extensionsRunner.extensionTransportService); | ||
return extensionsRunner.extensionTransportService; | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters