From 68c358ca2688e54ef92d69dec3fb18da8d9e4a2f Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Mon, 16 Sep 2024 10:15:18 +0530 Subject: [PATCH] Make Remote Publication a dynamic setting Signed-off-by: Shivansh Arora --- .../coordination/CoordinationState.java | 19 +++++++++++++++---- .../cluster/coordination/Coordinator.java | 12 ++++++++---- .../coordination/JoinTaskExecutor.java | 4 ++-- .../PublicationTransportHandler.java | 14 +++----------- .../cluster/node/DiscoveryNode.java | 2 +- .../remote/RemoteClusterStateService.java | 16 +++++++++++++--- .../coordination/CoordinationStateTests.java | 2 +- .../coordination/PreVoteCollectorTests.java | 4 ++-- .../coordination/PublicationTests.java | 2 +- .../CoordinationStateTestCluster.java | 4 ++-- 10 files changed, 48 insertions(+), 31 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java index 9cffc7051d756..4104e480bad54 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java @@ -38,6 +38,7 @@ import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.io.IOUtils; import org.opensearch.gateway.remote.ClusterMetadataManifest; @@ -54,6 +55,7 @@ import static org.opensearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM; import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; /** @@ -87,8 +89,8 @@ public CoordinationState( DiscoveryNode localNode, PersistedStateRegistry persistedStateRegistry, ElectionStrategy electionStrategy, - Settings settings - ) { + Settings settings, + ClusterSettings clusterSettings) { this.localNode = localNode; // persisted state registry @@ -105,10 +107,10 @@ public CoordinationState( .getLastAcceptedConfiguration(); this.publishVotes = new VoteCollection(); this.isRemoteStateEnabled = isRemoteStoreClusterStateEnabled(settings); - // ToDo: revisit this check while making the setting dynamic this.isRemotePublicationEnabled = isRemoteStateEnabled && REMOTE_PUBLICATION_SETTING.get(settings) - && localNode.isRemoteStatePublicationEnabled(); + && localNode.isRemoteStatePublicationConfigured(); + clusterSettings.addSettingsUpdateConsumer(REMOTE_PUBLICATION_SETTING, this::setRemotePublicationSetting); } public boolean isRemotePublicationEnabled() { @@ -651,6 +653,15 @@ private boolean shouldCommitRemotePersistedState() { && persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedManifest() != null; } + private void setRemotePublicationSetting(boolean remotePublicationSetting) { + if (remotePublicationSetting == false) { + this.isRemotePublicationEnabled = false; + } else { + this.isRemotePublicationEnabled = isRemoteStateEnabled && localNode.isRemoteStatePublicationConfigured(); + } + + } + /** * Pluggable persistence layer for {@link CoordinationState}. * diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index 13a57d93f03f0..547c7b7b40e9e 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -186,7 +186,9 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private Optional currentPublication = Optional.empty(); private final NodeHealthService nodeHealthService; private final PersistedStateRegistry persistedStateRegistry; + private final RemoteClusterStateService remoteClusterStateService; private final RemoteStoreNodeService remoteStoreNodeService; + private final ClusterSettings clusterSettings; /** * @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}. @@ -310,6 +312,8 @@ public Coordinator( this.persistedStateRegistry = persistedStateRegistry; this.localNodeCommissioned = true; this.remoteStoreNodeService = remoteStoreNodeService; + this.remoteClusterStateService = remoteClusterStateService; + this.clusterSettings = clusterSettings; } private ClusterFormationState getClusterFormationState() { @@ -858,7 +862,7 @@ boolean publicationInProgress() { @Override protected void doStart() { synchronized (mutex) { - coordinationState.set(new CoordinationState(getLocalNode(), persistedStateRegistry, electionStrategy, settings)); + coordinationState.set(new CoordinationState(getLocalNode(), persistedStateRegistry, electionStrategy, settings, clusterSettings)); peerFinder.setCurrentTerm(getCurrentTerm()); configuredHostsResolver.start(); final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState(); @@ -903,9 +907,9 @@ public DiscoveryStats stats() { stats.add(persistedStateRegistry.getPersistedState(stateType).getStats()); } }); - if (coordinationState.get().isRemotePublicationEnabled()) { - stats.add(publicationHandler.getFullDownloadStats()); - stats.add(publicationHandler.getDiffDownloadStats()); + if (remoteClusterStateService != null) { + stats.add(remoteClusterStateService.getFullDownloadStats()); + stats.add(remoteClusterStateService.getDiffDownloadStats()); } clusterStateStats.setPersistenceStats(stats); return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats(), clusterStateStats); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java index 838b5723b217b..f725b3d2cefd8 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java @@ -508,10 +508,10 @@ private static void ensureRemoteClusterStateNodesCompatibility(DiscoveryNode joi assert existingNodes.isEmpty() == false; Optional remotePublicationNode = existingNodes.stream() - .filter(DiscoveryNode::isRemoteStatePublicationEnabled) + .filter(DiscoveryNode::isRemoteStatePublicationConfigured) .findFirst(); - if (remotePublicationNode.isPresent() && joiningNode.isRemoteStatePublicationEnabled()) { + if (remotePublicationNode.isPresent() && joiningNode.isRemoteStatePublicationConfigured()) { ensureRepositoryCompatibility(joiningNode, remotePublicationNode.get(), REMOTE_CLUSTER_PUBLICATION_REPO_NAME_ATTRIBUTES); } } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java index cdf331b7bb577..6ecb39614f7a5 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java @@ -178,14 +178,6 @@ public PublishClusterStateStats stats() { ); } - public PersistedStateStats getFullDownloadStats() { - return remoteClusterStateService.getFullDownloadStats(); - } - - public PersistedStateStats getDiffDownloadStats() { - return remoteClusterStateService.getDiffDownloadStats(); - } - private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException { try (StreamInput in = CompressedStreamUtils.decompressBytes(request, namedWriteableRegistry)) { ClusterState incomingState; @@ -356,7 +348,7 @@ public PublicationContext newPublicationContext( ) { if (isRemotePublicationEnabled == true) { if (allNodesRemotePublicationEnabled.get() == false) { - if (validateRemotePublicationOnAllNodes(clusterChangedEvent.state().nodes()) == true) { + if (validateRemotePublicationConfiguredOnAllNodes(clusterChangedEvent.state().nodes()) == true) { allNodesRemotePublicationEnabled.set(true); } } @@ -374,11 +366,11 @@ public PublicationContext newPublicationContext( return publicationContext; } - private boolean validateRemotePublicationOnAllNodes(DiscoveryNodes discoveryNodes) { + private boolean validateRemotePublicationConfiguredOnAllNodes(DiscoveryNodes discoveryNodes) { assert ClusterMetadataManifest.getCodecForVersion(discoveryNodes.getMinNodeVersion()) >= ClusterMetadataManifest.CODEC_V0; for (DiscoveryNode node : discoveryNodes.getNodes().values()) { // if a node is non-remote then created local publication context - if (node.isRemoteStatePublicationEnabled() == false) { + if (node.isRemoteStatePublicationConfigured() == false) { return false; } } diff --git a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java index a6f0a457f7f9b..5f5783425a08c 100644 --- a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java +++ b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java @@ -518,7 +518,7 @@ public boolean isRemoteStoreNode() { * Returns whether remote cluster state publication is enabled on this node * @return true if the node contains remote cluster state node attribute and remote routing table node attribute */ - public boolean isRemoteStatePublicationEnabled() { + public boolean isRemoteStatePublicationConfigured() { return this.getAttributes() .keySet() .stream() diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 12d10fd908b44..9cb92121b84d7 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -111,6 +111,7 @@ import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA; import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA; import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_METADATA_PREFIX; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; /** @@ -132,7 +133,7 @@ public class RemoteClusterStateService implements Closeable { REMOTE_PUBLICATION_SETTING_KEY, false, Property.NodeScope, - Property.Final + Property.Dynamic ); /** @@ -232,7 +233,7 @@ public static RemoteClusterStateValidationMode parseString(String mode) { private final String METADATA_UPDATE_LOG_STRING = "wrote metadata for [{}] indices and skipped [{}] unchanged " + "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata " + "updated : [{}], custom metadata updated : [{}], indices routing updated : [{}]"; - private final boolean isPublicationEnabled; + private boolean isPublicationEnabled; private final String remotePathPrefix; private final RemoteClusterStateCache remoteClusterStateCache; @@ -273,9 +274,10 @@ public RemoteClusterStateService( this.remoteStateStats = new RemotePersistenceStats(); this.namedWriteableRegistry = namedWriteableRegistry; this.indexMetadataUploadListeners = indexMetadataUploadListeners; - this.isPublicationEnabled = REMOTE_PUBLICATION_SETTING.get(settings) + this.isPublicationEnabled = clusterSettings.get(REMOTE_PUBLICATION_SETTING) && RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings) && RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings); + clusterSettings.addSettingsUpdateConsumer(REMOTE_PUBLICATION_SETTING, this::setRemotePublicationSetting); this.remotePathPrefix = CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.get(settings); this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService( repositoriesService, @@ -1109,6 +1111,14 @@ private void setChecksumValidationMode(RemoteClusterStateValidationMode remoteCl this.remoteClusterStateValidationMode = remoteClusterStateValidationMode; } + private void setRemotePublicationSetting(boolean remotePublicationSetting) { + if (remotePublicationSetting == false) { + this.isPublicationEnabled = false; + } else { + this.isPublicationEnabled = isRemoteStoreClusterStateEnabled(settings) && isRemoteRoutingTableEnabled(settings); + } + } + // Package private for unit test RemoteRoutingTableService getRemoteRoutingTableService() { return this.remoteRoutingTableService; diff --git a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java index 32cb95e0c04f6..a77a94723d966 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java @@ -1283,7 +1283,7 @@ public static CoordinationState createCoordinationState( DiscoveryNode localNode, Settings settings ) { - return new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, settings); + return new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, settings, null); } public static ClusterState clusterState( diff --git a/server/src/test/java/org/opensearch/cluster/coordination/PreVoteCollectorTests.java b/server/src/test/java/org/opensearch/cluster/coordination/PreVoteCollectorTests.java index 5ddf614db3334..341e010b02698 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/PreVoteCollectorTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/PreVoteCollectorTests.java @@ -302,8 +302,8 @@ public void testPrevotingIndicatesElectionSuccess() { localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, - Settings.EMPTY - ); + Settings.EMPTY, + null); final long newTerm = randomLongBetween(currentTerm + 1, Long.MAX_VALUE); diff --git a/server/src/test/java/org/opensearch/cluster/coordination/PublicationTests.java b/server/src/test/java/org/opensearch/cluster/coordination/PublicationTests.java index 4d18ff95887dd..504ab16f1c068 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/PublicationTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/PublicationTests.java @@ -94,7 +94,7 @@ class MockNode { ); PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, new InMemoryPersistedState(0L, initialState)); - coordinationState = new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, Settings.EMPTY); + coordinationState = new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, Settings.EMPTY, null); } final DiscoveryNode localNode; diff --git a/test/framework/src/main/java/org/opensearch/cluster/coordination/CoordinationStateTestCluster.java b/test/framework/src/main/java/org/opensearch/cluster/coordination/CoordinationStateTestCluster.java index cbe695cbb2136..2215ecafc16c9 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/coordination/CoordinationStateTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/cluster/coordination/CoordinationStateTestCluster.java @@ -150,7 +150,7 @@ static class ClusterNode { persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, persistedState); this.electionStrategy = electionStrategy; - state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY); + state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY, null); } void reboot() { @@ -189,7 +189,7 @@ void reboot() { localNode.getVersion() ); - state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY); + state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY, null); } void setInitialState(CoordinationMetadata.VotingConfiguration initialConfig, long initialValue) {