From b752555d38477b7077aacb4b8a15b07877299b0c Mon Sep 17 00:00:00 2001 From: Craig Perkins Date: Fri, 28 Apr 2023 14:12:21 -0400 Subject: [PATCH] Add connectToNodeAsExtension in TransportService (#6866) * Add connectToNodeAsExtension in TransportService Signed-off-by: Craig Perkins * Add to CHANGELOG Signed-off-by: Craig Perkins * Update java docstrings Signed-off-by: Craig Perkins --------- Signed-off-by: Craig Perkins --- CHANGELOG.md | 1 + .../transport/TransportService.java | 70 ++++++++++++++++++- 2 files changed, 70 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9ce95535f16a2..bd43855a3ae1b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add getter for path field in NestedQueryBuilder ([#4636](https://github.com/opensearch-project/OpenSearch/pull/4636)) - Allow mmap to use new JDK-19 preview APIs in Apache Lucene 9.4+ ([#5151](https://github.com/opensearch-project/OpenSearch/pull/5151)) - Add events correlation engine plugin ([#6854](https://github.com/opensearch-project/OpenSearch/issues/6854)) +- Add connectToNodeAsExtension in TransportService ([#6866](https://github.com/opensearch-project/OpenSearch/pull/6866)) ### Dependencies - Bump `log4j-core` from 2.18.0 to 2.19.0 diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index 804dfab928dd4..1e537c27e6b98 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -165,7 +165,7 @@ public void close() {} * Build the service. * * @param clusterSettings if non null, the {@linkplain TransportService} will register with the {@link ClusterSettings} for settings - * * updates for {@link TransportSettings#TRACE_LOG_EXCLUDE_SETTING} and {@link TransportSettings#TRACE_LOG_INCLUDE_SETTING}. + * * updates for {@link TransportSettings#TRACE_LOG_EXCLUDE_SETTING} and {@link TransportSettings#TRACE_LOG_INCLUDE_SETTING}. */ public TransportService( Settings settings, @@ -397,6 +397,15 @@ public void connectToNode(DiscoveryNode node) throws ConnectTransportException { connectToNode(node, (ConnectionProfile) null); } + /** + * Connect to the specified node as an extension with the default connection profile + * + * @param node the node to connect to + */ + public void connectToNodeAsExtension(DiscoveryNode node, String extensionUniqueId) throws ConnectTransportException { + connectToNodeAsExtension(node, (ConnectionProfile) null, extensionUniqueId); + } + // We are skipping node validation for extensibility as extensionNode and opensearchNode(LocalNode) will have different ephemeral id's public void connectToExtensionNode(final DiscoveryNode node) { PlainActionFuture.get(fut -> connectToExtensionNode(node, (ConnectionProfile) null, ActionListener.map(fut, x -> null))); @@ -412,6 +421,19 @@ public void connectToNode(final DiscoveryNode node, ConnectionProfile connection PlainActionFuture.get(fut -> connectToNode(node, connectionProfile, ActionListener.map(fut, x -> null))); } + /** + * Connect to the specified node with the given connection profile + * + * @param node the node to connect to + * @param connectionProfile the connection profile to use when connecting to this node + * @param extensionUniqueIq the id of the extension + */ + public void connectToNodeAsExtension(final DiscoveryNode node, ConnectionProfile connectionProfile, String extensionUniqueIq) { + PlainActionFuture.get( + fut -> connectToNodeAsExtension(node, connectionProfile, extensionUniqueIq, ActionListener.map(fut, x -> null)) + ); + } + public void connectToExtensionNode(final DiscoveryNode node, ConnectionProfile connectionProfile) { PlainActionFuture.get(fut -> connectToExtensionNode(node, connectionProfile, ActionListener.map(fut, x -> null))); } @@ -447,6 +469,33 @@ public void connectToNode(final DiscoveryNode node, ConnectionProfile connection connectionManager.connectToNode(node, connectionProfile, connectionValidator(node), listener); } + /** + * Connect to the specified node as an extension with the given connection profile. + * The ActionListener will be called on the calling thread or the generic thread pool. + * + * @param node the node to connect to + * @param connectionProfile the connection profile to use when connecting to this node + * @param extensionUniqueId the id of the extension + * @param listener the action listener to notify + */ + public void connectToNodeAsExtension( + final DiscoveryNode node, + ConnectionProfile connectionProfile, + String extensionUniqueId, + ActionListener listener + ) { + if (isLocalNode(node)) { + listener.onResponse(null); + return; + } + connectionManager.connectToNode( + node, + connectionProfile, + connectionValidatorForExtensionConnectingToNode(node, extensionUniqueId), + listener + ); + } + public void connectToExtensionNode(final DiscoveryNode node, ConnectionProfile connectionProfile, ActionListener listener) { if (isLocalNode(node)) { listener.onResponse(null); @@ -470,6 +519,25 @@ public ConnectionManager.ConnectionValidator connectionValidator(DiscoveryNode n }; } + public ConnectionManager.ConnectionValidator connectionValidatorForExtensionConnectingToNode( + DiscoveryNode node, + String extensionUniqueId + ) { + return (newConnection, actualProfile, listener) -> { + // We don't validate cluster names to allow for CCS connections. + threadPool.getThreadContext().putHeader("extension_unique_id", extensionUniqueId); + handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true, ActionListener.map(listener, resp -> { + final DiscoveryNode remote = resp.discoveryNode; + + if (node.equals(remote) == false) { + throw new ConnectTransportException(node, "handshake failed. unexpected remote node " + remote); + } + + return null; + })); + }; + } + public ConnectionManager.ConnectionValidator extensionConnectionValidator(DiscoveryNode node) { return (newConnection, actualProfile, listener) -> { // We don't validate cluster names to allow for CCS connections.