Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Pranshu Shukla <[email protected]>
  • Loading branch information
Pranshu-S committed Dec 6, 2024
1 parent ed9f5e7 commit d009e53
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import org.opensearch.common.Randomness;
import org.opensearch.common.settings.Settings;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.disruption.NetworkDisruption;
Expand All @@ -57,6 +59,7 @@
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -261,7 +264,9 @@ public void testNodeNotReachableFromClusterManager() throws Exception {
}

/**
* Test Repositories Configured Node Join Commit failures.
* Tests the scenario where a cluster-state containing new repository meta-data, created as part of a node-join from a
* repository-configured node, fails at the commit stage and is followed by a master switch. This leads the master nodes
* to do another round of node-joins with the new cluster-state, as the previous attempt had a successful publish.
*/
public void testElectClusterManagerRemotePublicationConfigurationNodeJoinCommitFails() throws Exception {
final String remoteStateRepoName = "remote-state-repo";
Expand All @@ -288,20 +293,32 @@ public void testElectClusterManagerRemotePublicationConfigurationNodeJoinCommitF
clusterManagerNode
);
logger.info("Blocking Cluster Manager Commit Request on all nodes");
// This is to allow the new node to have commit failures on the nodes in the send path itself. This will lead to the
// nodes having a successful publish operation but a failed commit operation. This will come into play once the new node joins.
nonClusterManagerNodes.forEach(node -> {
TransportService targetTransportService = internalCluster().getInstance(TransportService.class, node);
clusterManagerTransportService.addOpenSearchFailureException(
targetTransportService,
new FailedToCommitClusterStateException("Blocking Commit"),
PublicationTransportHandler.COMMIT_STATE_ACTION_NAME
);
clusterManagerTransportService.addSendBehavior(targetTransportService, (connection, requestId, action, request, options) -> {
if (action.equals(PublicationTransportHandler.COMMIT_STATE_ACTION_NAME)) {
logger.info("--> preventing {} request", PublicationTransportHandler.COMMIT_STATE_ACTION_NAME);
throw new FailedToCommitClusterStateException("Blocking Commit");
}
connection.sendRequest(requestId, action, request, options);
});
});

logger.info("Starting Node with remote publication settings");
// Start a node with remote-publication repositories configured. This will lead the active cluster-manager to create
// a new cluster-state event with the new node-join along with the new repositories set up in the cluster meta-data.
internalCluster().startDataOnlyNodes(1, remotePublicationSettings, Boolean.TRUE);

logger.info("Stopping current Cluster Manager");
// We stop the current cluster-manager whose outbound paths were blocked. This is to force a new election onto the nodes
// where the new cluster-state was published but not committed.
internalCluster().stopCurrentClusterManagerNode();

// We expect that the repositories validations are skipped in this case and the node-join succeeds as expected. The
// repositories validations are skipped because even though the cluster-state is updated in the persisted registry,
// the repository service will not be updated as the commit attempt failed.
ensureStableCluster(6);

String randomNode = nonClusterManagerNodes.get(Randomness.get().nextInt(nonClusterManagerNodes.size()));
Expand Down Expand Up @@ -330,11 +347,22 @@ public void testElectClusterManagerRemotePublicationConfigurationNodeJoinCommitF

RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, randomNode);

