diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java index c1d5c0949e07e..01b02db20fb24 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java @@ -38,7 +38,6 @@ import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.io.IOUtils; import org.opensearch.gateway.remote.ClusterMetadataManifest; @@ -54,7 +53,6 @@ import java.util.Set; import static org.opensearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM; -import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; /** @@ -82,14 +80,12 @@ public class CoordinationState { private VotingConfiguration lastPublishedConfiguration; private VoteCollection publishVotes; private final boolean isRemoteStateEnabled; - private boolean isRemotePublicationEnabled; public CoordinationState( DiscoveryNode localNode, PersistedStateRegistry persistedStateRegistry, ElectionStrategy electionStrategy, - Settings settings, - ClusterSettings clusterSettings + Settings settings ) { this.localNode = localNode; @@ -107,14 +103,6 @@ public CoordinationState( .getLastAcceptedConfiguration(); this.publishVotes = new VoteCollection(); this.isRemoteStateEnabled = isRemoteStoreClusterStateEnabled(settings); - this.isRemotePublicationEnabled = isRemoteStateEnabled - && REMOTE_PUBLICATION_SETTING.get(settings) - && localNode.isRemoteStatePublicationConfigured(); - clusterSettings.addSettingsUpdateConsumer(REMOTE_PUBLICATION_SETTING, this::setRemotePublicationSetting); - } - - public boolean isRemotePublicationEnabled() { - return isRemotePublicationEnabled; } public long getCurrentTerm() { @@ -653,15 +641,6 @@ private boolean shouldCommitRemotePersistedState() { && persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedManifest() != null; } - private void setRemotePublicationSetting(boolean remotePublicationSetting) { - if (remotePublicationSetting == false) { - this.isRemotePublicationEnabled = false; - } else { - this.isRemotePublicationEnabled = isRemoteStateEnabled && localNode.isRemoteStatePublicationConfigured(); - } - - } - /** * Pluggable persistence layer for {@link CoordinationState}. * diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index a56b8f5bdece3..2ea251decd056 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -872,9 +872,7 @@ boolean publicationInProgress() { @Override protected void doStart() { synchronized (mutex) { - coordinationState.set( - new CoordinationState(getLocalNode(), persistedStateRegistry, electionStrategy, settings, clusterSettings) - ); + coordinationState.set(new CoordinationState(getLocalNode(), persistedStateRegistry, electionStrategy, settings)); peerFinder.setCurrentTerm(getCurrentTerm()); configuredHostsResolver.start(); final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState(); @@ -1363,7 +1361,7 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId()) final PublicationTransportHandler.PublicationContext publicationContext = publicationHandler.newPublicationContext( clusterChangedEvent, - coordinationState.get().isRemotePublicationEnabled(), + this.isRemotePublicationEnabled(), persistedStateRegistry ); logger.debug("initialized PublicationContext using class: {}", publicationContext.getClass().toString()); @@ -1892,8 +1890,8 @@ public static boolean isZen1Node(DiscoveryNode discoveryNode) { } public boolean isRemotePublicationEnabled() { - if (coordinationState.get() != null) { - return coordinationState.get().isRemotePublicationEnabled(); + if (remoteClusterStateService != null) { + return remoteClusterStateService.isRemotePublicationEnabled(); } return false; } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 849a2f5b43b09..abb5d826315ed 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -83,6 +83,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.LongSupplier; @@ -234,7 +235,7 @@ public static RemoteClusterStateValidationMode parseString(String mode) { private final String METADATA_UPDATE_LOG_STRING = "wrote metadata for [{}] indices and skipped [{}] unchanged " + "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata " + "updated : [{}], custom metadata updated : [{}], indices routing updated : [{}]"; - private boolean isPublicationEnabled; + private AtomicBoolean isPublicationEnabled; private final String remotePathPrefix; private final RemoteClusterStateCache remoteClusterStateCache; @@ -275,9 +276,11 @@ public RemoteClusterStateService( this.remoteStateStats = new RemotePersistenceStats(); this.namedWriteableRegistry = namedWriteableRegistry; this.indexMetadataUploadListeners = indexMetadataUploadListeners; - this.isPublicationEnabled = clusterSettings.get(REMOTE_PUBLICATION_SETTING) - && RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings) - && RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings); + this.isPublicationEnabled = new AtomicBoolean( + clusterSettings.get(REMOTE_PUBLICATION_SETTING) + && RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings) + && RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings) + ); clusterSettings.addSettingsUpdateConsumer(REMOTE_PUBLICATION_SETTING, this::setRemotePublicationSetting); this.remotePathPrefix = CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.get(settings); this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService( @@ -306,19 +309,20 @@ public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterStat return null; } + boolean publicationEnabled = isPublicationEnabled.get(); UploadedMetadataResults uploadedMetadataResults = writeMetadataInParallel( clusterState, new ArrayList<>(clusterState.metadata().indices().values()), emptyMap(), - RemoteGlobalMetadataManager.filterCustoms(clusterState.metadata().customs(), isPublicationEnabled), + RemoteGlobalMetadataManager.filterCustoms(clusterState.metadata().customs(), publicationEnabled), true, true, true, - isPublicationEnabled, - isPublicationEnabled, - isPublicationEnabled, - isPublicationEnabled ? clusterState.customs() : Collections.emptyMap(), - isPublicationEnabled, + publicationEnabled, + publicationEnabled, + publicationEnabled, + publicationEnabled ? clusterState.customs() : Collections.emptyMap(), + publicationEnabled, remoteRoutingTableService.getIndicesRouting(clusterState.getRoutingTable()), null ); @@ -397,9 +401,9 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( boolean firstUploadForSplitGlobalMetadata = !previousManifest.hasMetadataAttributesFiles(); final DiffableUtils.MapDiff> customsDiff = remoteGlobalMetadataManager - .getCustomsDiff(clusterState, previousClusterState, firstUploadForSplitGlobalMetadata, isPublicationEnabled); + .getCustomsDiff(clusterState, previousClusterState, firstUploadForSplitGlobalMetadata, isPublicationEnabled.get()); final DiffableUtils.MapDiff> clusterStateCustomsDiff = - remoteClusterStateAttributesManager.getUpdatedCustoms(clusterState, previousClusterState, isPublicationEnabled, false); + remoteClusterStateAttributesManager.getUpdatedCustoms(clusterState, previousClusterState, isPublicationEnabled.get(), false); final Map allUploadedCustomMap = new HashMap<>(previousManifest.getCustomMetadataMap()); final Map allUploadedClusterStateCustomsMap = new HashMap<>( previousManifest.getClusterStateCustomMap() @@ -464,10 +468,10 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( boolean updateTemplatesMetadata = firstUploadForSplitGlobalMetadata || Metadata.isTemplatesMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false; - final boolean updateDiscoveryNodes = isPublicationEnabled + final boolean updateDiscoveryNodes = isPublicationEnabled.get() && clusterState.getNodes().delta(previousClusterState.getNodes()).hasChanges(); - final boolean updateClusterBlocks = isPublicationEnabled && !clusterState.blocks().equals(previousClusterState.blocks()); - final boolean updateHashesOfConsistentSettings = isPublicationEnabled + final boolean updateClusterBlocks = isPublicationEnabled.get() && !clusterState.blocks().equals(previousClusterState.blocks()); + final boolean updateHashesOfConsistentSettings = isPublicationEnabled.get() && Metadata.isHashesOfConsistentSettingsEqual(previousClusterState.metadata(), clusterState.metadata()) == false; uploadedMetadataResults = writeMetadataInParallel( @@ -1120,9 +1124,9 @@ private void setChecksumValidationMode(RemoteClusterStateValidationMode remoteCl private void setRemotePublicationSetting(boolean remotePublicationSetting) { if (remotePublicationSetting == false) { - this.isPublicationEnabled = false; + this.isPublicationEnabled.set(false); } else { - this.isPublicationEnabled = isRemoteStoreClusterStateEnabled(settings) && isRemoteRoutingTableEnabled(settings); + this.isPublicationEnabled.set(isRemoteStoreClusterStateEnabled(settings) && isRemoteRoutingTableEnabled(settings)); } } @@ -1841,7 +1845,7 @@ public String getLastKnownUUIDFromRemote(String clusterName) { } public boolean isRemotePublicationEnabled() { - return this.isPublicationEnabled; + return this.isPublicationEnabled.get(); } public void setRemoteStateReadTimeout(TimeValue remoteStateReadTimeout) { diff --git a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java index a77a94723d966..b5d16e7be849f 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java @@ -68,7 +68,6 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.opensearch.gateway.remote.ClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION; -import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; @@ -1268,22 +1267,12 @@ public void testHandleCommitOnFollowerNodeWhenRemotePublicationEnabledWithNullRe verifyNoInteractions(remoteClusterStateService); } - public void testIsRemotePublicationEnabled_WithInconsistentSettings() { - // create settings with remote state disabled but publication enabled - Settings settings = Settings.builder() - .put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), false) - .put(REMOTE_PUBLICATION_SETTING_KEY, true) - .build(); - CoordinationState coordinationState = createCoordinationState(psr1, node1, settings); - assertFalse(coordinationState.isRemotePublicationEnabled()); - } - public static CoordinationState createCoordinationState( PersistedStateRegistry persistedStateRegistry, DiscoveryNode localNode, Settings settings ) { - return new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, settings, null); + return new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, settings); } public static ClusterState clusterState( diff --git a/server/src/test/java/org/opensearch/cluster/coordination/PreVoteCollectorTests.java b/server/src/test/java/org/opensearch/cluster/coordination/PreVoteCollectorTests.java index 1852be6310f05..5ddf614db3334 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/PreVoteCollectorTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/PreVoteCollectorTests.java @@ -302,8 +302,7 @@ public void testPrevotingIndicatesElectionSuccess() { localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, - Settings.EMPTY, - null + Settings.EMPTY ); final long newTerm = randomLongBetween(currentTerm + 1, Long.MAX_VALUE); diff --git a/server/src/test/java/org/opensearch/cluster/coordination/PublicationTests.java b/server/src/test/java/org/opensearch/cluster/coordination/PublicationTests.java index a2c54dcc88efb..4d18ff95887dd 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/PublicationTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/PublicationTests.java @@ -94,13 +94,7 @@ class MockNode { ); PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, new InMemoryPersistedState(0L, initialState)); - coordinationState = new CoordinationState( - localNode, - persistedStateRegistry, - ElectionStrategy.DEFAULT_INSTANCE, - Settings.EMPTY, - null - ); + coordinationState = new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, Settings.EMPTY); } final DiscoveryNode localNode; diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 35a8ae16cacf7..c21f81af479ad 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -370,6 +370,8 @@ public void testWriteFullMetadataSuccessPublicationEnabled() throws IOException // TODO Make the publication flag parameterized publicationEnabled = true; settings = Settings.builder().put(settings).put(REMOTE_PUBLICATION_SETTING_KEY, publicationEnabled).build(); + clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); remoteClusterStateService = new RemoteClusterStateService( "test-node-id", repositoriesServiceSupplier, @@ -388,6 +390,7 @@ public void testWriteFullMetadataSuccessPublicationEnabled() throws IOException ), writableRegistry() ); + assertTrue(remoteClusterStateService.isRemotePublicationEnabled()); final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()) .customs( Map.of( @@ -747,6 +750,8 @@ public void testWriteIncrementalMetadataSuccess() throws IOException { public void testWriteIncrementalMetadataSuccessWhenPublicationEnabled() throws IOException { publicationEnabled = true; settings = Settings.builder().put(settings).put(REMOTE_PUBLICATION_SETTING_KEY, true).build(); + clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); remoteClusterStateService = new RemoteClusterStateService( "test-node-id", repositoriesServiceSupplier, @@ -765,6 +770,7 @@ public void testWriteIncrementalMetadataSuccessWhenPublicationEnabled() throws I ), writableRegistry() ); + assertTrue(remoteClusterStateService.isRemotePublicationEnabled()); final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); mockBlobStoreObjects(); final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build(); diff --git a/test/framework/src/main/java/org/opensearch/cluster/coordination/CoordinationStateTestCluster.java b/test/framework/src/main/java/org/opensearch/cluster/coordination/CoordinationStateTestCluster.java index 2215ecafc16c9..cbe695cbb2136 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/coordination/CoordinationStateTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/cluster/coordination/CoordinationStateTestCluster.java @@ -150,7 +150,7 @@ static class ClusterNode { persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, persistedState); this.electionStrategy = electionStrategy; - state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY, null); + state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY); } void reboot() { @@ -189,7 +189,7 @@ void reboot() { localNode.getVersion() ); - state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY, null); + state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY); } void setInitialState(CoordinationMetadata.VotingConfiguration initialConfig, long initialValue) {