From 60fbfeaf7e21329fd5ca023bd5e9254cc4ecb89f Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Wed, 25 Sep 2024 20:39:36 +0530 Subject: [PATCH] Separate Remote State and Publication enabled and configured methods Signed-off-by: Shivansh Arora --- .../cluster/coordination/CoordinationState.java | 2 +- .../cluster/coordination/Coordinator.java | 8 +++++--- .../cluster/coordination/JoinTaskExecutor.java | 4 ++-- .../coordination/PublicationTransportHandler.java | 14 +++----------- .../org/opensearch/cluster/node/DiscoveryNode.java | 4 ++-- .../gateway/remote/RemoteClusterStateService.java | 3 ++- .../index/remote/RemoteIndexPathUploader.java | 4 ++-- server/src/main/java/org/opensearch/node/Node.java | 4 ++-- 8 files changed, 19 insertions(+), 24 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java index 9cffc7051d756..e5b07f867d609 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java @@ -108,7 +108,7 @@ public CoordinationState( // ToDo: revisit this check while making the setting dynamic this.isRemotePublicationEnabled = isRemoteStateEnabled && REMOTE_PUBLICATION_SETTING.get(settings) - && localNode.isRemoteStatePublicationEnabled(); + && localNode.isRemoteStatePublicationConfigured(); } public boolean isRemotePublicationEnabled() { diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index 13a57d93f03f0..cb10844b52211 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -187,6 +187,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private final NodeHealthService nodeHealthService; private final PersistedStateRegistry persistedStateRegistry; private final RemoteStoreNodeService remoteStoreNodeService; + private final RemoteClusterStateService remoteClusterStateService; /** * @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}. @@ -310,6 +311,7 @@ public Coordinator( this.persistedStateRegistry = persistedStateRegistry; this.localNodeCommissioned = true; this.remoteStoreNodeService = remoteStoreNodeService; + this.remoteClusterStateService = remoteClusterStateService; } private ClusterFormationState getClusterFormationState() { @@ -903,9 +905,9 @@ public DiscoveryStats stats() { stats.add(persistedStateRegistry.getPersistedState(stateType).getStats()); } }); - if (coordinationState.get().isRemotePublicationEnabled()) { - stats.add(publicationHandler.getFullDownloadStats()); - stats.add(publicationHandler.getDiffDownloadStats()); + if (remoteClusterStateService != null) { + stats.add(remoteClusterStateService.getFullDownloadStats()); + stats.add(remoteClusterStateService.getDiffDownloadStats()); } clusterStateStats.setPersistenceStats(stats); return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats(), clusterStateStats); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java index 838b5723b217b..f725b3d2cefd8 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java @@ -508,10 +508,10 @@ private static void ensureRemoteClusterStateNodesCompatibility(DiscoveryNode joi assert existingNodes.isEmpty() == false; Optional remotePublicationNode = existingNodes.stream() - .filter(DiscoveryNode::isRemoteStatePublicationEnabled) + .filter(DiscoveryNode::isRemoteStatePublicationConfigured) .findFirst(); - if (remotePublicationNode.isPresent() && joiningNode.isRemoteStatePublicationEnabled()) { + if (remotePublicationNode.isPresent() && joiningNode.isRemoteStatePublicationConfigured()) { ensureRepositoryCompatibility(joiningNode, remotePublicationNode.get(), REMOTE_CLUSTER_PUBLICATION_REPO_NAME_ATTRIBUTES); } } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java index cdf331b7bb577..6ecb39614f7a5 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java @@ -178,14 +178,6 @@ public PublishClusterStateStats stats() { ); } - public PersistedStateStats getFullDownloadStats() { - return remoteClusterStateService.getFullDownloadStats(); - } - - public PersistedStateStats getDiffDownloadStats() { - return remoteClusterStateService.getDiffDownloadStats(); - } - private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException { try (StreamInput in = CompressedStreamUtils.decompressBytes(request, namedWriteableRegistry)) { ClusterState incomingState; @@ -356,7 +348,7 @@ public PublicationContext newPublicationContext( ) { if (isRemotePublicationEnabled == true) { if (allNodesRemotePublicationEnabled.get() == false) { - if (validateRemotePublicationOnAllNodes(clusterChangedEvent.state().nodes()) == true) { + if (validateRemotePublicationConfiguredOnAllNodes(clusterChangedEvent.state().nodes()) == true) { allNodesRemotePublicationEnabled.set(true); } } @@ -374,11 +366,11 @@ public PublicationContext newPublicationContext( return publicationContext; } - private boolean validateRemotePublicationOnAllNodes(DiscoveryNodes discoveryNodes) { + private boolean validateRemotePublicationConfiguredOnAllNodes(DiscoveryNodes discoveryNodes) { assert ClusterMetadataManifest.getCodecForVersion(discoveryNodes.getMinNodeVersion()) >= ClusterMetadataManifest.CODEC_V0; for (DiscoveryNode node : discoveryNodes.getNodes().values()) { // if a node is non-remote then created local publication context - if (node.isRemoteStatePublicationEnabled() == false) { + if (node.isRemoteStatePublicationConfigured() == false) { return false; } } diff --git a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java index a6f0a457f7f9b..b06c2ef0bdbe4 100644 --- a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java +++ b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java @@ -515,10 +515,10 @@ public boolean isRemoteStoreNode() { } /** - * Returns whether remote cluster state publication is enabled on this node + * Returns whether settings required for remote cluster state publication is configured * @return true if the node contains remote cluster state node attribute and remote routing table node attribute */ - public boolean isRemoteStatePublicationEnabled() { + public boolean isRemoteStatePublicationConfigured() { return this.getAttributes() .keySet() .stream() diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index ece29180f9cf5..c7d3975ec6267 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -111,6 +111,7 @@ import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA; import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA; import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_METADATA_PREFIX; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteClusterStateAttributePresent; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; /** @@ -256,7 +257,7 @@ public RemoteClusterStateService( List indexMetadataUploadListeners, NamedWriteableRegistry namedWriteableRegistry ) { - assert isRemoteStoreClusterStateEnabled(settings) : "Remote cluster state is not enabled"; + assert isRemoteClusterStateAttributePresent(settings) : "Remote cluster state is not enabled"; this.nodeId = nodeId; this.repositoriesService = repositoriesService; this.settings = settings; diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java b/server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java index 5878dff03acc2..63c12fe82ed4d 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java @@ -51,8 +51,8 @@ import static org.opensearch.index.remote.RemoteIndexPath.SEGMENT_PATH; import static org.opensearch.index.remote.RemoteIndexPath.TRANSLOG_PATH; import static org.opensearch.index.remote.RemoteStoreUtils.determineRemoteStorePathStrategy; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteClusterStateAttributePresent; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; /** * Uploads the remote store path for all possible combinations of {@link org.opensearch.index.remote.RemoteStoreEnums.DataCategory} @@ -235,7 +235,7 @@ private Repository validateAndGetRepository(String repoSetting) { } public void start() { - assert isRemoteStoreClusterStateEnabled(settings) == true : "Remote cluster state is not enabled"; + assert isRemoteClusterStateAttributePresent(settings) == true : "Remote cluster state is not configured"; if (isRemoteDataAttributePresent == false) { // If remote store data attributes are not present than we skip this. return; diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index a8d4ebcf23dab..c7873b7da8069 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -311,9 +311,9 @@ import static org.opensearch.env.NodeEnvironment.collectFileCacheDataPath; import static org.opensearch.index.ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED_ATTRIBUTE_KEY; import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteClusterStateAttributePresent; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreAttributePresent; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; /** * A node represent a node within a cluster ({@code cluster.name}). The {@link #client()} can be used @@ -796,7 +796,7 @@ protected Node( final RemoteClusterStateService remoteClusterStateService; final RemoteClusterStateCleanupManager remoteClusterStateCleanupManager; final RemoteIndexPathUploader remoteIndexPathUploader; - if (isRemoteStoreClusterStateEnabled(settings)) { + if (isRemoteClusterStateAttributePresent(settings)) { remoteIndexPathUploader = new RemoteIndexPathUploader( threadPool, settings,