Skip to content

Commit

Permalink
Skip remote-repositories validations for node-joins when Repositories…
Browse files Browse the repository at this point in the history
…Service is not in sync with cluster-state

Signed-off-by: Pranshu Shukla <[email protected]>
  • Loading branch information
Pranshu-S committed Dec 3, 2024
1 parent ad982c2 commit 5836719
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,19 @@

package org.opensearch.discovery;

import org.junit.Assert;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.FailedToCommitClusterStateException;
import org.opensearch.cluster.coordination.JoinHelper;
import org.opensearch.cluster.coordination.PublicationTransportHandler;
import org.opensearch.cluster.metadata.RepositoriesMetadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.disruption.NetworkDisruption;
import org.opensearch.test.disruption.ServiceDisruptionScheme;
Expand All @@ -50,6 +56,11 @@
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import java.util.Random;
import java.util.List;
import java.util.Arrays;


import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING;
Expand Down Expand Up @@ -250,4 +261,86 @@ public void testNodeNotReachableFromClusterManager() throws Exception {
ensureStableCluster(3);
}

/**
* Test Repositories Configured Node Join Commit failures.
*/
public void testElectClusterManagerRemotePublicationConfigurationNodeJoinCommitFails() throws Exception {
final String remoteStateRepoName = "remote-state-repo";
final String remoteRoutingTableRepoName = "routing-table-repo";


Settings remotePublicationSettings = buildRemotePublicationNodeAttributes(
remoteStateRepoName,
ReloadableFsRepository.TYPE,
remoteRoutingTableRepoName,
ReloadableFsRepository.TYPE
);
internalCluster().startClusterManagerOnlyNodes(3);
internalCluster().startDataOnlyNodes(3);

String clusterManagerNode = internalCluster().getClusterManagerName();
List<String> nonClusterManagerNodes = Arrays.stream(internalCluster().getNodeNames()).filter(node -> !node.equals(clusterManagerNode)).collect(Collectors.toList());

ensureStableCluster(6);

MockTransportService clusterManagerTransportService = (MockTransportService) internalCluster().getInstance(
TransportService.class,
clusterManagerNode
);
logger.info("Blocking Cluster Manager Commit Request on all nodes");
nonClusterManagerNodes.forEach(
node -> {
TransportService targetTransportService = internalCluster().getInstance(TransportService.class, node);
clusterManagerTransportService.addOpenSearchFailureException(
targetTransportService,
new FailedToCommitClusterStateException("Blocking Commit"),
PublicationTransportHandler.COMMIT_STATE_ACTION_NAME
);
}
);

logger.info("Starting Node with remote publication settings");
internalCluster().startDataOnlyNodes(1, remotePublicationSettings, Boolean.TRUE);

logger.info("Stopping current Cluster Manager");
internalCluster().stopCurrentClusterManagerNode();
ensureStableCluster(6);

Random random = new Random();
String randomNode = nonClusterManagerNodes.get(random.nextInt(nonClusterManagerNodes.size()));

RepositoriesMetadata repositoriesMetadata = internalCluster().getInstance(ClusterService.class, randomNode).state().metadata().custom(RepositoriesMetadata.TYPE);

Boolean isRemoteStateRepoConfigured = Boolean.FALSE;
Boolean isRemoteRoutingTableRepoConfigured = Boolean.FALSE;

for (RepositoryMetadata repo : repositoriesMetadata.repositories()) {
if (repo.name().equals(remoteStateRepoName)) {
isRemoteStateRepoConfigured = Boolean.TRUE;
} else if (repo.name().equals(remoteRoutingTableRepoName)) {
isRemoteRoutingTableRepoConfigured = Boolean.TRUE;
}
}

Assert.assertTrue("RemoteState Repo is not set in RepositoriesMetadata", isRemoteStateRepoConfigured);
Assert.assertTrue("RemoteRoutingTable Repo is not set in RepositoriesMetadata", isRemoteRoutingTableRepoConfigured);

isRemoteStateRepoConfigured = Boolean.FALSE;
isRemoteRoutingTableRepoConfigured = Boolean.FALSE;

RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, randomNode);

if (repositoriesService.isRepositoryPresent(remoteStateRepoName)) {
isRemoteStateRepoConfigured = Boolean.TRUE;
}
if (repositoriesService.isRepositoryPresent(remoteRoutingTableRepoName)) {
isRemoteRoutingTableRepoConfigured = Boolean.TRUE;
}

