Skip to content

Commit

Permalink
Revert "Hot-reloadable remote cluster credentials (elastic#102798)" (e…
Browse files Browse the repository at this point in the history
  • Loading branch information
n1v0lg authored Dec 8, 2023
1 parent ff1e2df commit 9db4cb4
Show file tree
Hide file tree
Showing 25 changed files with 260 additions and 1,054 deletions.
5 changes: 0 additions & 5 deletions docs/changelog/102798.yaml

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
RemoteConnectionManager.wrapConnectionWithRemoteClusterInfo(
newConnection,
clusterAlias,
connectionManager.getCredentialsManager()
actualProfile.getTransportProfile()
),
actualProfile.getHandshakeTimeout(),
cn -> true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,28 +57,15 @@ final class RemoteClusterConnection implements Closeable {
* @param settings the nodes settings object
* @param clusterAlias the configured alias of the cluster to connect to
* @param transportService the local nodes transport service
* @param credentialsManager object to lookup remote cluster credentials by cluster alias. If a cluster is protected by a credential,
* i.e. it has a credential configured via secure setting.
* This means the remote cluster uses the advances RCS model (as opposed to the basic model).
* @param credentialsProtected Whether the remote cluster is protected by a credentials, i.e. it has a credentials configured
* via secure setting. This means the remote cluster uses the new configurable access RCS model
* (as opposed to the basic model).
*/
RemoteClusterConnection(
Settings settings,
String clusterAlias,
TransportService transportService,
RemoteClusterCredentialsManager credentialsManager
) {
RemoteClusterConnection(Settings settings, String clusterAlias, TransportService transportService, boolean credentialsProtected) {
this.transportService = transportService;
this.clusterAlias = clusterAlias;
ConnectionProfile profile = RemoteConnectionStrategy.buildConnectionProfile(
clusterAlias,
settings,
credentialsManager.hasCredentials(clusterAlias)
);
this.remoteConnectionManager = new RemoteConnectionManager(
clusterAlias,
credentialsManager,
createConnectionManager(profile, transportService)
);
ConnectionProfile profile = RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, settings, credentialsProtected);
this.remoteConnectionManager = new RemoteConnectionManager(clusterAlias, createConnectionManager(profile, transportService));
this.connectionStrategy = RemoteConnectionStrategy.buildStrategy(clusterAlias, transportService, remoteConnectionManager, settings);
// we register the transport service here as a listener to make sure we notify handlers on disconnect etc.
this.remoteConnectionManager.addListener(transportService);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,15 @@ public boolean isRemoteClusterServerEnabled() {

private final TransportService transportService;
private final Map<String, RemoteClusterConnection> remoteClusters = ConcurrentCollections.newConcurrentMap();
private final RemoteClusterCredentialsManager remoteClusterCredentialsManager;
private final Set<String> credentialsProtectedRemoteClusters;

RemoteClusterService(Settings settings, TransportService transportService) {
super(settings);
this.enabled = DiscoveryNode.isRemoteClusterClient(settings);
this.remoteClusterServerEnabled = REMOTE_CLUSTER_SERVER_ENABLED.get(settings);
this.transportService = transportService;
this.remoteClusterCredentialsManager = new RemoteClusterCredentialsManager(settings);
this.credentialsProtectedRemoteClusters = REMOTE_CLUSTER_CREDENTIALS.getAsMap(settings).keySet();

if (remoteClusterServerEnabled) {
registerRemoteClusterHandshakeRequestHandler(transportService);
}
Expand Down Expand Up @@ -304,14 +305,6 @@ private synchronized void updateSkipUnavailable(String clusterAlias, Boolean ski
}
}

public void updateRemoteClusterCredentials(Settings settings) {
remoteClusterCredentialsManager.updateClusterCredentials(settings);
}

public RemoteClusterCredentialsManager getRemoteClusterCredentialsManager() {
return remoteClusterCredentialsManager;
}

@Override
protected void updateRemoteCluster(String clusterAlias, Settings settings) {
CountDownLatch latch = new CountDownLatch(1);
Expand Down Expand Up @@ -370,7 +363,12 @@ synchronized void updateRemoteCluster(
if (remote == null) {
// this is a new cluster we have to add a new representation
Settings finalSettings = Settings.builder().put(this.settings, false).put(newSettings, false).build();
remote = new RemoteClusterConnection(finalSettings, clusterAlias, transportService, remoteClusterCredentialsManager);
remote = new RemoteClusterConnection(
finalSettings,
clusterAlias,
transportService,
credentialsProtectedRemoteClusters.contains(clusterAlias)
);
remoteClusters.put(clusterAlias, remote);
remote.ensureConnected(listener.map(ignored -> RemoteClusterConnectionStatus.CONNECTED));
} else if (remote.shouldRebuildConnection(newSettings)) {
Expand All @@ -382,7 +380,12 @@ synchronized void updateRemoteCluster(
}
remoteClusters.remove(clusterAlias);
Settings finalSettings = Settings.builder().put(this.settings, false).put(newSettings, false).build();
remote = new RemoteClusterConnection(finalSettings, clusterAlias, transportService, remoteClusterCredentialsManager);
remote = new RemoteClusterConnection(
finalSettings,
clusterAlias,
transportService,
credentialsProtectedRemoteClusters.contains(clusterAlias)
);
remoteClusters.put(clusterAlias, remote);
remote.ensureConnected(listener.map(ignored -> RemoteClusterConnectionStatus.RECONNECTED));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
Expand All @@ -26,19 +25,18 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import static org.elasticsearch.transport.RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE;
import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME;

public class RemoteConnectionManager implements ConnectionManager {

private final String clusterAlias;
private final RemoteClusterCredentialsManager credentialsManager;
private final ConnectionManager delegate;
private final AtomicLong counter = new AtomicLong();
private volatile List<DiscoveryNode> connectedNodes = Collections.emptyList();

RemoteConnectionManager(String clusterAlias, RemoteClusterCredentialsManager credentialsManager, ConnectionManager delegate) {
RemoteConnectionManager(String clusterAlias, ConnectionManager delegate) {
this.clusterAlias = clusterAlias;
this.credentialsManager = credentialsManager;
this.delegate = delegate;
this.delegate.addListener(new TransportConnectionListener() {
@Override
Expand All @@ -53,10 +51,6 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti
});
}

public RemoteClusterCredentialsManager getCredentialsManager() {
return credentialsManager;
}

/**
* Remote cluster connections have a different lifecycle from intra-cluster connections. Use {@link #connectToRemoteClusterNode}
* instead of this method.
Expand Down Expand Up @@ -101,7 +95,13 @@ public void openConnection(DiscoveryNode node, @Nullable ConnectionProfile profi
node,
profile,
listener.delegateFailureAndWrap(
(l, connection) -> l.onResponse(wrapConnectionWithRemoteClusterInfo(connection, clusterAlias, credentialsManager))
(l, connection) -> l.onResponse(
new InternalRemoteConnection(
connection,
clusterAlias,
profile != null ? profile.getTransportProfile() : getConnectionProfile().getTransportProfile()
)
)
)
);
}
Expand Down Expand Up @@ -182,35 +182,16 @@ public void closeNoBlock() {
* @return a cluster alias if the connection target a node in the remote cluster, otherwise an empty result
*/
public static Optional<String> resolveRemoteClusterAlias(Transport.Connection connection) {
return resolveRemoteClusterAliasWithCredentials(connection).map(RemoteClusterAliasWithCredentials::clusterAlias);
}

public record RemoteClusterAliasWithCredentials(String clusterAlias, @Nullable SecureString credentials) {
@Override
public String toString() {
return "RemoteClusterAliasWithCredentials{clusterAlias='" + clusterAlias + "', credentials='::es_redacted::'}";
}
}

/**
* This method returns information (alias and credentials) for remote cluster for the given transport connection.
* Either or both of alias and credentials can be null depending on the connection.
*
* @param connection the transport connection for which to resolve a remote cluster alias
*/
public static Optional<RemoteClusterAliasWithCredentials> resolveRemoteClusterAliasWithCredentials(Transport.Connection connection) {
Transport.Connection unwrapped = TransportService.unwrapConnection(connection);
if (unwrapped instanceof InternalRemoteConnection remoteConnection) {
return Optional.of(
new RemoteClusterAliasWithCredentials(remoteConnection.getClusterAlias(), remoteConnection.getClusterCredentials())
);
return Optional.of(remoteConnection.getClusterAlias());
}
return Optional.empty();
}

private Transport.Connection getConnectionInternal(DiscoveryNode node) throws NodeNotConnectedException {
Transport.Connection connection = delegate.getConnection(node);
return wrapConnectionWithRemoteClusterInfo(connection, clusterAlias, credentialsManager);
return new InternalRemoteConnection(connection, clusterAlias, getConnectionProfile().getTransportProfile());
}

private synchronized void addConnectedNode(DiscoveryNode addedNode) {
Expand Down Expand Up @@ -316,27 +297,21 @@ private static final class InternalRemoteConnection implements Transport.Connect
private static final Logger logger = LogManager.getLogger(InternalRemoteConnection.class);
private final Transport.Connection connection;
private final String clusterAlias;
@Nullable
private final SecureString clusterCredentials;
private final boolean isRemoteClusterProfile;

private InternalRemoteConnection(Transport.Connection connection, String clusterAlias, @Nullable SecureString clusterCredentials) {
InternalRemoteConnection(Transport.Connection connection, String clusterAlias, String transportProfile) {
assert false == connection instanceof InternalRemoteConnection : "should not double wrap";
assert false == connection instanceof ProxyConnection
: "proxy connection should wrap internal remote connection, not the other way around";
this.connection = Objects.requireNonNull(connection);
this.clusterAlias = Objects.requireNonNull(clusterAlias);
this.clusterCredentials = clusterCredentials;
this.connection = Objects.requireNonNull(connection);
this.isRemoteClusterProfile = REMOTE_CLUSTER_PROFILE.equals(Objects.requireNonNull(transportProfile));
}

public String getClusterAlias() {
return clusterAlias;
}

@Nullable
public SecureString getClusterCredentials() {
return clusterCredentials;
}

@Override
public DiscoveryNode getNode() {
return connection.getNode();
Expand All @@ -346,7 +321,7 @@ public DiscoveryNode getNode() {
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
final String effectiveAction;
if (clusterCredentials != null && TransportService.HANDSHAKE_ACTION_NAME.equals(action)) {
if (isRemoteClusterProfile && TransportService.HANDSHAKE_ACTION_NAME.equals(action)) {
logger.trace("sending remote cluster specific handshake to node [{}] of remote cluster [{}]", getNode(), clusterAlias);
effectiveAction = REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME;
} else {
Expand Down Expand Up @@ -414,8 +389,8 @@ public boolean hasReferences() {
static InternalRemoteConnection wrapConnectionWithRemoteClusterInfo(
Transport.Connection connection,
String clusterAlias,
RemoteClusterCredentialsManager credentialsManager
String transportProfile
) {
return new InternalRemoteConnection(connection, clusterAlias, credentialsManager.resolveCredentials(clusterAlias));
return new InternalRemoteConnection(connection, clusterAlias, transportProfile);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -357,11 +357,7 @@ private ConnectionManager.ConnectionValidator getConnectionValidator(DiscoveryNo
: "transport profile must be consistent between the connection manager and the actual profile";
transportService.connectionValidator(node)
.validate(
RemoteConnectionManager.wrapConnectionWithRemoteClusterInfo(
connection,
clusterAlias,
connectionManager.getCredentialsManager()
),
RemoteConnectionManager.wrapConnectionWithRemoteClusterInfo(connection, clusterAlias, profile.getTransportProfile()),
profile,
listener
);
Expand Down
Loading

0 comments on commit 9db4cb4

Please sign in to comment.