if (repositoriesService.isRepositoryPresent(remoteStateRepoName)) {
isRemoteStateRepoConfigured = Boolean.TRUE;
try {
Repository remoteStateRepo = repositoriesService.repository(remoteStateRepoName);
if (Objects.nonNull(remoteStateRepo)) {
isRemoteStateRepoConfigured = Boolean.TRUE;
}
} catch (RepositoryMissingException e) {
isRemoteStateRepoConfigured = Boolean.FALSE;
}
if (repositoriesService.isRepositoryPresent(remoteRoutingTableRepoName)) {
isRemoteRoutingTableRepoConfigured = Boolean.TRUE;

try {
Repository routingTableRepo = repositoriesService.repository(remoteRoutingTableRepoName);
if (Objects.nonNull(routingTableRepo)) {
isRemoteRoutingTableRepoConfigured = Boolean.TRUE;
}
} catch (RepositoryMissingException e) {
isRemoteRoutingTableRepoConfigured = Boolean.FALSE;
}

Assert.assertTrue("RemoteState Repo is not set in RepositoryService", isRemoteStateRepoConfigured);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryException;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.threadpool.ThreadPool;

import java.util.ArrayList;
Expand Down Expand Up @@ -183,13 +184,17 @@ public RepositoriesMetadata updateRepositoriesMetadata(DiscoveryNode joiningNode
boolean repositoryAlreadyPresent = false;
for (RepositoryMetadata existingRepositoryMetadata : existingRepositories.repositories()) {
if (newRepositoryMetadata.name().equals(existingRepositoryMetadata.name())) {
// This is to handle cases where, during a previous node-join attempt, the publish operation succeeded but
// the commit operation failed. The cluster-state may then have repository metadata which is not applied into the
// repository service. This may lead to assertion failures down the line.
if (!repositoriesService.get().isRepositoryPresent(newRepositoryMetadata.name())) {
try {
// This is to handle cases where, during a previous node-join attempt, the publish operation succeeded but
// the commit operation failed. The cluster-state may then have repository metadata which is not applied into the
// repository service. This may lead to assertion failures down the line.
String repositoryName = newRepositoryMetadata.name();
repositoriesService.get().repository(repositoryName);
} catch (RepositoryMissingException e) {
logger.warn(
"remote repository [{}] in cluster-state but repository-service but not present "
+ "in repository-service, skipping checks",
"Skipping repositories metadata checks: Remote repository [{}] is in the cluster state but not present "
+ "in the repository service.",
newRepositoryMetadata.name()
);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -579,10 +579,6 @@ public Repository repository(String repositoryName) {
throw new RepositoryMissingException(repositoryName);
}

/**
 * Reports whether looking up {@code repositoryName} in {@code repositories} yields a non-null entry.
 *
 * @param repositoryName name of the repository to look up
 * @return {@code true} when the lookup returns a non-null value, {@code false} otherwise
 */
public Boolean isRepositoryPresent(final String repositoryName) {
    final boolean registered = repositories.get(repositoryName) != null;
    return registered;
}

public List<RepositoryStatsSnapshot> repositoriesStats() {
List<RepositoryStatsSnapshot> activeRepoStats = getRepositoryStatsForActiveRepositories();
return activeRepoStats;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.VersionUtils;
Expand Down Expand Up @@ -1378,6 +1379,72 @@ public void testJoinRemoteStoreClusterWithRemotePublicationNodeInMixedMode() {
JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata());
}

/**
 * Verifies that a join task still succeeds when the cluster state already carries the cluster-state and
 * routing-table repository metadata, while the mocked {@link RepositoriesService} throws
 * {@link RepositoryMissingException} for every repository lookup.
 *
 * @throws Exception if executing the join task fails unexpectedly
 */
public void testUpdatesClusterStateWithRepositoryMetadataNotInSync() throws Exception {
    // Node attributes shared by the existing cluster-manager node and the joining node.
    Map<String, String> newNodeAttributes = new HashMap<>();
    newNodeAttributes.putAll(remoteStateNodeAttributes(CLUSTER_STATE_REPO));
    newNodeAttributes.putAll(remoteRoutingTableAttributes(ROUTING_TABLE_REPO));

    final AllocationService allocationService = mock(AllocationService.class);
    when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]);
    final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null);

    // Every repository lookup on the mocked service fails, i.e. the service knows none of the repositories
    // that are listed in the cluster-state metadata built below.
    RepositoriesService repositoriesService = mock(RepositoriesService.class);
    when(repositoriesService.repository(any())).thenThrow(RepositoryMissingException.class);
    final RemoteStoreNodeService remoteStoreNodeService = new RemoteStoreNodeService(new SetOnce<>(repositoriesService)::get, null);

    final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(
        Settings.EMPTY,
        allocationService,
        logger,
        rerouteService,
        remoteStoreNodeService
    );

    final DiscoveryNode clusterManagerNode = new DiscoveryNode(
        UUIDs.base64UUID(),
        buildNewFakeTransportAddress(),
        newNodeAttributes,
        DiscoveryNodeRole.BUILT_IN_ROLES,
        Version.CURRENT
    );

    final RepositoryMetadata clusterStateRepo = buildRepositoryMetadata(clusterManagerNode, CLUSTER_STATE_REPO);
    final RepositoryMetadata routingTableRepo = buildRepositoryMetadata(clusterManagerNode, ROUTING_TABLE_REPO);
    // Plain ArrayList instead of double-brace initialization, which would create an anonymous subclass.
    final List<RepositoryMetadata> repositoriesMetadata = new ArrayList<>();
    repositoriesMetadata.add(clusterStateRepo);
    repositoriesMetadata.add(routingTableRepo);

    // Cluster state whose metadata already contains both repositories.
    final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
        .nodes(
            DiscoveryNodes.builder()
                .add(clusterManagerNode)
                .localNodeId(clusterManagerNode.getId())
                .clusterManagerNodeId(clusterManagerNode.getId())
        )
        .metadata(Metadata.builder().putCustom(RepositoriesMetadata.TYPE, new RepositoriesMetadata(repositoriesMetadata)))
        .build();

    final DiscoveryNode joiningNode = new DiscoveryNode(
        UUIDs.base64UUID(),
        buildNewFakeTransportAddress(),
        newNodeAttributes,
        DiscoveryNodeRole.BUILT_IN_ROLES,
        Version.CURRENT
    );

    final ClusterStateTaskExecutor.ClusterTasksResult<JoinTaskExecutor.Task> result = joinTaskExecutor.execute(
        clusterState,
        List.of(new JoinTaskExecutor.Task(joiningNode, "test"))
    );

    // Exactly one join task was submitted; it must succeed despite the missing repositories.
    assertThat(result.executionResults.entrySet(), hasSize(1));
    final ClusterStateTaskExecutor.TaskResult taskResult = result.executionResults.values().iterator().next();
    assertTrue(taskResult.isSuccess());
    validatePublicationRepositoryMetadata(result.resultingState, clusterManagerNode);
}

private void validateRepositoryMetadata(ClusterState updatedState, DiscoveryNode existingNode, int expectedRepositories)
throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterModule;
import org.opensearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -377,47 +376,6 @@ public void addFailToSendNoConnectRule(TransportAddress transportAddress, final
});
}

/**
 * Registers a rule that makes sends of any of the given actions towards {@code transportService}
 * throw the supplied OpenSearch exception.
 *
 * @param transportService target service whose transport addresses the rule is registered for
 * @param exception        exception thrown for a blocked action
 * @param blockedActions   names of the actions to block
 */
public void addOpenSearchFailureException(
    TransportService transportService,
    final OpenSearchException exception,
    final String... blockedActions
) {
    // Collapse the varargs into a set and delegate to the Set-based overload.
    final Set<String> actionNames = new HashSet<>(Arrays.asList(blockedActions));
    addOpenSearchFailureException(transportService, exception, actionNames);
}

/**
 * Registers, for every transport address extracted from {@code transportService}, a rule that makes
 * sends of the blocked actions throw the supplied OpenSearch exception.
 *
 * @param transportService target service whose transport addresses the rule is registered for
 * @param exception        exception thrown for a blocked action
 * @param blockedActions   names of the actions to block
 */
public void addOpenSearchFailureException(
    TransportService transportService,
    OpenSearchException exception,
    final Set<String> blockedActions
) {
    // One rule per address; the address-based overload installs the actual send behavior.
    for (TransportAddress targetAddress : extractTransportAddresses(transportService)) {
        addOpenSearchFailureException(targetAddress, exception, blockedActions);
    }
}

/**
 * Installs a send behavior for {@code transportAddress} that throws the supplied OpenSearch exception
 * for blocked actions and forwards every other request on the connection unchanged.
 *
 * @param transportAddress address the send behavior is registered for
 * @param exception        exception thrown for a blocked action
 * @param blockedActions   names of the actions to block
 */
public void addOpenSearchFailureException(
    TransportAddress transportAddress,
    OpenSearchException exception,
    final Set<String> blockedActions
) {
    transport().addSendBehavior(transportAddress, (connection, requestId, action, request, options) -> {
        final boolean blocked = blockedActions.contains(action);
        if (blocked == false) {
            // Not a blocked action: pass the request through untouched.
            connection.sendRequest(requestId, action, request, options);
            return;
        }
        logger.info("--> preventing {} request", action);
        throw exception;
    });
}

/**
* Adds a rule that will cause each send request to be ignored, simulating an unresponsive node
* and failing to connect once the rule was added.
Expand Down

0 comments on commit d009e53

Please sign in to comment.