Assert.assertTrue("RemoteState Repo is not set in RepositoryService", isRemoteStateRepoConfigured);
Assert.assertTrue("RemoteRoutingTable Repo is not set in RepositoryService", isRemoteRoutingTableRepoConfigured);

logger.info("Stopping current Cluster Manager");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,7 @@
import org.opensearch.repositories.RepositoryException;
import org.opensearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.*;
import java.util.function.Supplier;

import static org.opensearch.common.util.FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL;
Expand Down Expand Up @@ -183,6 +178,14 @@ public RepositoriesMetadata updateRepositoriesMetadata(DiscoveryNode joiningNode
boolean repositoryAlreadyPresent = false;
for (RepositoryMetadata existingRepositoryMetadata : existingRepositories.repositories()) {
if (newRepositoryMetadata.name().equals(existingRepositoryMetadata.name())) {
// This is to handle cases where-in the during a previous node-join attempt if the publish operation succeeded but
// the commit operation failed, the cluster-state may have the repository metadata which is not applied into the
// repository service. This may lead to assertion failures down the line.
if (!repositoriesService.get().isRepositoryPresent(newRepositoryMetadata.name())) {
logger.warn("remote repository [{}] in cluster-state but repository-service but not present "
+ "in repository-service, skipping checks", newRepositoryMetadata.name());
break;
}
try {
// This will help in handling two scenarios -
// 1. When a fresh cluster is formed and a node tries to join the cluster, the repository
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,10 @@ public Repository repository(String repositoryName) {
throw new RepositoryMissingException(repositoryName);
}

public Boolean isRepositoryPresent(final String repositoryName) {
return Objects.nonNull(repositories.get(repositoryName));
}

public List<RepositoryStatsSnapshot> repositoriesStats() {
List<RepositoryStatsSnapshot> activeRepoStats = getRepositoryStatsForActiveRepositories();
return activeRepoStats;
Expand Down Expand Up @@ -904,6 +908,8 @@ public void ensureValidSystemRepositoryUpdate(RepositoryMetadata newRepositoryMe
Settings newRepositoryMetadataSettings = newRepositoryMetadata.settings();
Settings currentRepositoryMetadataSettings = currentRepositoryMetadata.settings();

assert Objects.nonNull(repository) : String.format("repository [%s] not present in RepositoryService", currentRepositoryMetadata.name());

List<String> restrictedSettings = repository.getRestrictedSystemRepositorySettings()
.stream()
.map(setting -> setting.getKey())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2322,10 +2322,25 @@ public List<String> startNodes(int numOfNodes, Settings settings) {
return startNodes(Collections.nCopies(numOfNodes, settings).toArray(new Settings[0]));
}


/**
* Starts multiple nodes with the given settings and returns their names
*/
public List<String> startNodes(int numOfNodes, Settings settings, Boolean ignoreNodeJoin) {
return startNodes(ignoreNodeJoin, Collections.nCopies(numOfNodes, settings).toArray(new Settings[0]));
}

/**
* Starts multiple nodes with the given settings and returns their names
*/
public synchronized List<String> startNodes(Settings... extraSettings) {
return startNodes(false, extraSettings);
}

/**
* Starts multiple nodes with the given settings and returns their names
*/
public synchronized List<String> startNodes(Boolean ignoreNodeJoin, Settings... extraSettings) {
final int newClusterManagerCount = Math.toIntExact(Stream.of(extraSettings).filter(DiscoveryNode::isClusterManagerNode).count());
final int defaultMinClusterManagerNodes;
if (autoManageClusterManagerNodes) {
Expand All @@ -2339,9 +2354,9 @@ public synchronized List<String> startNodes(Settings... extraSettings) {
&& prevClusterManagerCount == 0
&& newClusterManagerCount > 0
&& Arrays.stream(extraSettings)
.allMatch(s -> DiscoveryNode.isClusterManagerNode(s) == false || ZEN2_DISCOVERY_TYPE.equals(DISCOVERY_TYPE_SETTING.get(s)))
? RandomNumbers.randomIntBetween(random, 0, newClusterManagerCount - 1)
: -1;
.allMatch(s -> DiscoveryNode.isClusterManagerNode(s) == false || ZEN2_DISCOVERY_TYPE.equals(DISCOVERY_TYPE_SETTING.get(s)))
? RandomNumbers.randomIntBetween(random, 0, newClusterManagerCount - 1)
: -1;

final int numOfNodes = extraSettings.length;
final int firstNodeId = nextNodeId.getAndIncrement();
Expand Down Expand Up @@ -2377,7 +2392,7 @@ public synchronized List<String> startNodes(Settings... extraSettings) {
nodes.add(nodeAndClient);
}
startAndPublishNodesAndClients(nodes);
if (autoManageClusterManagerNodes) {
if (autoManageClusterManagerNodes && !ignoreNodeJoin) {
validateClusterFormed();
}
return nodes.stream().map(NodeAndClient::getName).collect(Collectors.toList());
Expand Down Expand Up @@ -2422,6 +2437,10 @@ public List<String> startDataOnlyNodes(int numNodes, Settings settings) {
return startNodes(numNodes, Settings.builder().put(onlyRole(settings, DiscoveryNodeRole.DATA_ROLE)).build());
}

public List<String> startDataOnlyNodes(int numNodes, Settings settings, Boolean ignoreNodeJoin) {
return startNodes(numNodes, Settings.builder().put(onlyRole(settings, DiscoveryNodeRole.DATA_ROLE)).build(), ignoreNodeJoin);
}

public List<String> startSearchOnlyNodes(int numNodes) {
return startSearchOnlyNodes(numNodes, Settings.EMPTY);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import reactor.util.annotation.NonNull;

import java.io.IOException;
import java.lang.Runtime.Version;
Expand Down Expand Up @@ -2915,6 +2916,43 @@ protected static Settings buildRemoteStoreNodeAttributes(
return settings.build();
}

protected Settings buildRemotePublicationNodeAttributes(
@NonNull String remoteStateRepoName,
@NonNull String remoteStateRepoType,
@NonNull String routingTableRepoName,
@NonNull String routingTableRepoType
) {
String remoteStateRepositoryTypeAttributeKey = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
remoteStateRepoName
);
String routingTableRepositoryTypeAttributeKey = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
routingTableRepoName
);
String remoteStateRepositorySettingsAttributeKeyPrefix = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
remoteStateRepoName
);
String routingTableRepositorySettingsAttributeKeyPrefix = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
routingTableRepoName
);

return Settings.builder()
.put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, remoteStateRepoName)
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, routingTableRepoName)
.put(remoteStateRepositoryTypeAttributeKey, remoteStateRepoType)
.put(routingTableRepositoryTypeAttributeKey, routingTableRepoType)
.put(remoteStateRepositorySettingsAttributeKeyPrefix+"location", randomRepoPath().toAbsolutePath())
.put(routingTableRepositorySettingsAttributeKeyPrefix+"location", randomRepoPath().toAbsolutePath())
.build();
}

public static String resolvePath(IndexId indexId, String shardId) {
PathType pathType = PathType.fromCode(indexId.getShardPathType());
RemoteStorePathStrategy.SnapshotShardPathInput shardPathInput = new RemoteStorePathStrategy.SnapshotShardPathInput.Builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterModule;
import org.opensearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -376,6 +377,35 @@ public void addFailToSendNoConnectRule(TransportAddress transportAddress, final
});
}

/**
* Adds a rule that will cause matching operations to throw provided OpenSearch Exceptions
*/
public void addOpenSearchFailureException(TransportService transportService, final OpenSearchException exception, final String... blockedActions) {
addOpenSearchFailureException(transportService, exception, new HashSet<>(Arrays.asList(blockedActions)));
}

/**
* Adds a rule that will cause matching operations to throw provided OpenSearch Exceptions
*/
public void addOpenSearchFailureException(TransportService transportService, OpenSearchException exception, final Set<String> blockedActions) {
for (TransportAddress transportAddress : extractTransportAddresses(transportService)) {
addOpenSearchFailureException(transportAddress, exception, blockedActions);
}
}

/**
* Adds a rule that will cause matching operations to throw provided OpenSearch Exceptions
*/
public void addOpenSearchFailureException(TransportAddress transportAddress, OpenSearchException exception, final Set<String> blockedActions) {
transport().addSendBehavior(transportAddress, (connection, requestId, action, request, options) -> {
if (blockedActions.contains(action)) {
logger.info("--> preventing {} request", action);
throw exception;
}
connection.sendRequest(requestId, action, request, options);
});
}

/**
* Adds a rule that will cause ignores each send request, simulating an unresponsive node
* and failing to connect once the rule was added.
Expand Down

0 comments on commit 5836719

Please sign in to comment.