Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Skip remote-repositories validations for node-joins when RepositoriesService is not in sync with cluster-state #16820

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix stale cluster state custom file deletion ([#16670](https://github.com/opensearch-project/OpenSearch/pull/16670))
- Bound the size of cache in deprecation logger ([16702](https://github.com/opensearch-project/OpenSearch/issues/16702))
- Ensure consistency of system flag on IndexMetadata after diff is applied ([#16644](https://github.com/opensearch-project/OpenSearch/pull/16644))
- Skip remote-repositories validations for node-joins when RepositoriesService is not in sync with cluster-state ([#16763](https://github.com/opensearch-project/OpenSearch/pull/16763))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,37 @@
package org.opensearch.discovery;

import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.FailedToCommitClusterStateException;
import org.opensearch.cluster.coordination.JoinHelper;
import org.opensearch.cluster.coordination.PersistedStateRegistry;
import org.opensearch.cluster.coordination.PublicationTransportHandler;
import org.opensearch.cluster.metadata.RepositoriesMetadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Randomness;
import org.opensearch.common.settings.Settings;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.disruption.NetworkDisruption;
import org.opensearch.test.disruption.ServiceDisruptionScheme;
import org.opensearch.test.disruption.SlowClusterStateProcessing;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportService;
import org.junit.Assert;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;

import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING;
Expand Down Expand Up @@ -250,4 +264,142 @@ public void testNodeNotReachableFromClusterManager() throws Exception {
ensureStableCluster(3);
}

/**
* Tests the scenario where-in a cluster-state containing new repository meta-data as part of a node-join from a
* repository-configured node fails on a commit stag and has a master switch. This would lead to master nodes
* doing another round of node-joins with the new cluster-state as the previous attempt had a successful publish.
*/
public void testElectClusterManagerRemotePublicationConfigurationNodeJoinCommitFails() throws Exception {
final String remoteStateRepoName = "remote-state-repo";
final String remoteRoutingTableRepoName = "routing-table-repo";

Settings remotePublicationSettings = buildRemotePublicationNodeAttributes(
remoteStateRepoName,
ReloadableFsRepository.TYPE,
remoteRoutingTableRepoName,
ReloadableFsRepository.TYPE
);
internalCluster().startClusterManagerOnlyNodes(3);
internalCluster().startDataOnlyNodes(3);

String clusterManagerNode = internalCluster().getClusterManagerName();
List<String> nonClusterManagerNodes = Arrays.stream(internalCluster().getNodeNames())
.filter(node -> !node.equals(clusterManagerNode))
.collect(Collectors.toList());

ensureStableCluster(6);

MockTransportService clusterManagerTransportService = (MockTransportService) internalCluster().getInstance(
TransportService.class,
clusterManagerNode
);
logger.info("Blocking Cluster Manager Commit Request on all nodes");
// This is to allow the new node to have commit failures on the nodes in the send path itself. This will lead to the
// nodes have a successful publish operation but failed commit operation. This will come into play once the new node joins
nonClusterManagerNodes.forEach(node -> {
TransportService targetTransportService = internalCluster().getInstance(TransportService.class, node);
clusterManagerTransportService.addSendBehavior(targetTransportService, (connection, requestId, action, request, options) -> {
if (action.equals(PublicationTransportHandler.COMMIT_STATE_ACTION_NAME)) {
logger.info("--> preventing {} request", PublicationTransportHandler.COMMIT_STATE_ACTION_NAME);
throw new FailedToCommitClusterStateException("Blocking Commit");
}
connection.sendRequest(requestId, action, request, options);
});
});

logger.info("Starting Node with remote publication settings");
// Start a node with remote-publication repositories configured. This will lead to the active cluster-manager create
// a new cluster-state event with the new node-join along with new repositories setup in the cluster meta-data.
internalCluster().startDataOnlyNodes(1, remotePublicationSettings, Boolean.TRUE);

// Checking if publish succeeded in the nodes before shutting down the blocked cluster-manager
assertBusy(() -> {
String randomNode = nonClusterManagerNodes.get(Randomness.get().nextInt(nonClusterManagerNodes.size()));
PersistedStateRegistry registry = internalCluster().getInstance(PersistedStateRegistry.class, randomNode);

ClusterState state = registry.getPersistedState(PersistedStateRegistry.PersistedStateType.LOCAL).getLastAcceptedState();
RepositoriesMetadata repositoriesMetadata = state.metadata().custom(RepositoriesMetadata.TYPE);
Boolean isRemoteStateRepoConfigured = Boolean.FALSE;
Boolean isRemoteRoutingTableRepoConfigured = Boolean.FALSE;

assertNotNull(repositoriesMetadata);
assertNotNull(repositoriesMetadata.repositories());

for (RepositoryMetadata repo : repositoriesMetadata.repositories()) {
if (repo.name().equals(remoteStateRepoName)) {
isRemoteStateRepoConfigured = Boolean.TRUE;
} else if (repo.name().equals(remoteRoutingTableRepoName)) {
isRemoteRoutingTableRepoConfigured = Boolean.TRUE;
}
}
// Asserting that the metadata is present in the persisted cluster-state
assertTrue(isRemoteStateRepoConfigured);
assertTrue(isRemoteRoutingTableRepoConfigured);

RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, randomNode);

isRemoteStateRepoConfigured = isRepoPresentInRepositoryService(repositoriesService, remoteStateRepoName);
isRemoteRoutingTableRepoConfigured = isRepoPresentInRepositoryService(repositoriesService, remoteRoutingTableRepoName);

// Asserting that the metadata is not present in the repository service.
Assert.assertFalse(isRemoteStateRepoConfigured);
Assert.assertFalse(isRemoteRoutingTableRepoConfigured);
});

logger.info("Stopping current Cluster Manager");
// We stop the current cluster-manager whose outbound paths were blocked. This is to force a new election onto nodes
// we had the new cluster-state published but not commited.
internalCluster().stopCurrentClusterManagerNode();

// We expect that the repositories validations are skipped in this case and node-joins succeeds as expected. The
// repositories validations are skipped because even though the cluster-state is updated in the persisted registry,
// the repository service will not be updated as the commit attempt failed.
ensureStableCluster(6);

String randomNode = nonClusterManagerNodes.get(Randomness.get().nextInt(nonClusterManagerNodes.size()));

// Checking if the final cluster-state is updated.
RepositoriesMetadata repositoriesMetadata = internalCluster().getInstance(ClusterService.class, randomNode)
.state()
.metadata()
.custom(RepositoriesMetadata.TYPE);

Boolean isRemoteStateRepoConfigured = Boolean.FALSE;
Boolean isRemoteRoutingTableRepoConfigured = Boolean.FALSE;

for (RepositoryMetadata repo : repositoriesMetadata.repositories()) {
if (repo.name().equals(remoteStateRepoName)) {
isRemoteStateRepoConfigured = Boolean.TRUE;
} else if (repo.name().equals(remoteRoutingTableRepoName)) {
isRemoteRoutingTableRepoConfigured = Boolean.TRUE;
}
}

Assert.assertTrue("RemoteState Repo is not set in RepositoriesMetadata", isRemoteStateRepoConfigured);
Assert.assertTrue("RemoteRoutingTable Repo is not set in RepositoriesMetadata", isRemoteRoutingTableRepoConfigured);

RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, randomNode);

isRemoteStateRepoConfigured = isRepoPresentInRepositoryService(repositoriesService, remoteStateRepoName);
isRemoteRoutingTableRepoConfigured = isRepoPresentInRepositoryService(repositoriesService, remoteRoutingTableRepoName);

Assert.assertTrue("RemoteState Repo is not set in RepositoryService", isRemoteStateRepoConfigured);
Assert.assertTrue("RemoteRoutingTable Repo is not set in RepositoryService", isRemoteRoutingTableRepoConfigured);

logger.info("Stopping current Cluster Manager");
}

private Boolean isRepoPresentInRepositoryService(RepositoriesService repositoriesService, String repoName) {
try {
Repository remoteStateRepo = repositoriesService.repository(repoName);
if (Objects.nonNull(remoteStateRepo)) {
return Boolean.TRUE;
}
} catch (RepositoryMissingException e) {
return Boolean.FALSE;
}

return Boolean.FALSE;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryException;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.threadpool.ThreadPool;

import java.util.ArrayList;
Expand Down Expand Up @@ -183,6 +184,20 @@ public RepositoriesMetadata updateRepositoriesMetadata(DiscoveryNode joiningNode
boolean repositoryAlreadyPresent = false;
for (RepositoryMetadata existingRepositoryMetadata : existingRepositories.repositories()) {
if (newRepositoryMetadata.name().equals(existingRepositoryMetadata.name())) {
try {
// This is to handle cases where-in the during a previous node-join attempt if the publish operation succeeded
// but the commit operation failed, the cluster-state may have the repository metadata which is not applied
// into the repository service. This may lead to assertion failures down the line.
repositoriesService.get().repository(newRepositoryMetadata.name());
} catch (RepositoryMissingException e) {
logger.warn(
"Skipping repositories metadata checks: Remote repository [{}] is in the cluster state but not present "
+ "in the repository service.",
newRepositoryMetadata.name()
);
break;
}

try {
// This will help in handling two scenarios -
// 1. When a fresh cluster is formed and a node tries to join the cluster, the repository
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -918,6 +919,12 @@
Settings newRepositoryMetadataSettings = newRepositoryMetadata.settings();
Settings currentRepositoryMetadataSettings = currentRepositoryMetadata.settings();

assert Objects.nonNull(repository) : String.format(
Locale.ROOT,
"repository [%s] not present in RepositoryService",
currentRepositoryMetadata.name()

Check warning on line 925 in server/src/main/java/org/opensearch/repositories/RepositoriesService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/RepositoriesService.java#L925

Added line #L925 was not covered by tests
);

List<String> restrictedSettings = repository.getRestrictedSystemRepositorySettings()
.stream()
.map(setting -> setting.getKey())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.VersionUtils;
Expand Down Expand Up @@ -1463,6 +1464,73 @@ public void testJoinRemoteStoreClusterWithRemotePublicationNodeInMixedMode() {
JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata());
}

public void testUpdatesClusterStateWithRepositoryMetadataNotInSync() throws Exception {
Map<String, String> newNodeAttributes = new HashMap<>();
newNodeAttributes.putAll(remoteStateNodeAttributes(CLUSTER_STATE_REPO));
newNodeAttributes.putAll(remoteRoutingTableAttributes(ROUTING_TABLE_REPO));

final AllocationService allocationService = mock(AllocationService.class);
when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]);
final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null);
RepositoriesService repositoriesService = mock(RepositoriesService.class);
when(repositoriesService.repository(any())).thenThrow(RepositoryMissingException.class);
final RemoteStoreNodeService remoteStoreNodeService = new RemoteStoreNodeService(new SetOnce<>(repositoriesService)::get, null);

final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(
Settings.EMPTY,
allocationService,
logger,
rerouteService,
null,
remoteStoreNodeService
);

final DiscoveryNode clusterManagerNode = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
newNodeAttributes,
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);

final RepositoryMetadata clusterStateRepo = buildRepositoryMetadata(clusterManagerNode, CLUSTER_STATE_REPO);
final RepositoryMetadata routingTableRepo = buildRepositoryMetadata(clusterManagerNode, ROUTING_TABLE_REPO);
List<RepositoryMetadata> repositoriesMetadata = new ArrayList<>() {
{
add(clusterStateRepo);
add(routingTableRepo);
}
};

final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(
DiscoveryNodes.builder()
.add(clusterManagerNode)
.localNodeId(clusterManagerNode.getId())
.clusterManagerNodeId(clusterManagerNode.getId())
)
.metadata(Metadata.builder().putCustom(RepositoriesMetadata.TYPE, new RepositoriesMetadata(repositoriesMetadata)))
.build();

final DiscoveryNode joiningNode = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
newNodeAttributes,
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);

final ClusterStateTaskExecutor.ClusterTasksResult<JoinTaskExecutor.Task> result = joinTaskExecutor.execute(
clusterState,
List.of(new JoinTaskExecutor.Task(joiningNode, "test"))
);
assertThat(result.executionResults.entrySet(), hasSize(1));
final ClusterStateTaskExecutor.TaskResult taskResult = result.executionResults.values().iterator().next();
assertTrue(taskResult.isSuccess());
validatePublicationRepositoryMetadata(result.resultingState, clusterManagerNode);

}

private void validateRepositoryMetadata(ClusterState updatedState, DiscoveryNode existingNode, int expectedRepositories)
throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2311,10 +2311,24 @@ public List<String> startNodes(int numOfNodes, Settings settings) {
return startNodes(Collections.nCopies(numOfNodes, settings).toArray(new Settings[0]));
}

/**
* Starts multiple nodes with the given settings and returns their names
*/
public List<String> startNodes(int numOfNodes, Settings settings, Boolean waitForNodeJoin) {
return startNodes(waitForNodeJoin, Collections.nCopies(numOfNodes, settings).toArray(new Settings[0]));
}

/**
* Starts multiple nodes with the given settings and returns their names
*/
public synchronized List<String> startNodes(Settings... extraSettings) {
return startNodes(false, extraSettings);
}

/**
* Starts multiple nodes with the given settings and returns their names
*/
public synchronized List<String> startNodes(Boolean waitForNodeJoin, Settings... extraSettings) {
final int newClusterManagerCount = Math.toIntExact(Stream.of(extraSettings).filter(DiscoveryNode::isClusterManagerNode).count());
final int defaultMinClusterManagerNodes;
if (autoManageClusterManagerNodes) {
Expand Down Expand Up @@ -2366,7 +2380,7 @@ public synchronized List<String> startNodes(Settings... extraSettings) {
nodes.add(nodeAndClient);
}
startAndPublishNodesAndClients(nodes);
if (autoManageClusterManagerNodes) {
if (autoManageClusterManagerNodes && !waitForNodeJoin) {
validateClusterFormed();
}
return nodes.stream().map(NodeAndClient::getName).collect(Collectors.toList());
Expand Down Expand Up @@ -2411,6 +2425,10 @@ public List<String> startDataOnlyNodes(int numNodes, Settings settings) {
return startNodes(numNodes, Settings.builder().put(onlyRole(settings, DiscoveryNodeRole.DATA_ROLE)).build());
}

public List<String> startDataOnlyNodes(int numNodes, Settings settings, Boolean ignoreNodeJoin) {
return startNodes(numNodes, Settings.builder().put(onlyRole(settings, DiscoveryNodeRole.DATA_ROLE)).build(), ignoreNodeJoin);
}

public List<String> startSearchOnlyNodes(int numNodes) {
return startSearchOnlyNodes(numNodes, Settings.EMPTY);
}
Expand Down
Loading
Loading