From 4509065ec8a5ed7f2967fbb1504a345c98d75985 Mon Sep 17 00:00:00 2001 From: Pranshu Shukla <55992439+Pranshu-S@users.noreply.github.com> Date: Tue, 10 Dec 2024 10:35:56 +0530 Subject: [PATCH] Skip remote-repositories validations for node-joins when RepositoriesService is not in sync with cluster-state (#16763) * Skip remote-repositories validations for node-joins when RepositoriesService is not in sync with cluster-state Signed-off-by: Pranshu Shukla Signed-off-by: Mingshi Liu --- CHANGELOG.md | 1 + .../discovery/DiscoveryDisruptionIT.java | 152 ++++++++++++++++++ .../remotestore/RemoteStoreNodeService.java | 15 ++ .../repositories/RepositoriesService.java | 7 + .../coordination/JoinTaskExecutorTests.java | 67 ++++++++ .../opensearch/test/InternalTestCluster.java | 20 ++- .../test/OpenSearchIntegTestCase.java | 39 +++++ 7 files changed, 300 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 26aeb4fc4b02a..ab534991798a0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -67,6 +67,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Tiered Caching] Fix bug in cache stats API ([#16560](https://github.com/opensearch-project/OpenSearch/pull/16560)) - Bound the size of cache in deprecation logger ([16702](https://github.com/opensearch-project/OpenSearch/issues/16702)) - Ensure consistency of system flag on IndexMetadata after diff is applied ([#16644](https://github.com/opensearch-project/OpenSearch/pull/16644)) +- Skip remote-repositories validations for node-joins when RepositoriesService is not in sync with cluster-state ([#16763](https://github.com/opensearch-project/OpenSearch/pull/16763)) ### Security diff --git a/server/src/internalClusterTest/java/org/opensearch/discovery/DiscoveryDisruptionIT.java b/server/src/internalClusterTest/java/org/opensearch/discovery/DiscoveryDisruptionIT.java index 70124c8c46700..377f99cd8b791 100644 --- a/server/src/internalClusterTest/java/org/opensearch/discovery/DiscoveryDisruptionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/discovery/DiscoveryDisruptionIT.java @@ -33,12 +33,21 @@ package org.opensearch.discovery; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.coordination.FailedToCommitClusterStateException; import org.opensearch.cluster.coordination.JoinHelper; +import org.opensearch.cluster.coordination.PersistedStateRegistry; import org.opensearch.cluster.coordination.PublicationTransportHandler; +import org.opensearch.cluster.metadata.RepositoriesMetadata; +import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Randomness; import org.opensearch.common.settings.Settings; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.Repository; +import org.opensearch.repositories.RepositoryMissingException; +import org.opensearch.repositories.fs.ReloadableFsRepository; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.disruption.NetworkDisruption; import org.opensearch.test.disruption.ServiceDisruptionScheme; @@ -46,10 +55,15 @@ import org.opensearch.test.transport.MockTransportService; import org.opensearch.transport.Transport; import org.opensearch.transport.TransportService; +import org.junit.Assert; +import java.util.Arrays; import java.util.HashSet; +import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.stream.Collectors; import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING; import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING; @@ -250,4 +264,142 @@ public void testNodeNotReachableFromClusterManager() throws Exception { ensureStableCluster(3); } + /** + * Tests the scenario where-in a cluster-state containing new repository meta-data as part of a node-join from a + * repository-configured node fails on a commit stag and has a master switch. This would lead to master nodes + * doing another round of node-joins with the new cluster-state as the previous attempt had a successful publish. + */ + public void testElectClusterManagerRemotePublicationConfigurationNodeJoinCommitFails() throws Exception { + final String remoteStateRepoName = "remote-state-repo"; + final String remoteRoutingTableRepoName = "routing-table-repo"; + + Settings remotePublicationSettings = buildRemotePublicationNodeAttributes( + remoteStateRepoName, + ReloadableFsRepository.TYPE, + remoteRoutingTableRepoName, + ReloadableFsRepository.TYPE + ); + internalCluster().startClusterManagerOnlyNodes(3); + internalCluster().startDataOnlyNodes(3); + + String clusterManagerNode = internalCluster().getClusterManagerName(); + List nonClusterManagerNodes = Arrays.stream(internalCluster().getNodeNames()) + .filter(node -> !node.equals(clusterManagerNode)) + .collect(Collectors.toList()); + + ensureStableCluster(6); + + MockTransportService clusterManagerTransportService = (MockTransportService) internalCluster().getInstance( + TransportService.class, + clusterManagerNode + ); + logger.info("Blocking Cluster Manager Commit Request on all nodes"); + // This is to allow the new node to have commit failures on the nodes in the send path itself. This will lead to the + // nodes have a successful publish operation but failed commit operation. This will come into play once the new node joins + nonClusterManagerNodes.forEach(node -> { + TransportService targetTransportService = internalCluster().getInstance(TransportService.class, node); + clusterManagerTransportService.addSendBehavior(targetTransportService, (connection, requestId, action, request, options) -> { + if (action.equals(PublicationTransportHandler.COMMIT_STATE_ACTION_NAME)) { + logger.info("--> preventing {} request", PublicationTransportHandler.COMMIT_STATE_ACTION_NAME); + throw new FailedToCommitClusterStateException("Blocking Commit"); + } + connection.sendRequest(requestId, action, request, options); + }); + }); + + logger.info("Starting Node with remote publication settings"); + // Start a node with remote-publication repositories configured. This will lead to the active cluster-manager create + // a new cluster-state event with the new node-join along with new repositories setup in the cluster meta-data. + internalCluster().startDataOnlyNodes(1, remotePublicationSettings, Boolean.TRUE); + + // Checking if publish succeeded in the nodes before shutting down the blocked cluster-manager + assertBusy(() -> { + String randomNode = nonClusterManagerNodes.get(Randomness.get().nextInt(nonClusterManagerNodes.size())); + PersistedStateRegistry registry = internalCluster().getInstance(PersistedStateRegistry.class, randomNode); + + ClusterState state = registry.getPersistedState(PersistedStateRegistry.PersistedStateType.LOCAL).getLastAcceptedState(); + RepositoriesMetadata repositoriesMetadata = state.metadata().custom(RepositoriesMetadata.TYPE); + Boolean isRemoteStateRepoConfigured = Boolean.FALSE; + Boolean isRemoteRoutingTableRepoConfigured = Boolean.FALSE; + + assertNotNull(repositoriesMetadata); + assertNotNull(repositoriesMetadata.repositories()); + + for (RepositoryMetadata repo : repositoriesMetadata.repositories()) { + if (repo.name().equals(remoteStateRepoName)) { + isRemoteStateRepoConfigured = Boolean.TRUE; + } else if (repo.name().equals(remoteRoutingTableRepoName)) { + isRemoteRoutingTableRepoConfigured = Boolean.TRUE; + } + } + // Asserting that the metadata is present in the persisted cluster-state + assertTrue(isRemoteStateRepoConfigured); + assertTrue(isRemoteRoutingTableRepoConfigured); + + RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, randomNode); + + isRemoteStateRepoConfigured = isRepoPresentInRepositoryService(repositoriesService, remoteStateRepoName); + isRemoteRoutingTableRepoConfigured = isRepoPresentInRepositoryService(repositoriesService, remoteRoutingTableRepoName); + + // Asserting that the metadata is not present in the repository service. + Assert.assertFalse(isRemoteStateRepoConfigured); + Assert.assertFalse(isRemoteRoutingTableRepoConfigured); + }); + + logger.info("Stopping current Cluster Manager"); + // We stop the current cluster-manager whose outbound paths were blocked. This is to force a new election onto nodes + // we had the new cluster-state published but not commited. + internalCluster().stopCurrentClusterManagerNode(); + + // We expect that the repositories validations are skipped in this case and node-joins succeeds as expected. The + // repositories validations are skipped because even though the cluster-state is updated in the persisted registry, + // the repository service will not be updated as the commit attempt failed. + ensureStableCluster(6); + + String randomNode = nonClusterManagerNodes.get(Randomness.get().nextInt(nonClusterManagerNodes.size())); + + // Checking if the final cluster-state is updated. + RepositoriesMetadata repositoriesMetadata = internalCluster().getInstance(ClusterService.class, randomNode) + .state() + .metadata() + .custom(RepositoriesMetadata.TYPE); + + Boolean isRemoteStateRepoConfigured = Boolean.FALSE; + Boolean isRemoteRoutingTableRepoConfigured = Boolean.FALSE; + + for (RepositoryMetadata repo : repositoriesMetadata.repositories()) { + if (repo.name().equals(remoteStateRepoName)) { + isRemoteStateRepoConfigured = Boolean.TRUE; + } else if (repo.name().equals(remoteRoutingTableRepoName)) { + isRemoteRoutingTableRepoConfigured = Boolean.TRUE; + } + } + + Assert.assertTrue("RemoteState Repo is not set in RepositoriesMetadata", isRemoteStateRepoConfigured); + Assert.assertTrue("RemoteRoutingTable Repo is not set in RepositoriesMetadata", isRemoteRoutingTableRepoConfigured); + + RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, randomNode); + + isRemoteStateRepoConfigured = isRepoPresentInRepositoryService(repositoriesService, remoteStateRepoName); + isRemoteRoutingTableRepoConfigured = isRepoPresentInRepositoryService(repositoriesService, remoteRoutingTableRepoName); + + Assert.assertTrue("RemoteState Repo is not set in RepositoryService", isRemoteStateRepoConfigured); + Assert.assertTrue("RemoteRoutingTable Repo is not set in RepositoryService", isRemoteRoutingTableRepoConfigured); + + logger.info("Stopping current Cluster Manager"); + } + + private Boolean isRepoPresentInRepositoryService(RepositoriesService repositoriesService, String repoName) { + try { + Repository remoteStateRepo = repositoriesService.repository(repoName); + if (Objects.nonNull(remoteStateRepo)) { + return Boolean.TRUE; + } + } catch (RepositoryMissingException e) { + return Boolean.FALSE; + } + + return Boolean.FALSE; + } + } diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java index c1c041ce01198..fb97cf40d90d6 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java @@ -21,6 +21,7 @@ import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.repositories.RepositoryException; +import org.opensearch.repositories.RepositoryMissingException; import org.opensearch.threadpool.ThreadPool; import java.util.ArrayList; @@ -183,6 +184,20 @@ public RepositoriesMetadata updateRepositoriesMetadata(DiscoveryNode joiningNode boolean repositoryAlreadyPresent = false; for (RepositoryMetadata existingRepositoryMetadata : existingRepositories.repositories()) { if (newRepositoryMetadata.name().equals(existingRepositoryMetadata.name())) { + try { + // This is to handle cases where-in the during a previous node-join attempt if the publish operation succeeded + // but the commit operation failed, the cluster-state may have the repository metadata which is not applied + // into the repository service. This may lead to assertion failures down the line. + repositoriesService.get().repository(newRepositoryMetadata.name()); + } catch (RepositoryMissingException e) { + logger.warn( + "Skipping repositories metadata checks: Remote repository [{}] is in the cluster state but not present " + + "in the repository service.", + newRepositoryMetadata.name() + ); + break; + } + try { // This will help in handling two scenarios - // 1. When a fresh cluster is formed and a node tries to join the cluster, the repository diff --git a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java index 9aec81536dbd0..49065be0abb25 100644 --- a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java @@ -80,6 +80,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -904,6 +905,12 @@ public void ensureValidSystemRepositoryUpdate(RepositoryMetadata newRepositoryMe Settings newRepositoryMetadataSettings = newRepositoryMetadata.settings(); Settings currentRepositoryMetadataSettings = currentRepositoryMetadata.settings(); + assert Objects.nonNull(repository) : String.format( + Locale.ROOT, + "repository [%s] not present in RepositoryService", + currentRepositoryMetadata.name() + ); + List restrictedSettings = repository.getRestrictedSystemRepositorySettings() .stream() .map(setting -> setting.getKey()) diff --git a/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java b/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java index f6fb203bfe1a9..9590e5615d451 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java @@ -55,6 +55,7 @@ import org.opensearch.common.util.FeatureFlags; import org.opensearch.node.remotestore.RemoteStoreNodeService; import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.RepositoryMissingException; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.VersionUtils; @@ -1378,6 +1379,72 @@ public void testJoinRemoteStoreClusterWithRemotePublicationNodeInMixedMode() { JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata()); } + public void testUpdatesClusterStateWithRepositoryMetadataNotInSync() throws Exception { + Map newNodeAttributes = new HashMap<>(); + newNodeAttributes.putAll(remoteStateNodeAttributes(CLUSTER_STATE_REPO)); + newNodeAttributes.putAll(remoteRoutingTableAttributes(ROUTING_TABLE_REPO)); + + final AllocationService allocationService = mock(AllocationService.class); + when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]); + final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null); + RepositoriesService repositoriesService = mock(RepositoriesService.class); + when(repositoriesService.repository(any())).thenThrow(RepositoryMissingException.class); + final RemoteStoreNodeService remoteStoreNodeService = new RemoteStoreNodeService(new SetOnce<>(repositoriesService)::get, null); + + final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor( + Settings.EMPTY, + allocationService, + logger, + rerouteService, + remoteStoreNodeService + ); + + final DiscoveryNode clusterManagerNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + newNodeAttributes, + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + + final RepositoryMetadata clusterStateRepo = buildRepositoryMetadata(clusterManagerNode, CLUSTER_STATE_REPO); + final RepositoryMetadata routingTableRepo = buildRepositoryMetadata(clusterManagerNode, ROUTING_TABLE_REPO); + List repositoriesMetadata = new ArrayList<>() { + { + add(clusterStateRepo); + add(routingTableRepo); + } + }; + + final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .nodes( + DiscoveryNodes.builder() + .add(clusterManagerNode) + .localNodeId(clusterManagerNode.getId()) + .clusterManagerNodeId(clusterManagerNode.getId()) + ) + .metadata(Metadata.builder().putCustom(RepositoriesMetadata.TYPE, new RepositoriesMetadata(repositoriesMetadata))) + .build(); + + final DiscoveryNode joiningNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + newNodeAttributes, + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + + final ClusterStateTaskExecutor.ClusterTasksResult result = joinTaskExecutor.execute( + clusterState, + List.of(new JoinTaskExecutor.Task(joiningNode, "test")) + ); + assertThat(result.executionResults.entrySet(), hasSize(1)); + final ClusterStateTaskExecutor.TaskResult taskResult = result.executionResults.values().iterator().next(); + assertTrue(taskResult.isSuccess()); + validatePublicationRepositoryMetadata(result.resultingState, clusterManagerNode); + + } + private void validateRepositoryMetadata(ClusterState updatedState, DiscoveryNode existingNode, int expectedRepositories) throws Exception { diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java index fa5fb736f518f..7b2c653e9bdb2 100644 --- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java @@ -2322,10 +2322,24 @@ public List startNodes(int numOfNodes, Settings settings) { return startNodes(Collections.nCopies(numOfNodes, settings).toArray(new Settings[0])); } + /** + * Starts multiple nodes with the given settings and returns their names + */ + public List startNodes(int numOfNodes, Settings settings, Boolean waitForNodeJoin) { + return startNodes(waitForNodeJoin, Collections.nCopies(numOfNodes, settings).toArray(new Settings[0])); + } + /** * Starts multiple nodes with the given settings and returns their names */ public synchronized List startNodes(Settings... extraSettings) { + return startNodes(false, extraSettings); + } + + /** + * Starts multiple nodes with the given settings and returns their names + */ + public synchronized List startNodes(Boolean waitForNodeJoin, Settings... extraSettings) { final int newClusterManagerCount = Math.toIntExact(Stream.of(extraSettings).filter(DiscoveryNode::isClusterManagerNode).count()); final int defaultMinClusterManagerNodes; if (autoManageClusterManagerNodes) { @@ -2377,7 +2391,7 @@ public synchronized List startNodes(Settings... extraSettings) { nodes.add(nodeAndClient); } startAndPublishNodesAndClients(nodes); - if (autoManageClusterManagerNodes) { + if (autoManageClusterManagerNodes && !waitForNodeJoin) { validateClusterFormed(); } return nodes.stream().map(NodeAndClient::getName).collect(Collectors.toList()); @@ -2422,6 +2436,10 @@ public List startDataOnlyNodes(int numNodes, Settings settings) { return startNodes(numNodes, Settings.builder().put(onlyRole(settings, DiscoveryNodeRole.DATA_ROLE)).build()); } + public List startDataOnlyNodes(int numNodes, Settings settings, Boolean ignoreNodeJoin) { + return startNodes(numNodes, Settings.builder().put(onlyRole(settings, DiscoveryNodeRole.DATA_ROLE)).build(), ignoreNodeJoin); + } + public List startSearchOnlyNodes(int numNodes) { return startSearchOnlyNodes(numNodes, Settings.EMPTY); } diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index 1ee856d3092f0..1c26ea4ca2c91 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -214,6 +214,8 @@ import java.util.function.Function; import java.util.stream.Collectors; +import reactor.util.annotation.NonNull; + import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; import static org.opensearch.common.unit.TimeValue.timeValueMillis; @@ -2915,6 +2917,43 @@ protected static Settings buildRemoteStoreNodeAttributes( return settings.build(); } + protected Settings buildRemotePublicationNodeAttributes( + @NonNull String remoteStateRepoName, + @NonNull String remoteStateRepoType, + @NonNull String routingTableRepoName, + @NonNull String routingTableRepoType + ) { + String remoteStateRepositoryTypeAttributeKey = String.format( + Locale.getDefault(), + "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, + remoteStateRepoName + ); + String routingTableRepositoryTypeAttributeKey = String.format( + Locale.getDefault(), + "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, + routingTableRepoName + ); + String remoteStateRepositorySettingsAttributeKeyPrefix = String.format( + Locale.getDefault(), + "node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, + remoteStateRepoName + ); + String routingTableRepositorySettingsAttributeKeyPrefix = String.format( + Locale.getDefault(), + "node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, + routingTableRepoName + ); + + return Settings.builder() + .put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, remoteStateRepoName) + .put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, routingTableRepoName) + .put(remoteStateRepositoryTypeAttributeKey, remoteStateRepoType) + .put(routingTableRepositoryTypeAttributeKey, routingTableRepoType) + .put(remoteStateRepositorySettingsAttributeKeyPrefix + "location", randomRepoPath().toAbsolutePath()) + .put(routingTableRepositorySettingsAttributeKeyPrefix + "location", randomRepoPath().toAbsolutePath()) + .build(); + } + public static String resolvePath(IndexId indexId, String shardId) { PathType pathType = PathType.fromCode(indexId.getShardPathType()); RemoteStorePathStrategy.SnapshotShardPathInput shardPathInput = new RemoteStorePathStrategy.SnapshotShardPathInput.Builder()