From 0b0bd8676e27caa54fee837b7d20efec94b7e0ba Mon Sep 17 00:00:00 2001
From: Shivansh Arora
Date: Wed, 4 Oct 2023 04:56:08 +0530
Subject: [PATCH] Removed changes which were not necessary

Signed-off-by: Shivansh Arora
---
 .../gateway/ReplicaShardAllocator.java | 240 +++++++++---------
 1 file changed, 118 insertions(+), 122 deletions(-)

diff --git a/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java b/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java
index 5c42d7c94e4a2..43e80dd796c76 100644
--- a/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java
+++ b/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java
@@ -71,29 +71,99 @@
  * @opensearch.internal
  */
 public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
+    protected boolean shouldSkipFetchForRecovery(ShardRouting shard) {
+        if (shard.primary()) {
+            return true;
+        }
+        if (shard.initializing() == false) {
+            return true;
+        }
+        if (shard.relocatingNodeId() != null) {
+            return true;
+        }
+        if (shard.unassignedInfo() != null && shard.unassignedInfo().getReason() == UnassignedInfo.Reason.INDEX_CREATED) {
+            // if we are allocating a replica because of index creation, no need to go and find a copy, there isn't one...
+            return true;
+        }
+        return false;
+    }
+
+    protected Runnable getShardCancellationAction(
+        ShardRouting shard,
+        RoutingAllocation allocation,
+        Map<DiscoveryNode, StoreFilesMetadata> nodeShardStores
+    ) {
+        Metadata metadata = allocation.metadata();
+        RoutingNodes routingNodes = allocation.routingNodes();
+        ShardRouting primaryShard = allocation.routingNodes().activePrimary(shard.shardId());
+        assert primaryShard != null : "the replica shard can be allocated on at least one node, so there must be an active primary";
+        assert primaryShard.currentNodeId() != null;
+        final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId());
+
+        final StoreFilesMetadata primaryStore = findStore(primaryNode, nodeShardStores);
+        if (primaryStore == null) {
+            // if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed)
+            // just let the recovery find it out, no need to do anything about it for the initializing shard
+            logger.trace("{}: no primary shard store found or allocated, letting actual allocation figure it out", shard);
+            return null;
+        }
+
+        MatchingNodes matchingNodes = findMatchingNodes(shard, allocation, true, primaryNode, primaryStore, nodeShardStores, false);
+        if (matchingNodes.getNodeWithHighestMatch() != null) {
+            DiscoveryNode currentNode = allocation.nodes().get(shard.currentNodeId());
+            DiscoveryNode nodeWithHighestMatch = matchingNodes.getNodeWithHighestMatch();
+            // current node will not be in matchingNodes as it is filtered away by SameShardAllocationDecider
+            if (currentNode.equals(nodeWithHighestMatch) == false
+                && matchingNodes.canPerformNoopRecovery(nodeWithHighestMatch)
+                && canPerformOperationBasedRecovery(primaryStore, nodeShardStores, currentNode) == false) {
+                // we found a better match that can perform noop recovery, cancel the existing allocation.
+                logger.debug(
+                    "cancelling allocation of replica on [{}], can perform a noop recovery on node [{}]",
+                    currentNode,
+                    nodeWithHighestMatch
+                );
+                final Set<String> failedNodeIds = shard.unassignedInfo() == null
+                    ? Collections.emptySet()
+                    : shard.unassignedInfo().getFailedNodeIds();
+                UnassignedInfo unassignedInfo = new UnassignedInfo(
+                    UnassignedInfo.Reason.REALLOCATED_REPLICA,
+                    "existing allocation of replica to ["
+                        + currentNode
+                        + "] cancelled, can perform a noop recovery on ["
+                        + nodeWithHighestMatch
+                        + "]",
+                    null,
+                    0,
+                    allocation.getCurrentNanoTime(),
+                    System.currentTimeMillis(),
+                    false,
+                    UnassignedInfo.AllocationStatus.NO_ATTEMPT,
+                    failedNodeIds
+                );
+                // don't cancel shard in the loop as it will cause a ConcurrentModificationException
+                return () -> routingNodes.failShard(
+                    logger,
+                    shard,
+                    unassignedInfo,
+                    metadata.getIndexSafe(shard.index()),
+                    allocation.changes()
+                );
+            }
+        }
+        return null;
+    }
+
     /**
      * Process existing recoveries of replicas and see if we need to cancel them if we find a better
      * match. Today, a better match is one that can perform a no-op recovery while the previous recovery
      * has to copy segment files.
      */
     public void processExistingRecoveries(RoutingAllocation allocation) {
-        Metadata metadata = allocation.metadata();
         RoutingNodes routingNodes = allocation.routingNodes();
         List<Runnable> shardCancellationActions = new ArrayList<>();
         for (RoutingNode routingNode : routingNodes) {
             for (ShardRouting shard : routingNode) {
-                if (shard.primary()) {
-                    continue;
-                }
-                if (shard.initializing() == false) {
-                    continue;
-                }
-                if (shard.relocatingNodeId() != null) {
-                    continue;
-                }
-
-                // if we are allocating a replica because of index creation, no need to go and find a copy, there isn't one...
-                if (shard.unassignedInfo() != null && shard.unassignedInfo().getReason() == UnassignedInfo.Reason.INDEX_CREATED) {
+                if (shouldSkipFetchForRecovery(shard)) {
                     continue;
                 }
@@ -102,63 +172,11 @@ public void processExistingRecoveries(RoutingAllocation allocation) {
                     logger.trace("{}: fetching new stores for initializing shard", shard);
                     continue; // still fetching
                 }
+                Map<DiscoveryNode, StoreFilesMetadata> nodeShardStores = adaptToNodeShardStores(shardStores);

-                ShardRouting primaryShard = allocation.routingNodes().activePrimary(shard.shardId());
-                assert primaryShard != null : "the replica shard can be allocated on at least one node, so there must be an active primary";
-                assert primaryShard.currentNodeId() != null;
-                final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId());
-                NodeShardStores nodeShardStores = adaptToNodeShardStores(shardStores);
-                final StoreFilesMetadata primaryStore = findStore(primaryNode, nodeShardStores);
-                if (primaryStore == null) {
-                    // if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed)
-                    // just let the recovery find it out, no need to do anything about it for the initializing shard
-                    logger.trace("{}: no primary shard store found or allocated, letting actual allocation figure it out", shard);
-                    continue;
-                }
-
-                MatchingNodes matchingNodes = findMatchingNodes(shard, allocation, true, primaryNode, primaryStore, nodeShardStores, false);
-                if (matchingNodes.getNodeWithHighestMatch() != null) {
-                    DiscoveryNode currentNode = allocation.nodes().get(shard.currentNodeId());
-                    DiscoveryNode nodeWithHighestMatch = matchingNodes.getNodeWithHighestMatch();
-                    // current node will not be in matchingNodes as it is filtered away by SameShardAllocationDecider
-                    if (currentNode.equals(nodeWithHighestMatch) == false
-                        && matchingNodes.canPerformNoopRecovery(nodeWithHighestMatch)
-                        && canPerformOperationBasedRecovery(primaryStore, nodeShardStores, currentNode) == false) {
-                        // we found a better match that can perform noop recovery, cancel the existing allocation.
-                        logger.debug(
-                            "cancelling allocation of replica on [{}], can perform a noop recovery on node [{}]",
-                            currentNode,
-                            nodeWithHighestMatch
-                        );
-                        final Set<String> failedNodeIds = shard.unassignedInfo() == null
-                            ? Collections.emptySet()
-                            : shard.unassignedInfo().getFailedNodeIds();
-                        UnassignedInfo unassignedInfo = new UnassignedInfo(
-                            UnassignedInfo.Reason.REALLOCATED_REPLICA,
-                            "existing allocation of replica to ["
-                                + currentNode
-                                + "] cancelled, can perform a noop recovery on ["
-                                + nodeWithHighestMatch
-                                + "]",
-                            null,
-                            0,
-                            allocation.getCurrentNanoTime(),
-                            System.currentTimeMillis(),
-                            false,
-                            UnassignedInfo.AllocationStatus.NO_ATTEMPT,
-                            failedNodeIds
-                        );
-                        // don't cancel shard in the loop as it will cause a ConcurrentModificationException
-                        shardCancellationActions.add(
-                            () -> routingNodes.failShard(
-                                logger,
-                                shard,
-                                unassignedInfo,
-                                metadata.getIndexSafe(shard.index()),
-                                allocation.changes()
-                            )
-                        );
-                    }
+                Runnable cancellationAction = getShardCancellationAction(shard, allocation, nodeShardStores);
+                if (cancellationAction != null) {
+                    shardCancellationActions.add(cancellationAction);
                 }
             }
         }
@@ -211,14 +229,14 @@ public AllocateUnassignedDecision makeAllocationDecision(
             }
             return AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, nodeDecisions);
         }
-        NodeShardStores nodeShardStores = adaptToNodeShardStores(shardStores);
+        Map<DiscoveryNode, StoreFilesMetadata> nodeShardStores = adaptToNodeShardStores(shardStores);

         return getAllocationDecision(unassignedShard, allocation, nodeShardStores, result, logger);
     }

     protected AllocateUnassignedDecision getAllocationDecision(
         ShardRouting unassignedShard,
         RoutingAllocation allocation,
-        NodeShardStores nodeShardStores,
+        Map<DiscoveryNode, StoreFilesMetadata> nodeShardStores,
         Tuple<Decision, Map<String, NodeAllocationResult>> allocationDecision,
         Logger logger
     ) {
@@ -348,7 +366,7 @@ protected static Tuple<Decision, Map<String, NodeAllocationResult>> canBeAllocat
      * Takes the store info for nodes that have a shard store and adds them to the node decisions,
      * leaving the node explanations untouched for those nodes that do not have any store information.
      */
-    protected static List<NodeAllocationResult> augmentExplanationsWithStoreInfo(
+    private static List<NodeAllocationResult> augmentExplanationsWithStoreInfo(
         Map<String, NodeAllocationResult> nodeDecisions,
         Map<String, NodeAllocationResult> withShardStores
     ) {
@@ -369,36 +387,35 @@ protected static List<NodeAllocationResult> augmentExplanationsWithStoreInfo(
     /**
      * Finds the store for the assigned shard in the fetched data, returns null if none is found.
      */
-    protected static StoreFilesMetadata findStore(
+    private static StoreFilesMetadata findStore(
         DiscoveryNode node,
-        NodeShardStores data
+        Map<DiscoveryNode, StoreFilesMetadata> data
     ) {
-        NodeShardStore nodeFilesStore = data.getNodeShardStores().get(node);
-        if (nodeFilesStore == null) {
+        if (!data.containsKey(node)) {
             return null;
         }
-        return nodeFilesStore.getStoreFilesMetadata();
+        return data.get(node);
     }

-    protected MatchingNodes findMatchingNodes(
+    private MatchingNodes findMatchingNodes(
         ShardRouting shard,
         RoutingAllocation allocation,
         boolean noMatchFailedNodes,
         DiscoveryNode primaryNode,
         StoreFilesMetadata primaryStore,
-        NodeShardStores data,
+        Map<DiscoveryNode, StoreFilesMetadata> data,
         boolean explain
     ) {
         Map<DiscoveryNode, MatchingNode> matchingNodes = new HashMap<>();
         Map<String, NodeAllocationResult> nodeDecisions = explain ? new HashMap<>() : null;
-        for (Map.Entry<DiscoveryNode, NodeShardStore> nodeStoreEntry : data.getNodeShardStores().entrySet()) {
+        for (Map.Entry<DiscoveryNode, StoreFilesMetadata> nodeStoreEntry : data.entrySet()) {
             DiscoveryNode discoNode = nodeStoreEntry.getKey();
             if (noMatchFailedNodes
                 && shard.unassignedInfo() != null
                 && shard.unassignedInfo().getFailedNodeIds().contains(discoNode.getId())) {
                 continue;
             }
-            StoreFilesMetadata storeFilesMetadata = nodeStoreEntry.getValue().getStoreFilesMetadata();
+            StoreFilesMetadata storeFilesMetadata = nodeStoreEntry.getValue();
             // we don't have any files at all, it is an empty index
             if (storeFilesMetadata.isEmpty()) {
                 continue;
@@ -453,20 +470,20 @@ protected MatchingNodes findMatchingNodes(
         return new MatchingNodes(matchingNodes, nodeDecisions);
     }

-    private NodeShardStores adaptToNodeShardStores(AsyncShardFetch.FetchResult<NodeStoreFilesMetadata> data) {
-        NodeShardStores nodeShardStores = new NodeShardStores();
-        nodeShardStores.getNodeShardStores().putAll(
-            data.getData().entrySet().stream().collect(
-                Collectors.toMap(
-                    Map.Entry::getKey,
-                    entry -> new NodeShardStore(entry.getValue().storeFilesMetadata())
-                )
-            )
-        );
-        return nodeShardStores;
+    private Map<DiscoveryNode, StoreFilesMetadata> adaptToNodeShardStores(AsyncShardFetch.FetchResult<NodeStoreFilesMetadata> data) {
+        assert data.hasData();
+        return new HashMap<>(
+            data.getData().entrySet().stream()
+                .collect(
+                    Collectors.toMap(
+                        Map.Entry::getKey,
+                        entry -> entry.getValue().storeFilesMetadata()
+                    )
+                )
+        );
     }

-    protected static long computeMatchingBytes(
+    private static long computeMatchingBytes(
         StoreFilesMetadata primaryStore,
         StoreFilesMetadata storeFilesMetadata
     ) {
@@ -480,7 +497,7 @@ protected MatchingNodes findMatchingNodes(
         return sizeMatched;
     }

-    protected static boolean hasMatchingSyncId(
+    private static boolean hasMatchingSyncId(
        StoreFilesMetadata primaryStore,
        StoreFilesMetadata replicaStore
    ) {
@@ -488,7 +505,7 @@ protected MatchingNodes findMatchingNodes(
         return primarySyncId != null && primarySyncId.equals(replicaStore.syncId());
     }

-    protected static MatchingNode computeMatchingNode(
+    private static MatchingNode computeMatchingNode(
         DiscoveryNode primaryNode,
         StoreFilesMetadata primaryStore,
         DiscoveryNode replicaNode,
@@ -502,16 +519,16 @@ protected MatchingNodes findMatchingNodes(
         return new MatchingNode(matchingBytes, retainingSeqNoForReplica, isNoopRecovery);
     }

-    protected static boolean canPerformOperationBasedRecovery(
+    private static boolean canPerformOperationBasedRecovery(
         StoreFilesMetadata primaryStore,
-        NodeShardStores shardStores,
+        Map<DiscoveryNode, StoreFilesMetadata> shardStores,
         DiscoveryNode targetNode
     ) {
-        final NodeShardStore targetNodeStore = shardStores.getNodeShardStores().get(targetNode);
-        if (targetNodeStore == null || targetNodeStore.getStoreFilesMetadata().isEmpty()) {
+        final StoreFilesMetadata targetNodeStore = shardStores.get(targetNode);
+        if (targetNodeStore == null || targetNodeStore.isEmpty()) {
             return false;
         }
-        if (hasMatchingSyncId(primaryStore, targetNodeStore.getStoreFilesMetadata())) {
+        if (hasMatchingSyncId(primaryStore, targetNodeStore)) {
             return true;
         }
         return primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(targetNode) >= 0;
@@ -524,6 +541,9 @@ protected static boolean canPerformOperationBasedRecovery(
      */
     protected abstract boolean hasInitiatedFetching(ShardRouting shard);

+    /**
+     * A class to encapsulate the details regarding a MatchingNode for shard assignment
+     */
     protected static class MatchingNode {
         static final Comparator<MatchingNode> COMPARATOR = Comparator.<MatchingNode, Boolean>comparing(m -> m.isNoopRecovery)
             .thenComparing(m -> m.retainingSeqNo)
@@ -586,28 +606,4 @@ public Map<String, NodeAllocationResult> getNodeDecisions() {
             return this.nodeDecisions;
         }
     }
-
-    protected static class NodeShardStore {
-        private final StoreFilesMetadata storeFilesMetadata;
-
-        protected NodeShardStore(StoreFilesMetadata storeFilesMetadata) {
-            this.storeFilesMetadata = storeFilesMetadata;
-        }
-
-        protected StoreFilesMetadata getStoreFilesMetadata() {
-            return storeFilesMetadata;
-        }
-    }
-
-    protected static class NodeShardStores {
-        private final Map<DiscoveryNode, NodeShardStore> nodeShardStores;
-
-        protected NodeShardStores() {
-            this.nodeShardStores = new HashMap<>();
-        }
-
-        protected Map<DiscoveryNode, NodeShardStore> getNodeShardStores() {
-            return nodeShardStores;
-        }
-    }
 }
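
The refactor keeps the existing deferral pattern: getShardCancellationAction returns a Runnable instead of failing the shard inline, and processExistingRecoveries collects those Runnables and runs them only after the routing nodes have been fully iterated, which is what avoids the ConcurrentModificationException called out in the code comment. Below is a minimal, self-contained sketch of that collect-then-run pattern; the Shard record, shouldSkip, and cancellationAction names are illustrative stand-ins for this note only, not OpenSearch APIs.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    public class DeferredCancellationSketch {

        // Stand-in for a routing table entry; not an OpenSearch type.
        record Shard(String id, boolean primary, boolean initializing) {}

        // Mirrors the shape of shouldSkipFetchForRecovery: cheap early exits per shard.
        static boolean shouldSkip(Shard shard) {
            return shard.primary() || shard.initializing() == false;
        }

        // Mirrors the shape of getShardCancellationAction: return a deferred action,
        // or null when there is nothing to cancel for this shard.
        static Runnable cancellationAction(Shard shard, Map<String, String> betterMatches) {
            String betterNode = betterMatches.get(shard.id());
            if (betterNode == null) {
                return null;
            }
            return () -> System.out.println("cancel " + shard.id() + ", noop recovery possible on " + betterNode);
        }

        public static void main(String[] args) {
            List<Shard> shards = List.of(
                new Shard("s0", true, false),
                new Shard("s1", false, true),
                new Shard("s2", false, true)
            );
            Map<String, String> betterMatches = Map.of("s2", "node-b");

            List<Runnable> actions = new ArrayList<>();
            for (Shard shard : shards) {
                if (shouldSkip(shard)) {
                    continue;
                }
                Runnable action = cancellationAction(shard, betterMatches);
                if (action != null) {
                    actions.add(action); // defer: never mutate state mid-iteration
                }
            }
            // apply all cancellations only after iteration has finished
            actions.forEach(Runnable::run);
        }
    }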