diff --git a/server/src/internalClusterTest/java/org/opensearch/discovery/DiscoveryDisruptionIT.java b/server/src/internalClusterTest/java/org/opensearch/discovery/DiscoveryDisruptionIT.java index 2f6ea600d895f..21a395ad4c980 100644 --- a/server/src/internalClusterTest/java/org/opensearch/discovery/DiscoveryDisruptionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/discovery/DiscoveryDisruptionIT.java @@ -44,6 +44,8 @@ import org.opensearch.common.Randomness; import org.opensearch.common.settings.Settings; import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.Repository; +import org.opensearch.repositories.RepositoryMissingException; import org.opensearch.repositories.fs.ReloadableFsRepository; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.disruption.NetworkDisruption; @@ -57,6 +59,7 @@ import java.util.Arrays; import java.util.HashSet; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.stream.Collectors; @@ -261,7 +264,9 @@ public void testNodeNotReachableFromClusterManager() throws Exception { } /** - * Test Repositories Configured Node Join Commit failures. + * Tests the scenario where-in a cluster-state containing new repository meta-data as part of a node-join from a + * repository-configured node fails on a commit stag and has a master switch. This would lead to master nodes + * doing another round of node-joins with the new cluster-state as the previous attempt had a successful publish. */ public void testElectClusterManagerRemotePublicationConfigurationNodeJoinCommitFails() throws Exception { final String remoteStateRepoName = "remote-state-repo"; @@ -288,20 +293,32 @@ public void testElectClusterManagerRemotePublicationConfigurationNodeJoinCommitF clusterManagerNode ); logger.info("Blocking Cluster Manager Commit Request on all nodes"); + // This is to allow the new node to have commit failures on the nodes in the send path itself. This will lead to the + // nodes have a successful publish operation but failed commit operation. This will come into play once the new node joins nonClusterManagerNodes.forEach(node -> { TransportService targetTransportService = internalCluster().getInstance(TransportService.class, node); - clusterManagerTransportService.addOpenSearchFailureException( - targetTransportService, - new FailedToCommitClusterStateException("Blocking Commit"), - PublicationTransportHandler.COMMIT_STATE_ACTION_NAME - ); + clusterManagerTransportService.addSendBehavior(targetTransportService, (connection, requestId, action, request, options) -> { + if (action.equals(PublicationTransportHandler.COMMIT_STATE_ACTION_NAME)) { + logger.info("--> preventing {} request", PublicationTransportHandler.COMMIT_STATE_ACTION_NAME); + throw new FailedToCommitClusterStateException("Blocking Commit"); + } + connection.sendRequest(requestId, action, request, options); + }); }); logger.info("Starting Node with remote publication settings"); + // Start a node with remote-publication repositories configured. This will lead to the active cluster-manager create + // a new cluster-state event with the new node-join along with new repositories setup in the cluster meta-data. internalCluster().startDataOnlyNodes(1, remotePublicationSettings, Boolean.TRUE); logger.info("Stopping current Cluster Manager"); + // We stop the current cluster-manager whose outbound paths were blocked. This is to force a new election onto nodes + // we had the new cluster-state published but not commited. internalCluster().stopCurrentClusterManagerNode(); + + // We expect that the repositories validations are skipped in this case and node-joins succeeds as expected. The + // repositories validations are skipped because even though the cluster-state is updated in the persisted registry, + // the repository service will not be updated as the commit attempt failed. ensureStableCluster(6); String randomNode = nonClusterManagerNodes.get(Randomness.get().nextInt(nonClusterManagerNodes.size())); @@ -330,11 +347,22 @@ public void testElectClusterManagerRemotePublicationConfigurationNodeJoinCommitF RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, randomNode); - if (repositoriesService.isRepositoryPresent(remoteStateRepoName)) { - isRemoteStateRepoConfigured = Boolean.TRUE; + try { + Repository remoteStateRepo = repositoriesService.repository(remoteStateRepoName); + if (Objects.nonNull(remoteStateRepo)) { + isRemoteStateRepoConfigured = Boolean.TRUE; + } + } catch (RepositoryMissingException e) { + isRemoteStateRepoConfigured = Boolean.FALSE; } - if (repositoriesService.isRepositoryPresent(remoteRoutingTableRepoName)) { - isRemoteRoutingTableRepoConfigured = Boolean.TRUE; + + try { + Repository routingTableRepo = repositoriesService.repository(remoteRoutingTableRepoName); + if (Objects.nonNull(routingTableRepo)) { + isRemoteRoutingTableRepoConfigured = Boolean.TRUE; + } + } catch (RepositoryMissingException e) { + isRemoteRoutingTableRepoConfigured = Boolean.FALSE; } Assert.assertTrue("RemoteState Repo is not set in RepositoryService", isRemoteStateRepoConfigured); diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java index 10441a74e174c..da94c8a7836b3 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java @@ -21,6 +21,7 @@ import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.repositories.RepositoryException; +import org.opensearch.repositories.RepositoryMissingException; import org.opensearch.threadpool.ThreadPool; import java.util.ArrayList; @@ -183,13 +184,17 @@ public RepositoriesMetadata updateRepositoriesMetadata(DiscoveryNode joiningNode boolean repositoryAlreadyPresent = false; for (RepositoryMetadata existingRepositoryMetadata : existingRepositories.repositories()) { if (newRepositoryMetadata.name().equals(existingRepositoryMetadata.name())) { - // This is to handle cases where-in the during a previous node-join attempt if the publish operation succeeded but - // the commit operation failed, the cluster-state may have the repository metadata which is not applied into the - // repository service. This may lead to assertion failures down the line. - if (!repositoriesService.get().isRepositoryPresent(newRepositoryMetadata.name())) { + try { + // This is to handle cases where-in the during a previous node-join attempt if the publish operation succeeded + // but + // the commit operation failed, the cluster-state may have the repository metadata which is not applied into the + // repository service. This may lead to assertion failures down the line. + String repositoryName = newRepositoryMetadata.name(); + repositoriesService.get().repository(repositoryName); + } catch (RepositoryMissingException e) { logger.warn( - "remote repository [{}] in cluster-state but repository-service but not present " - + "in repository-service, skipping checks", + "Skipping repositories metadata checks: Remote repository [{}] is in the cluster state but not present " + + "in the repository service.", newRepositoryMetadata.name() ); break; diff --git a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java index 0b9536e273f25..49065be0abb25 100644 --- a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java @@ -579,10 +579,6 @@ public Repository repository(String repositoryName) { throw new RepositoryMissingException(repositoryName); } - public Boolean isRepositoryPresent(final String repositoryName) { - return Objects.nonNull(repositories.get(repositoryName)); - } - public List repositoriesStats() { List activeRepoStats = getRepositoryStatsForActiveRepositories(); return activeRepoStats; diff --git a/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java b/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java index f6fb203bfe1a9..9590e5615d451 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java @@ -55,6 +55,7 @@ import org.opensearch.common.util.FeatureFlags; import org.opensearch.node.remotestore.RemoteStoreNodeService; import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.RepositoryMissingException; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.VersionUtils; @@ -1378,6 +1379,72 @@ public void testJoinRemoteStoreClusterWithRemotePublicationNodeInMixedMode() { JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata()); } + public void testUpdatesClusterStateWithRepositoryMetadataNotInSync() throws Exception { + Map newNodeAttributes = new HashMap<>(); + newNodeAttributes.putAll(remoteStateNodeAttributes(CLUSTER_STATE_REPO)); + newNodeAttributes.putAll(remoteRoutingTableAttributes(ROUTING_TABLE_REPO)); + + final AllocationService allocationService = mock(AllocationService.class); + when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]); + final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null); + RepositoriesService repositoriesService = mock(RepositoriesService.class); + when(repositoriesService.repository(any())).thenThrow(RepositoryMissingException.class); + final RemoteStoreNodeService remoteStoreNodeService = new RemoteStoreNodeService(new SetOnce<>(repositoriesService)::get, null); + + final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor( + Settings.EMPTY, + allocationService, + logger, + rerouteService, + remoteStoreNodeService + ); + + final DiscoveryNode clusterManagerNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + newNodeAttributes, + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + + final RepositoryMetadata clusterStateRepo = buildRepositoryMetadata(clusterManagerNode, CLUSTER_STATE_REPO); + final RepositoryMetadata routingTableRepo = buildRepositoryMetadata(clusterManagerNode, ROUTING_TABLE_REPO); + List repositoriesMetadata = new ArrayList<>() { + { + add(clusterStateRepo); + add(routingTableRepo); + } + }; + + final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .nodes( + DiscoveryNodes.builder() + .add(clusterManagerNode) + .localNodeId(clusterManagerNode.getId()) + .clusterManagerNodeId(clusterManagerNode.getId()) + ) + .metadata(Metadata.builder().putCustom(RepositoriesMetadata.TYPE, new RepositoriesMetadata(repositoriesMetadata))) + .build(); + + final DiscoveryNode joiningNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + newNodeAttributes, + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + + final ClusterStateTaskExecutor.ClusterTasksResult result = joinTaskExecutor.execute( + clusterState, + List.of(new JoinTaskExecutor.Task(joiningNode, "test")) + ); + assertThat(result.executionResults.entrySet(), hasSize(1)); + final ClusterStateTaskExecutor.TaskResult taskResult = result.executionResults.values().iterator().next(); + assertTrue(taskResult.isSuccess()); + validatePublicationRepositoryMetadata(result.resultingState, clusterManagerNode); + + } + private void validateRepositoryMetadata(ClusterState updatedState, DiscoveryNode existingNode, int expectedRepositories) throws Exception { diff --git a/test/framework/src/main/java/org/opensearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/opensearch/test/transport/MockTransportService.java index d8146e6f7d540..6bf5381b62cc9 100644 --- a/test/framework/src/main/java/org/opensearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/opensearch/test/transport/MockTransportService.java @@ -34,7 +34,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.cluster.ClusterModule; import org.opensearch.cluster.node.DiscoveryNode; @@ -377,47 +376,6 @@ public void addFailToSendNoConnectRule(TransportAddress transportAddress, final }); } - /** - * Adds a rule that will cause matching operations to throw provided OpenSearch Exceptions - */ - public void addOpenSearchFailureException( - TransportService transportService, - final OpenSearchException exception, - final String... blockedActions - ) { - addOpenSearchFailureException(transportService, exception, new HashSet<>(Arrays.asList(blockedActions))); - } - - /** - * Adds a rule that will cause matching operations to throw provided OpenSearch Exceptions - */ - public void addOpenSearchFailureException( - TransportService transportService, - OpenSearchException exception, - final Set blockedActions - ) { - for (TransportAddress transportAddress : extractTransportAddresses(transportService)) { - addOpenSearchFailureException(transportAddress, exception, blockedActions); - } - } - - /** - * Adds a rule that will cause matching operations to throw provided OpenSearch Exceptions - */ - public void addOpenSearchFailureException( - TransportAddress transportAddress, - OpenSearchException exception, - final Set blockedActions - ) { - transport().addSendBehavior(transportAddress, (connection, requestId, action, request, options) -> { - if (blockedActions.contains(action)) { - logger.info("--> preventing {} request", action); - throw exception; - } - connection.sendRequest(requestId, action, request, options); - }); - } - /** * Adds a rule that will cause ignores each send request, simulating an unresponsive node * and failing to connect once the rule was added.