diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java index 36c81100793cf..0dfd8d5718e54 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java @@ -8,38 +8,23 @@ package org.opensearch.gateway; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.routing.RecoverySource; -import org.opensearch.cluster.routing.RoutingNode; import org.opensearch.cluster.routing.RoutingNodes; import org.opensearch.cluster.routing.ShardRouting; -import org.opensearch.cluster.routing.UnassignedInfo; import org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus; import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.opensearch.cluster.routing.allocation.NodeAllocationResult; -import org.opensearch.cluster.routing.allocation.NodeAllocationResult.ShardStoreInfo; import org.opensearch.cluster.routing.allocation.RoutingAllocation; -import org.opensearch.cluster.routing.allocation.decider.Decision; -import org.opensearch.cluster.routing.allocation.decider.Decision.Type; -import org.opensearch.env.ShardLockObtainFailedException; -import org.opensearch.gateway.AsyncBatchShardFetch.FetchResult; +import org.opensearch.gateway.AsyncShardFetch.FetchResult; import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch; import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShards; -import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; 
import java.util.Map; import java.util.Set; -import java.util.TreeMap; -import java.util.stream.Collectors; -import java.util.stream.Stream; /** * PrimaryShardBatchAllocator is similar to {@link org.opensearch.gateway.PrimaryShardAllocator} only difference is @@ -59,49 +44,48 @@ * * @opensearch.internal */ -public abstract class PrimaryShardBatchAllocator extends BaseGatewayShardAllocator { - /** - * Is the allocator responsible for allocating the given {@link ShardRouting}? - */ - private static boolean isResponsibleFor(final ShardRouting shard) { - return shard.primary() // must be primary - && shard.unassigned() // must be unassigned - // only handle either an existing store or a snapshot recovery - && (shard.recoverySource().getType() == RecoverySource.Type.EXISTING_STORE - || shard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT); - } +public abstract class PrimaryShardBatchAllocator extends PrimaryShardAllocator { - abstract protected FetchResult fetchData(Set shardsEligibleForFetch, - Set inEligibleShards, - RoutingAllocation allocation); + abstract protected FetchResult fetchData( + Set shardsEligibleForFetch, + Set inEligibleShards, + RoutingAllocation allocation + ); + + protected FetchResult fetchData( + ShardRouting shard, + RoutingAllocation allocation + ) { + return null; + } @Override - public AllocateUnassignedDecision makeAllocationDecision(ShardRouting unassignedShard, - RoutingAllocation allocation, - Logger logger) { + public AllocateUnassignedDecision makeAllocationDecision(ShardRouting unassignedShard, RoutingAllocation allocation, Logger logger) { - return makeAllocationDecision(new HashSet<>(Collections.singletonList(unassignedShard)), - allocation, logger).get(unassignedShard); + return makeAllocationDecision(new HashSet<>(Collections.singletonList(unassignedShard)), allocation, logger).get(unassignedShard); } /** - * Build allocation decisions for all the shards present in the batch identified by batchId. 
- * @param shards set of shards given for allocation + * Build allocation decisions for all the shards given to this allocator. + * + * @param shards set of shards given for allocation * @param allocation current allocation of all the shards - * @param logger logger used for logging + * @param logger logger used for logging * @return shard to allocation decision map */ @Override - public HashMap makeAllocationDecision(Set shards, - RoutingAllocation allocation, - Logger logger) { + public HashMap makeAllocationDecision( + Set shards, + RoutingAllocation allocation, + Logger logger + ) { HashMap shardAllocationDecisions = new HashMap<>(); final boolean explain = allocation.debugDecision(); Set shardsEligibleForFetch = new HashSet<>(); Set shardsNotEligibleForFetch = new HashSet<>(); // identify ineligible shards for (ShardRouting shard : shards) { - AllocateUnassignedDecision decision = skipSnapshotRestore(shard, allocation); + AllocateUnassignedDecision decision = getInEligibleShardDecision(shard, allocation); if (decision != null) { shardsNotEligibleForFetch.add(shard); shardAllocationDecisions.put(shard, decision); @@ -109,12 +93,16 @@ public HashMap makeAllocationDecision( shardsEligibleForFetch.add(shard); } } + // Do not call fetchData if there are no eligible shards + if (shardsEligibleForFetch.size() == 0) { + return shardAllocationDecisions; + } // only fetch data for eligible shards - final FetchResult shardsState = fetchData(shardsEligibleForFetch, shardsNotEligibleForFetch, allocation); - // Note : shardsState contain the Data, there key is DiscoveryNode but value is Map so to get one shard level data (from all the nodes), we'll traverse the map - // and construct the nodeShardState along the way before making any allocation decision. As metadata for a - // particular shard is needed from all the discovery nodes. 
+ final FetchResult shardsState = fetchData( + shardsEligibleForFetch, + shardsNotEligibleForFetch, + allocation + ); // process the received data for (ShardRouting unassignedShard : shardsEligibleForFetch) { @@ -125,438 +113,52 @@ public HashMap makeAllocationDecision( if (explain) { nodeDecisions = buildDecisionsForAllNodes(unassignedShard, allocation); } - shardAllocationDecisions.put(unassignedShard, - AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, - nodeDecisions)); - } else { - Map nodeResponses = shardsState.getData(); - Map shardData = new HashMap<>(); - // build data for a shard from all the nodes - for (Map.Entry nodeEntry : nodeResponses.entrySet()) { - shardData.put(nodeEntry.getKey(), - nodeEntry.getValue().getNodeGatewayStartedShardsBatch().get(unassignedShard.shardId())); - } - // get allocation decision for this shard - shardAllocationDecisions.put(unassignedShard, getAllocationDecision(unassignedShard, allocation, - shardData, logger)); - } - } - return shardAllocationDecisions; - } - - /** - * Below code is very similar to {@link org.opensearch.gateway.PrimaryShardAllocator} class makeAllocationDecision, - * only difference is that NodeGatewayStartedShards object doesn't have the DiscoveryNode object as - * BaseNodeResponse. So, DiscoveryNode reference is passed in Map so - * corresponding DiscoveryNode object can be used for rest of the implementation. Also, DiscoveryNode object - * reference is added in DecidedNode class to achieve same use case of accessing corresponding DiscoveryNode object. 
- * @param unassignedShard unassigned shard routing - * @param allocation routing allocation object - * @param shardState shard metadata fetched from all data nodes - * @param logger logger - * @return allocation decision taken for this shard - */ - private AllocateUnassignedDecision getAllocationDecision(ShardRouting unassignedShard, RoutingAllocation allocation, - Map shardState, - Logger logger) { - final boolean explain = allocation.debugDecision(); - // don't create a new IndexSetting object for every shard as this could cause a lot of garbage - // on cluster restart if we allocate a boat load of shards - final IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(unassignedShard.index()); - final Set inSyncAllocationIds = indexMetadata.inSyncAllocationIds(unassignedShard.id()); - final boolean snapshotRestore = unassignedShard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT; - - assert inSyncAllocationIds.isEmpty() == false; - // use in-sync allocation ids to select nodes - final PrimaryShardBatchAllocator.NodeShardsResult nodeShardsResult = buildNodeShardsResult( - unassignedShard, - snapshotRestore, - allocation.getIgnoreNodes(unassignedShard.shardId()), - inSyncAllocationIds, - shardState, - logger - ); - final boolean enoughAllocationsFound = nodeShardsResult.orderedAllocationCandidates.size() > 0; - logger.debug( - "[{}][{}]: found {} allocation candidates of {} based on allocation ids: [{}]", - unassignedShard.index(), - unassignedShard.id(), - nodeShardsResult.orderedAllocationCandidates.size(), - unassignedShard, - inSyncAllocationIds - ); - - if (enoughAllocationsFound == false) { - if (snapshotRestore) { - // let BalancedShardsAllocator take care of allocating this shard - logger.debug( - "[{}][{}]: missing local data, will restore from [{}]", - unassignedShard.index(), - unassignedShard.id(), - unassignedShard.recoverySource() - ); - return AllocateUnassignedDecision.NOT_TAKEN; - } else { - // We have a shard that was 
previously allocated, but we could not find a valid shard copy to allocate the primary. - // We could just be waiting for the node that holds the primary to start back up, in which case the allocation for - // this shard will be picked up when the node joins and we do another allocation reroute - logger.debug( - "[{}][{}]: not allocating, number_of_allocated_shards_found [{}]", - unassignedShard.index(), - unassignedShard.id(), - nodeShardsResult.allocationsFound - ); - return AllocateUnassignedDecision.no( - AllocationStatus.NO_VALID_SHARD_COPY, - explain ? buildNodeDecisions(null, shardState, inSyncAllocationIds) : null - ); - } - } - - NodesToAllocate nodesToAllocate = buildNodesToAllocate( - allocation, - nodeShardsResult.orderedAllocationCandidates, - unassignedShard, - false - ); - DiscoveryNode node = null; - String allocationId = null; - boolean throttled = false; - if (nodesToAllocate.yesNodeShards.isEmpty() == false) { - DecidedNode decidedNode = nodesToAllocate.yesNodeShards.get(0); - logger.debug( - "[{}][{}]: allocating [{}] to [{}] on primary allocation", - unassignedShard.index(), - unassignedShard.id(), - unassignedShard, - decidedNode.getNode() - ); - node = decidedNode.getNode(); - allocationId = decidedNode.nodeShardState.allocationId(); - } else if (nodesToAllocate.throttleNodeShards.isEmpty() && !nodesToAllocate.noNodeShards.isEmpty()) { - // The deciders returned a NO decision for all nodes with shard copies, so we check if primary shard - // can be force-allocated to one of the nodes. 
- nodesToAllocate = buildNodesToAllocate(allocation, nodeShardsResult.orderedAllocationCandidates, unassignedShard, true); - if (nodesToAllocate.yesNodeShards.isEmpty() == false) { - final DecidedNode decidedNode = nodesToAllocate.yesNodeShards.get(0); - final NodeGatewayStartedShards nodeShardState = decidedNode.nodeShardState; - logger.debug( - "[{}][{}]: allocating [{}] to [{}] on forced primary allocation", - unassignedShard.index(), - unassignedShard.id(), - unassignedShard, - decidedNode.getNode() - ); - node = decidedNode.getNode(); - allocationId = nodeShardState.allocationId(); - } else if (nodesToAllocate.throttleNodeShards.isEmpty() == false) { - logger.debug( - "[{}][{}]: throttling allocation [{}] to [{}] on forced primary allocation", - unassignedShard.index(), - unassignedShard.id(), + shardAllocationDecisions.put( unassignedShard, - nodesToAllocate.throttleNodeShards + AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, nodeDecisions) ); - throttled = true; } else { - logger.debug( - "[{}][{}]: forced primary allocation denied [{}]", - unassignedShard.index(), - unassignedShard.id(), - unassignedShard - ); - } - } else { - // we are throttling this, since we are allowed to allocate to this node but there are enough allocations - // taking place on the node currently, ignore it for now - logger.debug( - "[{}][{}]: throttling allocation [{}] to [{}] on primary allocation", - unassignedShard.index(), - unassignedShard.id(), - unassignedShard, - nodesToAllocate.throttleNodeShards - ); - throttled = true; - } - - List nodeResults = null; - if (explain) { - nodeResults = buildNodeDecisions(nodesToAllocate, shardState, inSyncAllocationIds); - } - if (allocation.hasPendingAsyncFetch()) { - return AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, nodeResults); - } else if (node != null) { - return AllocateUnassignedDecision.yes(node, allocationId, nodeResults, false); - } else if (throttled) { - return 
AllocateUnassignedDecision.throttle(nodeResults); - } else { - return AllocateUnassignedDecision.no(AllocationStatus.DECIDERS_NO, nodeResults, true); - } - } - - /** - * Skip doing fetchData call for a shard if recovery mode is snapshot. Also do not take decision if allocator is - * not responsible for this particular shard. - * @param unassignedShard unassigned shard routing - * @param allocation routing allocation object - * @return allocation decision taken for this shard - */ - private AllocateUnassignedDecision skipSnapshotRestore(ShardRouting unassignedShard, RoutingAllocation allocation) { - if (isResponsibleFor(unassignedShard) == false) { - // this allocator is not responsible for allocating this shard - return AllocateUnassignedDecision.NOT_TAKEN; - } - final boolean explain = allocation.debugDecision(); - if (unassignedShard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT - && allocation.snapshotShardSizeInfo().getShardSize(unassignedShard) == null) { - List nodeDecisions = null; - if (explain) { - nodeDecisions = buildDecisionsForAllNodes(unassignedShard, allocation); - } - return AllocateUnassignedDecision.no(UnassignedInfo.AllocationStatus.FETCHING_SHARD_DATA, nodeDecisions); - } - return null; - } - /** - * Builds a map of nodes to the corresponding allocation decisions for those nodes. 
- */ - private static List buildNodeDecisions( - NodesToAllocate nodesToAllocate, - Map fetchedShardData, - Set inSyncAllocationIds - ) { - List nodeResults = new ArrayList<>(); - Map ineligibleShards; - if (nodesToAllocate != null) { - final Set discoNodes = new HashSet<>(); - nodeResults.addAll( - Stream.of(nodesToAllocate.yesNodeShards, nodesToAllocate.throttleNodeShards, nodesToAllocate.noNodeShards) - .flatMap(Collection::stream) - .map(dnode -> { - discoNodes.add(dnode.getNode()); - return new NodeAllocationResult( - dnode.getNode(), - shardStoreInfo(dnode.nodeShardState, inSyncAllocationIds), - dnode.decision - ); - }) - .collect(Collectors.toList()) - ); - - ineligibleShards = fetchedShardData.entrySet() - .stream() - .filter(shardData -> discoNodes.contains(shardData.getKey()) == false) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - } else { - // there were no shard copies that were eligible for being assigned the allocation, - // so all fetched shard data are ineligible shards - ineligibleShards = fetchedShardData; - } - - nodeResults.addAll( - ineligibleShards.entrySet().stream() - .map(shardData -> new NodeAllocationResult(shardData.getKey(), shardStoreInfo(shardData.getValue(), - inSyncAllocationIds), null)) - .collect(Collectors.toList()) - ); - - return nodeResults; - } - - private static ShardStoreInfo shardStoreInfo(NodeGatewayStartedShards nodeShardState, Set inSyncAllocationIds) { - final Exception storeErr = nodeShardState.storeException(); - final boolean inSync = nodeShardState.allocationId() != null && inSyncAllocationIds.contains(nodeShardState.allocationId()); - return new ShardStoreInfo(nodeShardState.allocationId(), inSync, storeErr); - } - - private static final Comparator NO_STORE_EXCEPTION_FIRST_COMPARATOR = Comparator.comparing( - (NodeGatewayStartedShards state) -> state.storeException() == null - ).reversed(); - private static final Comparator PRIMARY_FIRST_COMPARATOR = Comparator.comparing( - 
NodeGatewayStartedShards::primary - ).reversed(); - - private static final Comparator HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR = Comparator.comparing( - NodeGatewayStartedShards::replicationCheckpoint, - Comparator.nullsLast(Comparator.naturalOrder()) - ); - - /** - * Builds a list of nodes. If matchAnyShard is set to false, only nodes that have an allocation id matching - * inSyncAllocationIds are added to the list. Otherwise, any node that has a shard is added to the list, but - * entries with matching allocation id are always at the front of the list. - */ - protected static NodeShardsResult buildNodeShardsResult( - ShardRouting shard, - boolean matchAnyShard, - Set ignoreNodes, - Set inSyncAllocationIds, - Map shardState, - Logger logger - ) { - /** - * Orders the active shards copies based on below comparators - * 1. No store exception i.e. shard copy is readable - * 2. Prefer previous primary shard - * 3. Prefer shard copy with the highest replication checkpoint. It is NO-OP for doc rep enabled indices. 
- */ - final Comparator comparator; // allocation preference - if (matchAnyShard) { - // prefer shards with matching allocation ids - Comparator matchingAllocationsFirst = Comparator.comparing( - (NodeGatewayStartedShards state) -> inSyncAllocationIds.contains(state.allocationId()) - ).reversed(); - comparator = matchingAllocationsFirst.thenComparing(NO_STORE_EXCEPTION_FIRST_COMPARATOR) - .thenComparing(PRIMARY_FIRST_COMPARATOR) - .thenComparing(HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR); - } else { - comparator = NO_STORE_EXCEPTION_FIRST_COMPARATOR.thenComparing(PRIMARY_FIRST_COMPARATOR) - .thenComparing(HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR); - } - // TreeMap will sort the entries based on key, comparator is assigned above - TreeMap shardStatesToNode = new TreeMap<>(comparator); - int numberOfAllocationsFound = 0; - for (Map.Entry nodeShardStateEntry : shardState.entrySet()) { - DiscoveryNode node = nodeShardStateEntry.getKey(); - NodeGatewayStartedShards nodeShardState = nodeShardStateEntry.getValue(); - String allocationId = nodeShardState.allocationId(); - - if (ignoreNodes.contains(node.getId())) { - continue; - } - - if (nodeShardState.storeException() == null) { - if (allocationId == null) { - logger.trace("[{}] on node [{}] has no shard state information", shard, node); - } else { - logger.trace("[{}] on node [{}] has allocation id [{}]", shard, node, allocationId); - } - } else { - final String finalAllocationId = allocationId; - if (nodeShardState.storeException() instanceof ShardLockObtainFailedException) { - logger.trace( - () -> new ParameterizedMessage( - "[{}] on node [{}] has allocation id [{}] but the store can not be " - + "opened as it's locked, treating as valid shard", - shard, - node, - finalAllocationId - ), - nodeShardState.storeException() - ); - } else { - logger.trace( - () -> new ParameterizedMessage( - "[{}] on node [{}] has allocation id [{}] but the store can not be " + "opened, treating as no allocation id", - shard, 
- node, - finalAllocationId - ), - nodeShardState.storeException() - ); - allocationId = null; - } - } - - if (allocationId != null) { - assert nodeShardState.storeException() == null || nodeShardState.storeException() instanceof ShardLockObtainFailedException - : "only allow store that can be opened or that throws a ShardLockObtainFailedException while being opened but got a " - + "store throwing " - + nodeShardState.storeException(); - numberOfAllocationsFound++; - if (matchAnyShard || inSyncAllocationIds.contains(nodeShardState.allocationId())) { - shardStatesToNode.put(nodeShardState, node); - } + NodeShardStates nodeShardStates = adaptToNodeShardStates(unassignedShard, shardsState); + // get allocation decision for this shard + shardAllocationDecisions.put(unassignedShard, getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger)); } } - - if (logger.isTraceEnabled()) { - logger.trace( - "{} candidates for allocation: {}", - shard, - shardState.keySet().stream().map(DiscoveryNode::getName).collect(Collectors.joining(", ")) - ); - } - return new NodeShardsResult(shardStatesToNode, numberOfAllocationsFound); + return shardAllocationDecisions; } /** - * Split the list of node shard states into groups yes/no/throttle based on allocation deciders + * shardsState contains the data: its key is DiscoveryNode but its value is a Map, so to get one shard's data (from all the nodes) we traverse the map + * and construct the nodeShardState along the way before making any allocation decision, as metadata for a + * particular shard is needed from all the discovery nodes. 
+ * + * @param unassignedShard unassigned shard + * @param shardsState fetch data result for the whole batch + * @return shard state returned from each node */ - private static NodesToAllocate buildNodesToAllocate( - RoutingAllocation allocation, - TreeMap shardStateToNode, - ShardRouting shardRouting, - boolean forceAllocate + private static NodeShardStates adaptToNodeShardStates( + ShardRouting unassignedShard, + FetchResult shardsState ) { - List yesNodeShards = new ArrayList<>(); - List throttledNodeShards = new ArrayList<>(); - List noNodeShards = new ArrayList<>(); - for (Map.Entry nodeShardState : shardStateToNode.entrySet()) { - RoutingNode node = allocation.routingNodes().node(nodeShardState.getValue().getId()); - if (node == null) { - continue; - } - - Decision decision = forceAllocate - ? allocation.deciders().canForceAllocatePrimary(shardRouting, node, allocation) - : allocation.deciders().canAllocate(shardRouting, node, allocation); - DecidedNode decidedNode = new DecidedNode(nodeShardState.getKey(), decision, nodeShardState.getValue()); - if (decision.type() == Type.THROTTLE) { - throttledNodeShards.add(decidedNode); - } else if (decision.type() == Type.NO) { - noNodeShards.add(decidedNode); - } else { - yesNodeShards.add(decidedNode); - } - } - return new NodesToAllocate( - Collections.unmodifiableList(yesNodeShards), - Collections.unmodifiableList(throttledNodeShards), - Collections.unmodifiableList(noNodeShards) - ); - } - - private static class NodeShardsResult { - final TreeMap orderedAllocationCandidates; - final int allocationsFound; - - NodeShardsResult(TreeMap orderedAllocationCandidates, int allocationsFound) { - this.orderedAllocationCandidates = orderedAllocationCandidates; - this.allocationsFound = allocationsFound; - } - } - - static class NodesToAllocate { - final List yesNodeShards; - final List throttleNodeShards; - final List noNodeShards; - - NodesToAllocate(List yesNodeShards, List throttleNodeShards, List noNodeShards) { - 
this.yesNodeShards = yesNodeShards; - this.throttleNodeShards = throttleNodeShards; - this.noNodeShards = noNodeShards; - } - } - - /** - * This class encapsulates the shard state retrieved from a node and the decision that was made - * by the allocator for allocating to the node that holds the shard copy. - */ - private static class DecidedNode { - final NodeGatewayStartedShards nodeShardState; - final Decision decision; - final DiscoveryNode node; - - private DecidedNode(NodeGatewayStartedShards nodeShardState, Decision decision, DiscoveryNode node) { - this.nodeShardState = nodeShardState; - this.decision = decision; - this.node = node; - } - - public DiscoveryNode getNode() { - return node; - } + NodeShardStates nodeShardStates = new NodeShardStates(); + Map nodeResponses = shardsState.getData(); + + // build data for a shard from all the nodes + nodeResponses.forEach((node, nodeGatewayStartedShardsBatch) -> { + NodeGatewayStartedShards shardData = nodeGatewayStartedShardsBatch.getNodeGatewayStartedShardsBatch() + .get(unassignedShard.shardId()); + nodeShardStates.getNodeShardStates() + .add( + new NodeShardState( + node, + shardData.allocationId(), + shardData.primary(), + shardData.replicationCheckpoint(), + shardData.storeException() + ) + ); + }); + return nodeShardStates; } } diff --git a/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java new file mode 100644 index 0000000000000..b8703192f09b3 --- /dev/null +++ b/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java @@ -0,0 +1,339 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ +package org.opensearch.gateway; + +import org.apache.lucene.codecs.Codec; +import org.junit.Before; +import org.opensearch.Version; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.OpenSearchAllocationTestCase; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.UnassignedInfo; +import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; +import org.opensearch.cluster.routing.allocation.AllocationDecision; +import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.opensearch.common.Nullable; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.set.Sets; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.env.Environment; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.codec.CodecService; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.test.IndexSettingsModule; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.opensearch.cluster.routing.UnassignedInfo.Reason.CLUSTER_RECOVERED; + +public class PrimaryShardBatchAllocatorTests extends OpenSearchAllocationTestCase { + + private final ShardId shardId = new ShardId("test", "_na_", 0); + private static Set shardsInBatch; + private final DiscoveryNode node1 = newNode("node1"); + private final DiscoveryNode 
node2 = newNode("node2"); + private final DiscoveryNode node3 = newNode("node3"); + private TestBatchAllocator batchAllocator; + + public static void setUpShards(int numberOfShards) { + shardsInBatch = new HashSet<>(); + for (int shardNumber = 0; shardNumber < numberOfShards; shardNumber++) { + ShardId shardId = new ShardId("test", "_na_", shardNumber); + shardsInBatch.add(shardId); + } + } + + @Before + public void buildTestAllocator() { + this.batchAllocator = new TestBatchAllocator(); + } + + private void allocateAllUnassigned(final RoutingAllocation allocation) { + final RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator(); + while (iterator.hasNext()) { + batchAllocator.allocateUnassigned(iterator.next(), allocation, iterator); + } + } + + private void allocateAllUnassignedBatch(final RoutingAllocation allocation) { + final RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator(); + Set shardsToBatch = new HashSet<>(); + while (iterator.hasNext()) { + shardsToBatch.add(iterator.next()); + } + batchAllocator.allocateUnassignedBatch(shardsToBatch, allocation); + } + + public void testMakeAllocationDecisionDataFetching() { + final RoutingAllocation allocation = routingAllocationWithOnePrimary(noAllocationDeciders(), CLUSTER_RECOVERED, "allocId1"); + + Set shards = new HashSet<>(); + allocateAllUnassignedBatch(allocation); + ShardRouting shard = allocation.routingTable().getIndicesRouting().get("test").shard(shardId.id()).primaryShard(); + shards.add(shard); + HashMap allDecisions = batchAllocator.makeAllocationDecision(shards, allocation, logger); + // verify we get decisions for all the shards + assertEquals(shards.size(), allDecisions.size()); + assertEquals(shards, allDecisions.keySet()); + assertEquals(AllocationDecision.AWAITING_INFO, allDecisions.get(shard).getAllocationDecision()); + } + + public void testMakeAllocationDecisionForReplicaShard() { + 
final RoutingAllocation allocation = routingAllocationWithOnePrimary(noAllocationDeciders(), CLUSTER_RECOVERED, "allocId1");
+
+        List replicaShards = allocation.routingTable().getIndicesRouting().get("test").shard(shardId.id()).replicaShards();
+        Set shards = new HashSet<>(replicaShards);
+        HashMap allDecisions = batchAllocator.makeAllocationDecision(shards, allocation, logger);
+        // verify we get decisions for all the shards
+        assertEquals(shards.size(), allDecisions.size());
+        assertEquals(shards, allDecisions.keySet());
+        assertEquals(false, allDecisions.get(replicaShards.get(0)).isDecisionTaken());
+    }
+
+    /**
+     * Verifies that a batch holding a single primary receives a {@link AllocationDecision#YES} decision once store
+     * data for that shard is available.
+     * <p>
+     * {@code node1} reports a primary copy with allocation id {@code "allocId1"}, which is also the in-sync
+     * allocation id registered by {@code routingAllocationWithOnePrimary}; the allocation uses
+     * {@code noAllocationDeciders()}.
+     */
+    public void testMakeAllocationDecisionDataFetched() {
+        final RoutingAllocation allocation = routingAllocationWithOnePrimary(noAllocationDeciders(), CLUSTER_RECOVERED, "allocId1");
+
+        Set<ShardRouting> shards = new HashSet<>();
+        ShardRouting shard = allocation.routingTable().getIndicesRouting().get("test").shard(shardId.id()).primaryShard();
+        shards.add(shard);
+        batchAllocator.addData(node1, "allocId1", true, new ReplicationCheckpoint(shardId, 20, 101, 1, Codec.getDefault().getName()));
+        HashMap<ShardRouting, AllocateUnassignedDecision> allDecisions = batchAllocator.makeAllocationDecision(shards, allocation, logger);
+        // verify we get decisions for all the shards
+        assertEquals(shards.size(), allDecisions.size());
+        assertEquals(shards, allDecisions.keySet());
+        assertEquals(AllocationDecision.YES, allDecisions.get(shard).getAllocationDecision());
+    }
+
+    /**
+     * Verifies that every primary of a two-shard batch receives a {@link AllocationDecision#YES} decision when
+     * {@code node1} holds store data for all of them.
+     * <p>
+     * NOTE(review): the data registered here uses allocation id {@code "allocId-" + shard id}, while
+     * {@code routingAllocationWithMultiplePrimaries} pairs ids with shards in the iteration order of
+     * {@code shardsInBatch}. This presumably relies on that order yielding shard 0 before shard 1 - confirm against
+     * the declaration of {@code shardsInBatch}.
+     */
+    public void testMakeAllocationDecisionDataFetchedMultipleShards() {
+        setUpShards(2);
+        final RoutingAllocation allocation = routingAllocationWithMultiplePrimaries(
+            noAllocationDeciders(),
+            CLUSTER_RECOVERED,
+            2,
+            0,
+            "allocId-0",
+            "allocId-1"
+        );
+        Set<ShardRouting> shards = new HashSet<>();
+        // The loop variable is deliberately not called "shardId" so it does not shadow the test's shardId field.
+        for (ShardId batchShardId : shardsInBatch) {
+            ShardRouting shard = allocation.routingTable().getIndicesRouting().get("test").shard(batchShardId.id()).primaryShard();
+            shards.add(shard);
+            batchAllocator.addShardData(
+                node1,
+                "allocId-" + batchShardId.id(),
+                batchShardId,
+                true,
+                new ReplicationCheckpoint(batchShardId, 20, 101, 1, Codec.getDefault().getName()),
+                null
+            );
+        }
+        HashMap<ShardRouting, AllocateUnassignedDecision> allDecisions = batchAllocator.makeAllocationDecision(shards, allocation, logger);
+        // verify we get decisions for all the shards
+        assertEquals(shards.size(), allDecisions.size());
+        assertEquals(shards, allDecisions.keySet());
+        for (ShardRouting shard : shards) {
+            assertEquals(AllocationDecision.YES, allDecisions.get(shard).getAllocationDecision());
+        }
+    }
+
+    /**
+     * Builds a {@link RoutingAllocation} over a cluster state that contains one index with a single shard and a
+     * single replica.
+     * <p>
+     * The index is named after {@code shardId.getIndexName()}, the given allocation ids are registered as the
+     * in-sync allocation ids of {@code shardId}, and the cluster consists of {@code node1}, {@code node2} and
+     * {@code node3}.
+     *
+     * @param deciders            allocation deciders handed to the returned allocation
+     * @param reason              selects how the index is added to the routing table; only {@code INDEX_CREATED},
+     *                            {@code CLUSTER_RECOVERED} and {@code INDEX_REOPENED} are supported
+     * @param activeAllocationIds in-sync allocation ids for the shard
+     * @return the routing allocation for the generated cluster state
+     * @throws IllegalArgumentException if {@code reason} is not one of the supported values
+     */
+    private RoutingAllocation routingAllocationWithOnePrimary(
+        AllocationDeciders deciders,
+        UnassignedInfo.Reason reason,
+        String... activeAllocationIds
+    ) {
+        Metadata metadata = Metadata.builder()
+            .put(
+                IndexMetadata.builder(shardId.getIndexName())
+                    .settings(settings(Version.CURRENT))
+                    .numberOfShards(1)
+                    .numberOfReplicas(1)
+                    .putInSyncAllocationIds(shardId.id(), Sets.newHashSet(activeAllocationIds))
+            )
+            .build();
+        RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
+        switch (reason) {
+
+            case INDEX_CREATED:
+                routingTableBuilder.addAsNew(metadata.index(shardId.getIndex()));
+                break;
+            case CLUSTER_RECOVERED:
+                routingTableBuilder.addAsRecovery(metadata.index(shardId.getIndex()));
+                break;
+            case INDEX_REOPENED:
+                routingTableBuilder.addAsFromCloseToOpen(metadata.index(shardId.getIndex()));
+                break;
+            default:
+                throw new IllegalArgumentException("can't do " + reason + " for you. teach me");
+        }
+        ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
+            .metadata(metadata)
+            .routingTable(routingTableBuilder.build())
+            .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3))
+            .build();
+        return new RoutingAllocation(deciders, new RoutingNodes(state, false), state, null, null, System.nanoTime());
+    }
+
+    /**
+     * Builds a {@link RoutingAllocation} over a cluster state that contains one index with {@code numberOfShards}
+     * shards and {@code replicas} replicas.
+     * <p>
+     * Each entry of {@code activeAllocationIds} becomes the single in-sync allocation id of one shard from
+     * {@code shardsInBatch}: the first id goes to the first shard returned by the batch iterator, the second id to
+     * the second shard, and so on. Shards beyond the number of supplied ids get no in-sync allocation id.
+     *
+     * @param deciders            allocation deciders handed to the returned allocation
+     * @param reason              selects how the index is added to the routing table; only {@code INDEX_CREATED},
+     *                            {@code CLUSTER_RECOVERED} and {@code INDEX_REOPENED} are supported
+     * @param numberOfShards      number of primary shards of the index
+     * @param replicas            number of replicas of the index
+     * @param activeAllocationIds in-sync allocation ids, one per batch shard in iteration order
+     * @return the routing allocation for the generated cluster state
+     * @throws IllegalArgumentException if more allocation ids are supplied than there are shards in the batch, or
+     *                                  if {@code reason} is not one of the supported values
+     */
+    private RoutingAllocation routingAllocationWithMultiplePrimaries(
+        AllocationDeciders deciders,
+        UnassignedInfo.Reason reason,
+        int numberOfShards,
+        int replicas,
+        String... activeAllocationIds
+    ) {
+        Iterator<ShardId> shardIterator = shardsInBatch.iterator();
+        IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(shardId.getIndexName())
+            .settings(settings(Version.CURRENT))
+            .numberOfShards(numberOfShards)
+            .numberOfReplicas(replicas);
+        // Pair every supplied allocation id with the next shard of the batch instead of assuming exactly two.
+        for (String activeAllocationId : activeAllocationIds) {
+            if (shardIterator.hasNext() == false) {
+                throw new IllegalArgumentException("more active allocation ids than shards in the batch");
+            }
+            indexMetadataBuilder.putInSyncAllocationIds(shardIterator.next().id(), Sets.newHashSet(activeAllocationId));
+        }
+        Metadata metadata = Metadata.builder().put(indexMetadataBuilder).build();
+
+        RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
+        // Register the index of every batch shard in the routing table, using the strategy selected by reason.
+        for (ShardId shardIdFromBatch : shardsInBatch) {
+            switch (reason) {
+                case INDEX_CREATED:
+                    routingTableBuilder.addAsNew(metadata.index(shardIdFromBatch.getIndex()));
+                    break;
+                case CLUSTER_RECOVERED:
+                    routingTableBuilder.addAsRecovery(metadata.index(shardIdFromBatch.getIndex()));
+                    break;
+                case INDEX_REOPENED:
+                    routingTableBuilder.addAsFromCloseToOpen(metadata.index(shardIdFromBatch.getIndex()));
+                    break;
+                default:
+                    throw new IllegalArgumentException("can't do " + reason + " for you. teach me");
+            }
+        }
+        ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
+            .metadata(metadata)
+            .routingTable(routingTableBuilder.build())
+            .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3))
+            .build();
+        return new RoutingAllocation(deciders, new RoutingNodes(state, false), state, null, null, System.nanoTime());
+    }
+
+    /**
+     * {@link PrimaryShardBatchAllocator} for tests: instead of fetching shard-store data, {@code fetchData} returns
+     * whatever was registered through {@code addData} / {@code addShardData}.
+     * <p>
+     * The class uses {@code shardId} and {@code createTempDir()}, neither of which it declares itself, so it has to
+     * stay a non-static inner class of the test.
+     */
+    class TestBatchAllocator extends PrimaryShardBatchAllocator {
+
+        /** Canned fetch result per node; {@code null} until the first registration and after {@link #clear()}. */
+        private Map<DiscoveryNode, TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch> data;
+
+        /**
+         * Drops all registered data.
+         *
+         * @return this allocator, for chaining
+         */
+        public TestBatchAllocator clear() {
+            data = null;
+            return this;
+        }
+
+        /**
+         * Registers data for {@code shardId} on {@code node} with the given replication checkpoint and no store
+         * exception.
+         *
+         * @return this allocator, for chaining
+         */
+        public TestBatchAllocator addData(
+            DiscoveryNode node,
+            String allocationId,
+            boolean primary,
+            ReplicationCheckpoint replicationCheckpoint
+        ) {
+            return addData(node, allocationId, primary, replicationCheckpoint, null);
+        }
+
+        /**
+         * Registers data for {@code shardId} on {@code node} with an empty replication checkpoint and no store
+         * exception.
+         *
+         * @return this allocator, for chaining
+         */
+        public TestBatchAllocator addData(DiscoveryNode node, String allocationId, boolean primary) {
+            // The cast selects the Exception overload; a bare null would be ambiguous with the checkpoint overload.
+            return addData(node, allocationId, primary, (Exception) null);
+        }
+
+        /**
+         * Registers data for {@code shardId} on {@code node} with an empty replication checkpoint that carries the
+         * name of the {@code "default"} codec.
+         *
+         * @param storeException exception to report for the shard store, or {@code null} for none
+         * @return this allocator, for chaining
+         */
+        public TestBatchAllocator addData(DiscoveryNode node, String allocationId, boolean primary, @Nullable Exception storeException) {
+            Settings nodeSettings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()).build();
+            IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", nodeSettings);
+            return addData(
+                node,
+                allocationId,
+                primary,
+                ReplicationCheckpoint.empty(shardId, new CodecService(null, indexSettings, null).codec("default").getName()),
+                storeException
+            );
+        }
+
+        /**
+         * Registers data for {@code shardId} on {@code node}. Anything registered for that node before is replaced
+         * by a batch that contains only this one shard.
+         *
+         * @param storeException exception to report for the shard store, or {@code null} for none
+         * @return this allocator, for chaining
+         */
+        public TestBatchAllocator addData(
+            DiscoveryNode node,
+            String allocationId,
+            boolean primary,
+            ReplicationCheckpoint replicationCheckpoint,
+            @Nullable Exception storeException
+        ) {
+            if (data == null) {
+                data = new HashMap<>();
+            }
+            Map<ShardId, TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShards> shardData = Map.of(
+                shardId,
+                new TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShards(
+                    allocationId,
+                    primary,
+                    replicationCheckpoint,
+                    storeException
+                )
+            );
+            data.put(node, new TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch(node, shardData));
+            return this;
+        }
+
+        /**
+         * Registers data for an arbitrary shard on {@code node}, keeping the data already registered for other
+         * shards on that node. Registering the same shard again replaces its previous entry.
+         *
+         * @param shardId        shard the data belongs to (hides the test's {@code shardId} field inside this method)
+         * @param storeException exception to report for the shard store, or {@code null} for none
+         * @return this allocator, for chaining
+         */
+        public TestBatchAllocator addShardData(
+            DiscoveryNode node,
+            String allocationId,
+            ShardId shardId,
+            boolean primary,
+            ReplicationCheckpoint replicationCheckpoint,
+            @Nullable Exception storeException
+        ) {
+            if (data == null) {
+                data = new HashMap<>();
+            }
+            Map<ShardId, TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShards> shardData = new HashMap<>();
+            // Copy the existing entries first so that the entry added below wins for an already-registered shard.
+            TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch existing = data.get(node);
+            if (existing != null) {
+                shardData.putAll(existing.getNodeGatewayStartedShardsBatch());
+            }
+            shardData.put(
+                shardId,
+                new TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShards(
+                    allocationId,
+                    primary,
+                    replicationCheckpoint,
+                    storeException
+                )
+            );
+            data.put(node, new TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch(node, shardData));
+            return this;
+        }
+
+        /**
+         * Returns the registered data as the fetch result, with an empty second constructor argument. The three
+         * parameters are not used.
+         */
+        @Override
+        protected AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch> fetchData(
+            Set<ShardRouting> shardsEligibleForFetch,
+            Set<ShardRouting> inEligibleShards,
+            RoutingAllocation allocation
+        ) {
+            return new AsyncShardFetch.FetchResult<>(data, Collections.emptyMap());
+        }
+    }
+}