
Commit

Removed changes which were not necessary
Signed-off-by: Shivansh Arora <[email protected]>
shiv0408 committed Oct 3, 2023
1 parent f08531c commit 0b0bd86
Showing 1 changed file with 118 additions and 122 deletions.
240 changes: 118 additions & 122 deletions server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java
@@ -71,29 +71,99 @@
* @opensearch.internal
*/
public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
protected boolean shouldSkipFetchForRecovery(ShardRouting shard) {
if (shard.primary()) {
return true;
}
if (shard.initializing() == false) {
return true;
}
if (shard.relocatingNodeId() != null) {
return true;
}
if (shard.unassignedInfo() != null && shard.unassignedInfo().getReason() == UnassignedInfo.Reason.INDEX_CREATED) {
// if we are allocating a replica because of index creation, no need to go and find a copy, there isn't one...
return true;
}
return false;
}

protected Runnable getShardCancellationAction(
ShardRouting shard,
RoutingAllocation allocation,
Map<DiscoveryNode, StoreFilesMetadata> nodeShardStores
) {
Metadata metadata = allocation.metadata();
RoutingNodes routingNodes = allocation.routingNodes();
ShardRouting primaryShard = allocation.routingNodes().activePrimary(shard.shardId());
assert primaryShard != null : "the replica shard can be allocated on at least one node, so there must be an active primary";
assert primaryShard.currentNodeId() != null;
final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId());

final StoreFilesMetadata primaryStore = findStore(primaryNode, nodeShardStores);
if (primaryStore == null) {
// if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed)
// just let the recovery find it out, no need to do anything about it for the initializing shard
logger.trace("{}: no primary shard store found or allocated, letting actual allocation figure it out", shard);
return null;
}

MatchingNodes matchingNodes = findMatchingNodes(shard, allocation, true, primaryNode, primaryStore, nodeShardStores, false);
if (matchingNodes.getNodeWithHighestMatch() != null) {
DiscoveryNode currentNode = allocation.nodes().get(shard.currentNodeId());
DiscoveryNode nodeWithHighestMatch = matchingNodes.getNodeWithHighestMatch();
// current node will not be in matchingNodes as it is filtered away by SameShardAllocationDecider
if (currentNode.equals(nodeWithHighestMatch) == false
&& matchingNodes.canPerformNoopRecovery(nodeWithHighestMatch)
&& canPerformOperationBasedRecovery(primaryStore, nodeShardStores, currentNode) == false) {
// we found a better match that can perform noop recovery, cancel the existing allocation.
logger.debug(
"cancelling allocation of replica on [{}], can perform a noop recovery on node [{}]",
currentNode,
nodeWithHighestMatch
);
final Set<String> failedNodeIds = shard.unassignedInfo() == null
? Collections.emptySet()
: shard.unassignedInfo().getFailedNodeIds();
UnassignedInfo unassignedInfo = new UnassignedInfo(
UnassignedInfo.Reason.REALLOCATED_REPLICA,
"existing allocation of replica to ["
+ currentNode
+ "] cancelled, can perform a noop recovery on ["
+ nodeWithHighestMatch
+ "]",
null,
0,
allocation.getCurrentNanoTime(),
System.currentTimeMillis(),
false,
UnassignedInfo.AllocationStatus.NO_ATTEMPT,
failedNodeIds
);
// don't cancel shard in the loop as it will cause a ConcurrentModificationException
return () -> routingNodes.failShard(
logger,
shard,
unassignedInfo,
metadata.getIndexSafe(shard.index()),
allocation.changes()
);
}
}
return null;
}

/**
* Process existing recoveries of replicas and see if we need to cancel them if we find a better
* match. Today, a better match is one that can perform a no-op recovery while the previous recovery
* has to copy segment files.
*/
public void processExistingRecoveries(RoutingAllocation allocation) {
Metadata metadata = allocation.metadata();
RoutingNodes routingNodes = allocation.routingNodes();
List<Runnable> shardCancellationActions = new ArrayList<>();
for (RoutingNode routingNode : routingNodes) {
for (ShardRouting shard : routingNode) {
if (shard.primary()) {
continue;
}
if (shard.initializing() == false) {
continue;
}
if (shard.relocatingNodeId() != null) {
continue;
}

// if we are allocating a replica because of index creation, no need to go and find a copy, there isn't one...
if (shard.unassignedInfo() != null && shard.unassignedInfo().getReason() == UnassignedInfo.Reason.INDEX_CREATED) {
if (shouldSkipFetchForRecovery(shard)) {
continue;
}

@@ -102,63 +172,11 @@ public void processExistingRecoveries(RoutingAllocation allocation) {
logger.trace("{}: fetching new stores for initializing shard", shard);
continue; // still fetching
}
Map<DiscoveryNode, StoreFilesMetadata> nodeShardStores = adaptToNodeShardStores(shardStores);

ShardRouting primaryShard = allocation.routingNodes().activePrimary(shard.shardId());
assert primaryShard != null : "the replica shard can be allocated on at least one node, so there must be an active primary";
assert primaryShard.currentNodeId() != null;
final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId());
NodeShardStores nodeShardStores = adaptToNodeShardStores(shardStores);
final StoreFilesMetadata primaryStore = findStore(primaryNode, nodeShardStores);
if (primaryStore == null) {
// if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed)
// just let the recovery find it out, no need to do anything about it for the initializing shard
logger.trace("{}: no primary shard store found or allocated, letting actual allocation figure it out", shard);
continue;
}

MatchingNodes matchingNodes = findMatchingNodes(shard, allocation, true, primaryNode, primaryStore, nodeShardStores, false);
if (matchingNodes.getNodeWithHighestMatch() != null) {
DiscoveryNode currentNode = allocation.nodes().get(shard.currentNodeId());
DiscoveryNode nodeWithHighestMatch = matchingNodes.getNodeWithHighestMatch();
// current node will not be in matchingNodes as it is filtered away by SameShardAllocationDecider
if (currentNode.equals(nodeWithHighestMatch) == false
&& matchingNodes.canPerformNoopRecovery(nodeWithHighestMatch)
&& canPerformOperationBasedRecovery(primaryStore, nodeShardStores, currentNode) == false) {
// we found a better match that can perform noop recovery, cancel the existing allocation.
logger.debug(
"cancelling allocation of replica on [{}], can perform a noop recovery on node [{}]",
currentNode,
nodeWithHighestMatch
);
final Set<String> failedNodeIds = shard.unassignedInfo() == null
? Collections.emptySet()
: shard.unassignedInfo().getFailedNodeIds();
UnassignedInfo unassignedInfo = new UnassignedInfo(
UnassignedInfo.Reason.REALLOCATED_REPLICA,
"existing allocation of replica to ["
+ currentNode
+ "] cancelled, can perform a noop recovery on ["
+ nodeWithHighestMatch
+ "]",
null,
0,
allocation.getCurrentNanoTime(),
System.currentTimeMillis(),
false,
UnassignedInfo.AllocationStatus.NO_ATTEMPT,
failedNodeIds
);
// don't cancel shard in the loop as it will cause a ConcurrentModificationException
shardCancellationActions.add(
() -> routingNodes.failShard(
logger,
shard,
unassignedInfo,
metadata.getIndexSafe(shard.index()),
allocation.changes()
)
);
}
Runnable cancellationAction = getShardCancellationAction(shard, allocation, nodeShardStores);
if (cancellationAction != null) {
shardCancellationActions.add(cancellationAction);
}
}
}
@@ -211,14 +229,14 @@ public AllocateUnassignedDecision makeAllocationDecision(
}
return AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, nodeDecisions);
}
NodeShardStores nodeShardStores = adaptToNodeShardStores(shardStores);
Map<DiscoveryNode, StoreFilesMetadata> nodeShardStores = adaptToNodeShardStores(shardStores);
return getAllocationDecision(unassignedShard, allocation, nodeShardStores, result, logger);
}

protected AllocateUnassignedDecision getAllocationDecision(
ShardRouting unassignedShard,
RoutingAllocation allocation,
NodeShardStores nodeShardStores,
Map<DiscoveryNode, StoreFilesMetadata> nodeShardStores,
Tuple<Decision, Map<String, NodeAllocationResult>> allocationDecision,
Logger logger
) {
@@ -348,7 +366,7 @@ protected static Tuple<Decision, Map<String, NodeAllocationResult>> canBeAllocat
* Takes the store info for nodes that have a shard store and adds them to the node decisions,
* leaving the node explanations untouched for those nodes that do not have any store information.
*/
protected static List<NodeAllocationResult> augmentExplanationsWithStoreInfo(
private static List<NodeAllocationResult> augmentExplanationsWithStoreInfo(
Map<String, NodeAllocationResult> nodeDecisions,
Map<String, NodeAllocationResult> withShardStores
) {
@@ -369,36 +387,35 @@ protected static List<NodeAllocationResult> augmentExplanationsWithStoreInfo(
/**
* Finds the store for the assigned shard in the fetched data, returns null if none is found.
*/
protected static StoreFilesMetadata findStore(
private static StoreFilesMetadata findStore(
DiscoveryNode node,
NodeShardStores data
Map<DiscoveryNode, StoreFilesMetadata> data
) {
NodeShardStore nodeFilesStore = data.getNodeShardStores().get(node);
if (nodeFilesStore == null) {
if (!data.containsKey(node)) {
return null;
}
return nodeFilesStore.getStoreFilesMetadata();
return data.get(node);
}

protected MatchingNodes findMatchingNodes(
private MatchingNodes findMatchingNodes(
ShardRouting shard,
RoutingAllocation allocation,
boolean noMatchFailedNodes,
DiscoveryNode primaryNode,
StoreFilesMetadata primaryStore,
NodeShardStores data,
Map<DiscoveryNode, StoreFilesMetadata> data,
boolean explain
) {
Map<DiscoveryNode, MatchingNode> matchingNodes = new HashMap<>();
Map<String, NodeAllocationResult> nodeDecisions = explain ? new HashMap<>() : null;
for (Map.Entry<DiscoveryNode, NodeShardStore> nodeStoreEntry : data.getNodeShardStores().entrySet()) {
for (Map.Entry<DiscoveryNode, StoreFilesMetadata> nodeStoreEntry : data.entrySet()) {
DiscoveryNode discoNode = nodeStoreEntry.getKey();
if (noMatchFailedNodes
&& shard.unassignedInfo() != null
&& shard.unassignedInfo().getFailedNodeIds().contains(discoNode.getId())) {
continue;
}
StoreFilesMetadata storeFilesMetadata = nodeStoreEntry.getValue().getStoreFilesMetadata();
StoreFilesMetadata storeFilesMetadata = nodeStoreEntry.getValue();
// we don't have any files at all, it is an empty index
if (storeFilesMetadata.isEmpty()) {
continue;
@@ -453,20 +470,20 @@ protected MatchingNodes findMatchingNodes(
return new MatchingNodes(matchingNodes, nodeDecisions);
}

private NodeShardStores adaptToNodeShardStores(AsyncShardFetch.FetchResult<NodeStoreFilesMetadata> data) {
NodeShardStores nodeShardStores = new NodeShardStores();
nodeShardStores.getNodeShardStores().putAll(
data.getData().entrySet().stream().collect(
Collectors.toMap(
Map.Entry::getKey,
entry -> new NodeShardStore(entry.getValue().storeFilesMetadata())
private Map<DiscoveryNode, StoreFilesMetadata> adaptToNodeShardStores(AsyncShardFetch.FetchResult<NodeStoreFilesMetadata> data) {
assert data.hasData();
return new HashMap<>(
data.getData().entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
entry -> entry.getValue().storeFilesMetadata()
)
)
)
);
return nodeShardStores;
}

protected static long computeMatchingBytes(
private static long computeMatchingBytes(
StoreFilesMetadata primaryStore,
StoreFilesMetadata storeFilesMetadata
) {
@@ -480,15 +497,15 @@ protected static long computeMatchingBytes(
return sizeMatched;
}

protected static boolean hasMatchingSyncId(
private static boolean hasMatchingSyncId(
StoreFilesMetadata primaryStore,
StoreFilesMetadata replicaStore
) {
String primarySyncId = primaryStore.syncId();
return primarySyncId != null && primarySyncId.equals(replicaStore.syncId());
}

protected static MatchingNode computeMatchingNode(
private static MatchingNode computeMatchingNode(
DiscoveryNode primaryNode,
StoreFilesMetadata primaryStore,
DiscoveryNode replicaNode,
@@ -502,16 +519,16 @@ protected static MatchingNode computeMatchingNode(
return new MatchingNode(matchingBytes, retainingSeqNoForReplica, isNoopRecovery);
}

protected static boolean canPerformOperationBasedRecovery(
private static boolean canPerformOperationBasedRecovery(
StoreFilesMetadata primaryStore,
NodeShardStores shardStores,
Map<DiscoveryNode, StoreFilesMetadata> shardStores,
DiscoveryNode targetNode
) {
final NodeShardStore targetNodeStore = shardStores.getNodeShardStores().get(targetNode);
if (targetNodeStore == null || targetNodeStore.getStoreFilesMetadata().isEmpty()) {
final StoreFilesMetadata targetNodeStore = shardStores.get(targetNode);
if (targetNodeStore == null || targetNodeStore.isEmpty()) {
return false;
}
if (hasMatchingSyncId(primaryStore, targetNodeStore.getStoreFilesMetadata())) {
if (hasMatchingSyncId(primaryStore, targetNodeStore)) {
return true;
}
return primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(targetNode) >= 0;
@@ -524,6 +541,9 @@ protected static boolean canPerformOperationBasedRecovery(
*/
protected abstract boolean hasInitiatedFetching(ShardRouting shard);

/**
 * A class to encapsulate the details of a matching node used for shard assignment
 */
protected static class MatchingNode {
static final Comparator<MatchingNode> COMPARATOR = Comparator.<MatchingNode, Boolean>comparing(m -> m.isNoopRecovery)
.thenComparing(m -> m.retainingSeqNo)
@@ -586,28 +606,4 @@ public Map<String, NodeAllocationResult> getNodeDecisions() {
return this.nodeDecisions;
}
}

protected static class NodeShardStore {
private final StoreFilesMetadata storeFilesMetadata;

protected NodeShardStore(StoreFilesMetadata storeFilesMetadata) {
this.storeFilesMetadata = storeFilesMetadata;
}

protected StoreFilesMetadata getStoreFilesMetadata() {
return storeFilesMetadata;
}
}

protected static class NodeShardStores {
private final Map<DiscoveryNode, NodeShardStore> nodeShardStores;

protected NodeShardStores() {
this.nodeShardStores = new HashMap<>();
}

protected Map<DiscoveryNode, NodeShardStore> getNodeShardStores() {
return nodeShardStores;
}
}
}
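
For orientation, the two protected hooks extracted above (shouldSkipFetchForRecovery and getShardCancellationAction) are meant to be driven by a loop like the one in processExistingRecoveries. The fragment below is a minimal sketch of such a caller, assuming it lives inside a ReplicaShardAllocator subclass; the fetchStoreFilesPerNode helper is a hypothetical stand-in for whatever produces the per-node store metadata map, since adaptToNodeShardStores is private in this commit.

// Hypothetical fragment inside a ReplicaShardAllocator subclass.
// fetchStoreFilesPerNode(...) is an assumed helper, not part of this commit.
List<Runnable> cancellations = new ArrayList<>();
for (RoutingNode routingNode : allocation.routingNodes()) {
    for (ShardRouting shard : routingNode) {
        if (shouldSkipFetchForRecovery(shard)) {
            continue; // primary, not initializing, relocating, or a freshly created index
        }
        Map<DiscoveryNode, StoreFilesMetadata> nodeShardStores = fetchStoreFilesPerNode(shard, allocation);
        Runnable cancellation = getShardCancellationAction(shard, allocation, nodeShardStores);
        if (cancellation != null) {
            // collect and run after the iteration to avoid a ConcurrentModificationException
            cancellations.add(cancellation);
        }
    }
}
cancellations.forEach(Runnable::run);

Running the collected cancellations only after the routing-node iteration completes mirrors the comment in the diff about not failing a shard while the loop is still walking the routing table.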
