From 70947b5018cdf9f946613f69ff4f72f64b0026f7 Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Wed, 25 Sep 2024 20:39:36 +0530 Subject: [PATCH] Separate Remote State and Publication enabled and configured methods Signed-off-by: Shivansh Arora --- .../RemotePublicationConfigurationIT.java | 21 ++++++++++ .../remote/RemoteStatePublicationIT.java | 6 ++- .../coordination/CoordinationState.java | 2 +- .../cluster/coordination/Coordinator.java | 8 ++-- .../coordination/JoinTaskExecutor.java | 4 +- .../PublicationTransportHandler.java | 14 ++----- .../cluster/node/DiscoveryNode.java | 4 +- .../remote/RemoteClusterStateService.java | 6 +-- .../index/remote/RemoteIndexPathUploader.java | 4 +- .../main/java/org/opensearch/node/Node.java | 4 +- .../remotestore/RemoteStoreNodeAttribute.java | 5 +-- .../test/OpenSearchIntegTestCase.java | 39 ++++++++++++------- 12 files changed, 73 insertions(+), 44 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemotePublicationConfigurationIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemotePublicationConfigurationIT.java index 1d19a4bfd1af5..0cbb0b87f28a7 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemotePublicationConfigurationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemotePublicationConfigurationIT.java @@ -8,6 +8,7 @@ package org.opensearch.gateway.remote; +import org.junit.Assert; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.opensearch.common.settings.Settings; import org.opensearch.plugins.Plugin; @@ -118,6 +119,26 @@ public Settings.Builder remoteWithRoutingTableNodeSetting() { .put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true); } + public void testRemoteClusterStateServiceNotInitialized_WhenNodeAttributesNotPresent() { + internalCluster().startClusterManagerOnlyNode(); + internalCluster().startDataOnlyNodes(2); + + ensureStableCluster(3); + ensureGreen(); + + internalCluster().getDataOrClusterManagerNodeInstances(RemoteClusterStateService.class).forEach(Assert::assertNull); + } + + public void testServiceInitialized_WhenNodeAttributesPresent() { + internalCluster().startClusterManagerOnlyNode(buildRemoteStateNodeAttributes(REPOSITORY_NAME, segmentRepoPath, ReloadableFsRepository.TYPE)); + internalCluster().startDataOnlyNodes(2, buildRemoteStateNodeAttributes(REPOSITORY_NAME, segmentRepoPath, ReloadableFsRepository.TYPE)); + + ensureStableCluster(3); + ensureGreen(); + + internalCluster().getDataOrClusterManagerNodeInstances(RemoteClusterStateService.class).forEach(Assert::assertNotNull); + } + public void testRemotePublishConfigNodeJoinNonRemoteCluster() throws Exception { internalCluster().startClusterManagerOnlyNode(); internalCluster().startDataOnlyNodes(2); diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java index 578c922c80a0d..2409b5d0d0e45 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java @@ -215,7 +215,11 @@ public void testRemotePublicationDisableIfRemoteStateDisabled() { ensureStableCluster(5); ensureGreen(INDEX_NAME); - assertNull(internalCluster().getCurrentClusterManagerNodeInstance(RemoteClusterStateService.class)); + RemoteClusterStateService remoteClusterStateService = internalCluster().getCurrentClusterManagerNodeInstance( + RemoteClusterStateService.class + ); + + assertFalse(remoteClusterStateService.isRemotePublicationEnabled()); } public void testRemotePublicationDownloadStats() { diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java index 9cffc7051d756..e5b07f867d609 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java @@ -108,7 +108,7 @@ public CoordinationState( // ToDo: revisit this check while making the setting dynamic this.isRemotePublicationEnabled = isRemoteStateEnabled && REMOTE_PUBLICATION_SETTING.get(settings) - && localNode.isRemoteStatePublicationEnabled(); + && localNode.isRemoteStatePublicationConfigured(); } public boolean isRemotePublicationEnabled() { diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index 9859abe503eaa..1b3ae89251ac0 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -189,6 +189,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private final PersistedStateRegistry persistedStateRegistry; private final RemoteStoreNodeService remoteStoreNodeService; private NodeConnectionsService nodeConnectionsService; + private final RemoteClusterStateService remoteClusterStateService; /** * @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}. @@ -312,6 +313,7 @@ public Coordinator( this.persistedStateRegistry = persistedStateRegistry; this.localNodeCommissioned = true; this.remoteStoreNodeService = remoteStoreNodeService; + this.remoteClusterStateService = remoteClusterStateService; } private ClusterFormationState getClusterFormationState() { @@ -912,9 +914,9 @@ public DiscoveryStats stats() { stats.add(persistedStateRegistry.getPersistedState(stateType).getStats()); } }); - if (coordinationState.get().isRemotePublicationEnabled()) { - stats.add(publicationHandler.getFullDownloadStats()); - stats.add(publicationHandler.getDiffDownloadStats()); + if (remoteClusterStateService != null) { + stats.add(remoteClusterStateService.getFullDownloadStats()); + stats.add(remoteClusterStateService.getDiffDownloadStats()); } clusterStateStats.setPersistenceStats(stats); return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats(), clusterStateStats); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java index 13033b670d44b..aca8653be4dd8 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java @@ -513,10 +513,10 @@ private static void ensureRemoteClusterStateNodesCompatibility(DiscoveryNode joi assert existingNodes.isEmpty() == false; Optional remotePublicationNode = existingNodes.stream() - .filter(DiscoveryNode::isRemoteStatePublicationEnabled) + .filter(DiscoveryNode::isRemoteStatePublicationConfigured) .findFirst(); - if (remotePublicationNode.isPresent() && joiningNode.isRemoteStatePublicationEnabled()) { + if (remotePublicationNode.isPresent() && joiningNode.isRemoteStatePublicationConfigured()) { ensureRepositoryCompatibility(joiningNode, remotePublicationNode.get(), REMOTE_CLUSTER_PUBLICATION_REPO_NAME_ATTRIBUTES); } } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java index caed2b6eceb49..6277b70f9a7c8 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java @@ -178,14 +178,6 @@ public PublishClusterStateStats stats() { ); } - public PersistedStateStats getFullDownloadStats() { - return remoteClusterStateService.getFullDownloadStats(); - } - - public PersistedStateStats getDiffDownloadStats() { - return remoteClusterStateService.getDiffDownloadStats(); - } - private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException { try (StreamInput in = CompressedStreamUtils.decompressBytes(request, namedWriteableRegistry)) { ClusterState incomingState; @@ -356,7 +348,7 @@ public PublicationContext newPublicationContext( ) { if (isRemotePublicationEnabled == true) { if (allNodesRemotePublicationEnabled.get() == false) { - if (validateRemotePublicationOnAllNodes(clusterChangedEvent.state().nodes()) == true) { + if (validateRemotePublicationConfiguredOnAllNodes(clusterChangedEvent.state().nodes()) == true) { allNodesRemotePublicationEnabled.set(true); } } @@ -374,11 +366,11 @@ public PublicationContext newPublicationContext( return publicationContext; } - private boolean validateRemotePublicationOnAllNodes(DiscoveryNodes discoveryNodes) { + private boolean validateRemotePublicationConfiguredOnAllNodes(DiscoveryNodes discoveryNodes) { assert ClusterMetadataManifest.getCodecForVersion(discoveryNodes.getMinNodeVersion()) >= ClusterMetadataManifest.CODEC_V0; for (DiscoveryNode node : discoveryNodes.getNodes().values()) { // if a node is non-remote then created local publication context - if (node.isRemoteStatePublicationEnabled() == false) { + if (node.isRemoteStatePublicationConfigured() == false) { return false; } } diff --git a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java index a6f0a457f7f9b..b06c2ef0bdbe4 100644 --- a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java +++ b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java @@ -515,10 +515,10 @@ public boolean isRemoteStoreNode() { } /** - * Returns whether remote cluster state publication is enabled on this node + * Returns whether settings required for remote cluster state publication is configured * @return true if the node contains remote cluster state node attribute and remote routing table node attribute */ - public boolean isRemoteStatePublicationEnabled() { + public boolean isRemoteStatePublicationConfigured() { return this.getAttributes() .keySet() .stream() diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index ece29180f9cf5..be56c70b30332 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -111,7 +111,7 @@ import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA; import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA; import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_METADATA_PREFIX; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteClusterStateConfigured; /** * A Service which provides APIs to upload and download cluster metadata from remote store. @@ -256,7 +256,7 @@ public RemoteClusterStateService( List indexMetadataUploadListeners, NamedWriteableRegistry namedWriteableRegistry ) { - assert isRemoteStoreClusterStateEnabled(settings) : "Remote cluster state is not enabled"; + assert isRemoteClusterStateConfigured(settings) : "Remote cluster state is not configured"; this.nodeId = nodeId; this.repositoriesService = repositoriesService; this.settings = settings; @@ -1055,7 +1055,7 @@ public void close() throws IOException { } public void start() { - assert isRemoteStoreClusterStateEnabled(settings) == true : "Remote cluster state is not enabled"; + assert isRemoteClusterStateConfigured(settings) == true : "Remote cluster state is not enabled"; final String remoteStoreRepo = settings.get( Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY ); diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java b/server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java index 5878dff03acc2..2a76a5b966884 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java @@ -51,8 +51,8 @@ import static org.opensearch.index.remote.RemoteIndexPath.SEGMENT_PATH; import static org.opensearch.index.remote.RemoteIndexPath.TRANSLOG_PATH; import static org.opensearch.index.remote.RemoteStoreUtils.determineRemoteStorePathStrategy; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteClusterStateConfigured; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; /** * Uploads the remote store path for all possible combinations of {@link org.opensearch.index.remote.RemoteStoreEnums.DataCategory} @@ -235,7 +235,7 @@ private Repository validateAndGetRepository(String repoSetting) { } public void start() { - assert isRemoteStoreClusterStateEnabled(settings) == true : "Remote cluster state is not enabled"; + assert isRemoteClusterStateConfigured(settings) == true : "Remote cluster state is not configured"; if (isRemoteDataAttributePresent == false) { // If remote store data attributes are not present than we skip this. return; diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 4962d72d8728a..0b99474687077 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -311,9 +311,9 @@ import static org.opensearch.env.NodeEnvironment.collectFileCacheDataPath; import static org.opensearch.index.ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED_ATTRIBUTE_KEY; import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteClusterStateConfigured; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreAttributePresent; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; /** * A node represent a node within a cluster ({@code cluster.name}). The {@link #client()} can be used @@ -796,7 +796,7 @@ protected Node( final RemoteClusterStateService remoteClusterStateService; final RemoteClusterStateCleanupManager remoteClusterStateCleanupManager; final RemoteIndexPathUploader remoteIndexPathUploader; - if (isRemoteStoreClusterStateEnabled(settings)) { + if (isRemoteClusterStateConfigured(settings)) { remoteIndexPathUploader = new RemoteIndexPathUploader( threadPool, settings, diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java index 55971398634c5..d6a58f8e1d471 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java @@ -180,7 +180,7 @@ public static boolean isRemoteDataAttributePresent(Settings settings) { || settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY).isEmpty() == false; } - public static boolean isRemoteClusterStateAttributePresent(Settings settings) { + public static boolean isRemoteClusterStateConfigured(Settings settings) { return settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY) .isEmpty() == false; } @@ -194,8 +194,7 @@ public static String getRemoteStoreTranslogRepo(Settings settings) { } public static boolean isRemoteStoreClusterStateEnabled(Settings settings) { - return RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) - && isRemoteClusterStateAttributePresent(settings); + return RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) && isRemoteClusterStateConfigured(settings); } private static boolean isRemoteRoutingTableAttributePresent(Settings settings) { diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index 68a2b8086a92e..b70301e7ce541 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -2795,6 +2795,30 @@ public static Settings buildRemoteStoreNodeAttributes( ); } + public static Settings buildRemoteStateNodeAttributes( + String stateRepoName, + Path stateRepoPath, + String stateRepoType + ) { + String stateRepoTypeAttributeKey = String.format( + Locale.getDefault(), + "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, + stateRepoName + ); + String stateRepoSettingsAttributeKeyPrefix = String.format( + Locale.getDefault(), + "node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, + stateRepoName + ); + String prefixModeVerificationSuffix = BlobStoreRepository.PREFIX_MODE_VERIFICATION_SETTING.getKey(); + Settings.Builder settings = Settings.builder() + .put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, stateRepoName) + .put(stateRepoTypeAttributeKey, stateRepoType) + .put(stateRepoSettingsAttributeKeyPrefix + "location", stateRepoPath) + .put(stateRepoSettingsAttributeKeyPrefix + prefixModeVerificationSuffix, prefixModeVerificationEnable); + return settings.build(); + } + private static Settings buildRemoteStoreNodeAttributes( String segmentRepoName, Path segmentRepoPath, @@ -2850,16 +2874,6 @@ private static Settings buildRemoteStoreNodeAttributes( "node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, translogRepoName ); - String stateRepoTypeAttributeKey = String.format( - Locale.getDefault(), - "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, - segmentRepoName - ); - String stateRepoSettingsAttributeKeyPrefix = String.format( - Locale.getDefault(), - "node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, - segmentRepoName - ); String routingTableRepoAttributeKey = null, routingTableRepoSettingsAttributeKeyPrefix = null; if (routingTableRepoName != null) { routingTableRepoAttributeKey = String.format( @@ -2885,10 +2899,7 @@ private static Settings buildRemoteStoreNodeAttributes( .put(translogRepoTypeAttributeKey, translogRepoType) .put(translogRepoSettingsAttributeKeyPrefix + "location", translogRepoPath) .put(translogRepoSettingsAttributeKeyPrefix + prefixModeVerificationSuffix, prefixModeVerificationEnable) - .put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, segmentRepoName) - .put(stateRepoTypeAttributeKey, segmentRepoType) - .put(stateRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath) - .put(stateRepoSettingsAttributeKeyPrefix + prefixModeVerificationSuffix, prefixModeVerificationEnable); + .put(buildRemoteStateNodeAttributes(segmentRepoName, segmentRepoPath, segmentRepoType)); if (routingTableRepoName != null) { settings.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, routingTableRepoName) .put(routingTableRepoAttributeKey, routingTableRepoType)