Skip to content

Commit

Permalink
Add connectToNodeAsExtension in TransportService (opensearch-project#…
Browse files Browse the repository at this point in the history
…6866)

* Add connectToNodeAsExtension in TransportService

Signed-off-by: Craig Perkins <[email protected]>

* Add to CHANGELOG

Signed-off-by: Craig Perkins <[email protected]>

* Update java docstrings

Signed-off-by: Craig Perkins <[email protected]>

---------

Signed-off-by: Craig Perkins <[email protected]>
  • Loading branch information
cwperks authored Apr 28, 2023
1 parent 611e007 commit b752555
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add getter for path field in NestedQueryBuilder ([#4636](https://github.com/opensearch-project/OpenSearch/pull/4636))
- Allow mmap to use new JDK-19 preview APIs in Apache Lucene 9.4+ ([#5151](https://github.com/opensearch-project/OpenSearch/pull/5151))
- Add events correlation engine plugin ([#6854](https://github.com/opensearch-project/OpenSearch/issues/6854))
- Add connectToNodeAsExtension in TransportService ([#6866](https://github.com/opensearch-project/OpenSearch/pull/6866))

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public void close() {}
* Build the service.
*
* @param clusterSettings if non null, the {@linkplain TransportService} will register with the {@link ClusterSettings} for settings
* * updates for {@link TransportSettings#TRACE_LOG_EXCLUDE_SETTING} and {@link TransportSettings#TRACE_LOG_INCLUDE_SETTING}.
* * updates for {@link TransportSettings#TRACE_LOG_EXCLUDE_SETTING} and {@link TransportSettings#TRACE_LOG_INCLUDE_SETTING}.
*/
public TransportService(
Settings settings,
Expand Down Expand Up @@ -397,6 +397,15 @@ public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
connectToNode(node, (ConnectionProfile) null);
}

/**
* Connect to the specified node as an extension with the default connection profile
*
* @param node the node to connect to
*/
public void connectToNodeAsExtension(DiscoveryNode node, String extensionUniqueId) throws ConnectTransportException {
connectToNodeAsExtension(node, (ConnectionProfile) null, extensionUniqueId);
}

// We are skipping node validation for extensibility as extensionNode and opensearchNode(LocalNode) will have different ephemeral id's
public void connectToExtensionNode(final DiscoveryNode node) {
PlainActionFuture.get(fut -> connectToExtensionNode(node, (ConnectionProfile) null, ActionListener.map(fut, x -> null)));
Expand All @@ -412,6 +421,19 @@ public void connectToNode(final DiscoveryNode node, ConnectionProfile connection
PlainActionFuture.get(fut -> connectToNode(node, connectionProfile, ActionListener.map(fut, x -> null)));
}

/**
* Connect to the specified node with the given connection profile
*
* @param node the node to connect to
* @param connectionProfile the connection profile to use when connecting to this node
* @param extensionUniqueIq the id of the extension
*/
public void connectToNodeAsExtension(final DiscoveryNode node, ConnectionProfile connectionProfile, String extensionUniqueIq) {
PlainActionFuture.get(
fut -> connectToNodeAsExtension(node, connectionProfile, extensionUniqueIq, ActionListener.map(fut, x -> null))
);
}

public void connectToExtensionNode(final DiscoveryNode node, ConnectionProfile connectionProfile) {
PlainActionFuture.get(fut -> connectToExtensionNode(node, connectionProfile, ActionListener.map(fut, x -> null)));
}
Expand Down Expand Up @@ -447,6 +469,33 @@ public void connectToNode(final DiscoveryNode node, ConnectionProfile connection
connectionManager.connectToNode(node, connectionProfile, connectionValidator(node), listener);
}

/**
* Connect to the specified node as an extension with the given connection profile.
* The ActionListener will be called on the calling thread or the generic thread pool.
*
* @param node the node to connect to
* @param connectionProfile the connection profile to use when connecting to this node
* @param extensionUniqueId the id of the extension
* @param listener the action listener to notify
*/
public void connectToNodeAsExtension(
final DiscoveryNode node,
ConnectionProfile connectionProfile,
String extensionUniqueId,
ActionListener<Void> listener
) {
if (isLocalNode(node)) {
listener.onResponse(null);
return;
}
connectionManager.connectToNode(
node,
connectionProfile,
connectionValidatorForExtensionConnectingToNode(node, extensionUniqueId),
listener
);
}

public void connectToExtensionNode(final DiscoveryNode node, ConnectionProfile connectionProfile, ActionListener<Void> listener) {
if (isLocalNode(node)) {
listener.onResponse(null);
Expand All @@ -470,6 +519,25 @@ public ConnectionManager.ConnectionValidator connectionValidator(DiscoveryNode n
};
}

public ConnectionManager.ConnectionValidator connectionValidatorForExtensionConnectingToNode(
DiscoveryNode node,
String extensionUniqueId
) {
return (newConnection, actualProfile, listener) -> {
// We don't validate cluster names to allow for CCS connections.
threadPool.getThreadContext().putHeader("extension_unique_id", extensionUniqueId);
handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true, ActionListener.map(listener, resp -> {
final DiscoveryNode remote = resp.discoveryNode;

if (node.equals(remote) == false) {
throw new ConnectTransportException(node, "handshake failed. unexpected remote node " + remote);
}

return null;
}));
};
}

public ConnectionManager.ConnectionValidator extensionConnectionValidator(DiscoveryNode node) {
return (newConnection, actualProfile, listener) -> {
// We don't validate cluster names to allow for CCS connections.
Expand Down

0 comments on commit b752555

Please sign in to comment.