From 6120e0f5fc1eed93f761a207ce6ab04e1fd26f5b Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Tue, 12 Dec 2023 00:35:11 +0530 Subject: [PATCH] Refactor PSA to use NodeGatewayStartedShards, removed NodeShardState --- .idea/vcs.xml | 30 +-- .../gateway/PrimaryShardAllocator.java | 184 +++++------------- .../test/gateway/TestGatewayAllocator.java | 117 ++++++++++- 3 files changed, 180 insertions(+), 151 deletions(-) diff --git a/.idea/vcs.xml b/.idea/vcs.xml index 48557884a8893..b74d87df8f17b 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -1,20 +1,20 @@ - - - + + + - + diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java index e3a4191758a5e..719de2db6db95 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java @@ -126,30 +126,23 @@ public AllocateUnassignedDecision makeAllocationDecision( return decision; } final FetchResult shardState = fetchData(unassignedShard, allocation); - if (shardState.hasData() == false) { - allocation.setHasPendingAsyncFetch(); - List nodeDecisions = null; - if (allocation.debugDecision()) { - nodeDecisions = buildDecisionsForAllNodes(unassignedShard, allocation); - } - return AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, nodeDecisions); - } - List nodeShardStates = adaptToNodeShardStates(shardState); + List nodeShardStates = adaptToNodeShardStates(shardState); return getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger); } - private static List adaptToNodeShardStates(FetchResult shardsState) { - List nodeShardStates = new ArrayList(); + /** + * Transforms {@link FetchResult} of {@link NodeGatewayStartedShards} to {@link List} of {@link NodeGatewayStartedShards} + * Returns null if {@link FetchResult} does not have any data. + * + * @param shardsState {@link FetchResult} + * */ + private static List adaptToNodeShardStates(FetchResult shardsState) { + if (!shardsState.hasData()){ + return null; + } + List nodeShardStates = new ArrayList<>(); shardsState.getData().forEach((node, nodeGatewayStartedShard) -> { - nodeShardStates.add( - new NodeShardState( - node, - nodeGatewayStartedShard.allocationId(), - nodeGatewayStartedShard.primary(), - nodeGatewayStartedShard.replicationCheckpoint(), - nodeGatewayStartedShard.storeException() - ) - ); + nodeShardStates.add(nodeGatewayStartedShard); }); return nodeShardStates; } @@ -157,9 +150,17 @@ private static List adaptToNodeShardStates(FetchResult shardState, + List shardState, Logger logger ) { + if (shardState == null) { + allocation.setHasPendingAsyncFetch(); + List nodeDecisions = null; + if (allocation.debugDecision()) { + nodeDecisions = buildDecisionsForAllNodes(unassignedShard, allocation); + } + return AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, nodeDecisions); + } final boolean explain = allocation.debugDecision(); // don't create a new IndexSetting object for every shard as this could cause a lot of garbage // on cluster restart if we allocate a boat load of shards @@ -240,7 +241,7 @@ protected AllocateUnassignedDecision getAllocationDecision( nodesToAllocate = buildNodesToAllocate(allocation, nodeShardsResult.orderedAllocationCandidates, unassignedShard, true); if (nodesToAllocate.yesNodeShards.isEmpty() == false) { final DecidedNode decidedNode = nodesToAllocate.yesNodeShards.get(0); - final NodeShardState nodeShardState = decidedNode.nodeShardState; + final NodeGatewayStartedShards nodeShardState = decidedNode.nodeShardState; logger.debug( "[{}][{}]: allocating [{}] to [{}] on forced primary allocation", unassignedShard.index(), @@ -300,11 +301,11 @@ protected AllocateUnassignedDecision getAllocationDecision( */ private static List buildNodeDecisions( NodesToAllocate nodesToAllocate, - List fetchedShardData, + List fetchedShardData, Set inSyncAllocationIds ) { List nodeResults = new ArrayList<>(); - Collection ineligibleShards = new ArrayList<>(); + Collection ineligibleShards = new ArrayList<>(); if (nodesToAllocate != null) { final Set discoNodes = new HashSet<>(); nodeResults.addAll( @@ -338,21 +339,23 @@ private static List buildNodeDecisions( return nodeResults; } - private static ShardStoreInfo shardStoreInfo(NodeShardState nodeShardState, Set inSyncAllocationIds) { + private static ShardStoreInfo shardStoreInfo(NodeGatewayStartedShards nodeShardState, Set inSyncAllocationIds) { final Exception storeErr = nodeShardState.storeException(); final boolean inSync = nodeShardState.allocationId() != null && inSyncAllocationIds.contains(nodeShardState.allocationId()); return new ShardStoreInfo(nodeShardState.allocationId(), inSync, storeErr); } - private static final Comparator NO_STORE_EXCEPTION_FIRST_COMPARATOR = Comparator.comparing( - (NodeShardState state) -> state.storeException() == null + private static final Comparator NO_STORE_EXCEPTION_FIRST_COMPARATOR = Comparator.comparing( + (NodeGatewayStartedShards state) -> state.storeException() == null ).reversed(); - private static final Comparator PRIMARY_FIRST_COMPARATOR = Comparator.comparing(NodeShardState::primary).reversed(); + private static final Comparator PRIMARY_FIRST_COMPARATOR = Comparator.comparing( + NodeGatewayStartedShards::primary).reversed(); - private static final Comparator HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR = Comparator.comparing( - NodeShardState::replicationCheckpoint, - Comparator.nullsLast(Comparator.naturalOrder()) - ); + private static final Comparator HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR = + Comparator.comparing( + NodeGatewayStartedShards::replicationCheckpoint, + Comparator.nullsLast(Comparator.naturalOrder()) + ); /** * Builds a list of nodes. If matchAnyShard is set to false, only nodes that have an allocation id matching @@ -364,12 +367,12 @@ protected static NodeShardsResult buildNodeShardsResult( boolean matchAnyShard, Set ignoreNodes, Set inSyncAllocationIds, - List shardState, + List shardState, Logger logger ) { - List nodeShardStates = new ArrayList<>(); + List nodeShardStates = new ArrayList<>(); int numberOfAllocationsFound = 0; - for (NodeShardState nodeShardState : shardState) { + for (NodeGatewayStartedShards nodeShardState : shardState) { DiscoveryNode node = nodeShardState.getNode(); String allocationId = nodeShardState.allocationId(); @@ -434,18 +437,18 @@ protected static NodeShardsResult buildNodeShardsResult( return new NodeShardsResult(nodeShardStates, numberOfAllocationsFound); } - private static Comparator createActiveShardComparator(boolean matchAnyShard, Set inSyncAllocationIds) { - /* - Orders the active shards copies based on below comparators - 1. No store exception i.e. shard copy is readable - 2. Prefer previous primary shard - 3. Prefer shard copy with the highest replication checkpoint. It is NO-OP for doc rep enabled indices. + private static Comparator createActiveShardComparator(boolean matchAnyShard, Set inSyncAllocationIds) { + /** + * Orders the active shards copies based on below comparators + * 1. No store exception i.e. shard copy is readable + * 2. Prefer previous primary shard + * 3. Prefer shard copy with the highest replication checkpoint. It is NO-OP for doc rep enabled indices. */ - final Comparator comparator; // allocation preference + final Comparator comparator; // allocation preference if (matchAnyShard) { // prefer shards with matching allocation ids - Comparator matchingAllocationsFirst = Comparator.comparing( - (NodeShardState state) -> inSyncAllocationIds.contains(state.allocationId()) + Comparator matchingAllocationsFirst = Comparator.comparing( + (NodeGatewayStartedShards state) -> inSyncAllocationIds.contains(state.allocationId()) ).reversed(); comparator = matchingAllocationsFirst.thenComparing(NO_STORE_EXCEPTION_FIRST_COMPARATOR) .thenComparing(PRIMARY_FIRST_COMPARATOR) @@ -463,14 +466,14 @@ private static Comparator createActiveShardComparator(boolean ma */ private static NodesToAllocate buildNodesToAllocate( RoutingAllocation allocation, - List nodeShardStates, + List nodeShardStates, ShardRouting shardRouting, boolean forceAllocate ) { List yesNodeShards = new ArrayList<>(); List throttledNodeShards = new ArrayList<>(); List noNodeShards = new ArrayList<>(); - for (NodeShardState nodeShardState : nodeShardStates) { + for (NodeGatewayStartedShards nodeShardState : nodeShardStates) { RoutingNode node = allocation.routingNodes().node(nodeShardState.getNode().getId()); if (node == null) { continue; @@ -501,10 +504,10 @@ private static NodesToAllocate buildNodesToAllocate( * This class encapsulates the result of a call to {@link #buildNodeShardsResult} */ static class NodeShardsResult { - final List orderedAllocationCandidates; + final List orderedAllocationCandidates; final int allocationsFound; - NodeShardsResult(List orderedAllocationCandidates, int allocationsFound) { + NodeShardsResult(List orderedAllocationCandidates, int allocationsFound) { this.orderedAllocationCandidates = orderedAllocationCandidates; this.allocationsFound = allocationsFound; } @@ -530,93 +533,12 @@ protected static class NodesToAllocate { * by the allocator for allocating to the node that holds the shard copy. */ private static class DecidedNode { - final NodeShardState nodeShardState; + final NodeGatewayStartedShards nodeShardState; final Decision decision; - private DecidedNode(NodeShardState nodeShardState, Decision decision) { + private DecidedNode(NodeGatewayStartedShards nodeShardState, Decision decision) { this.nodeShardState = nodeShardState; this.decision = decision; } } - - /** - * The NodeShardState class represents the state of a node shard in a distributed system. - * It includes several key data points about the shard state, such as its allocation ID, - * whether it's a primary shard, any store exception, the replication checkpoint, and the - * DiscoveryNode it belongs to. - */ - protected static class NodeShardState { - // Allocation ID of the shard - private final String allocationId; - // Whether the shard is primary - private final boolean primary; - // Any store exception associated with the shard - private final Exception storeException; - // The replication checkpoint of the shard - private final ReplicationCheckpoint replicationCheckpoint; - // The DiscoveryNode the shard belongs to - private final DiscoveryNode node; - - /** - * Constructs a new NodeShardState with the given parameters. - * @param node The DiscoveryNode the shard belongs to. - * @param allocationId The allocation ID of the shard. - * @param primary Whether the shard is a primary shard. - * @param replicationCheckpoint The replication checkpoint of the shard. - * @param storeException Any store exception associated with the shard. - */ - public NodeShardState( - DiscoveryNode node, - String allocationId, - boolean primary, - ReplicationCheckpoint replicationCheckpoint, - Exception storeException - ) { - this.node = node; - this.allocationId = allocationId; - this.primary = primary; - this.replicationCheckpoint = replicationCheckpoint; - this.storeException = storeException; - } - - /** - * Returns the allocation ID of the shard. - * @return The allocation ID of the shard. - */ - public String allocationId() { - return this.allocationId; - } - - /** - * Returns whether the shard is a primary shard. - * @return True if the shard is a primary shard, false otherwise. - */ - public boolean primary() { - return this.primary; - } - - /** - * Returns the replication checkpoint of the shard. - * @return The replication checkpoint of the shard. - */ - public ReplicationCheckpoint replicationCheckpoint() { - return this.replicationCheckpoint; - } - - /** - * Returns any store exception associated with the shard. - * @return The store exception associated with the shard, or null if there isn't one. - */ - public Exception storeException() { - return this.storeException; - } - - /** - * Returns the DiscoveryNode the shard belongs to. - * @return The DiscoveryNode the shard belongs to. - */ - public DiscoveryNode getNode() { - return this.node; - } - } } diff --git a/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java b/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java index 7462062a0cd46..caab04d3fc434 100644 --- a/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java +++ b/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java @@ -35,16 +35,21 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.opensearch.cluster.routing.allocation.FailedShard; import org.opensearch.cluster.routing.allocation.RoutingAllocation; import org.opensearch.core.index.shard.ShardId; import org.opensearch.gateway.AsyncShardFetch; import org.opensearch.gateway.GatewayAllocator; import org.opensearch.gateway.PrimaryShardAllocator; +import org.opensearch.gateway.PrimaryShardBatchAllocator; import org.opensearch.gateway.ReplicaShardAllocator; +import org.opensearch.gateway.ReplicaShardBatchAllocator; +import org.opensearch.gateway.TransportNodesListGatewayStartedBatchShards; import org.opensearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.store.TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata; +import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch; import java.util.Collections; import java.util.HashMap; @@ -57,13 +62,13 @@ * A gateway allocator implementation that keeps an in memory list of started shard allocation * that are used as replies to the, normally async, fetch data requests. The in memory list * is adapted when shards are started and failed. - *

+ * * Nodes leaving and joining the cluster do not change the list of shards the class tracks but * rather serves as a filter to what is returned by fetch data. Concretely - fetch data will * only return shards that were started on nodes that are currently part of the cluster. - *

+ * * For now only primary shard related data is fetched. Replica request always get an empty response. - *

+ * * * This class is useful to use in unit tests that require the functionality of {@link GatewayAllocator} but do * not have all the infrastructure required to use it. @@ -98,7 +103,52 @@ protected AsyncShardFetch.FetchResult fetchData(ShardR ) ); - return new AsyncShardFetch.FetchResult<>(shardId, foundShards, ignoreNodes); + return new AsyncShardFetch.FetchResult<>(foundShards, new HashMap<>() { + { + put(shardId, ignoreNodes); + } + }); + } + }; + + PrimaryShardBatchAllocator primaryBatchShardAllocator = new PrimaryShardBatchAllocator() { + @Override + protected AsyncShardFetch.FetchResult fetchData( + Set shardsEligibleForFetch, + Set inEligibleShards, + RoutingAllocation allocation + ) { + Map foundShards = new HashMap<>(); + HashMap> shardsToIgnoreNodes = new HashMap<>(); + for (Map.Entry> entry : knownAllocations.entrySet()) { + String nodeId = entry.getKey(); + Map shardsOnNode = entry.getValue(); + HashMap adaptedResponse = new HashMap<>(); + + for (ShardRouting shardRouting : shardsEligibleForFetch) { + ShardId shardId = shardRouting.shardId(); + Set ignoreNodes = allocation.getIgnoreNodes(shardId); + + if (shardsOnNode.containsKey(shardId) && ignoreNodes.contains(nodeId) == false && currentNodes.nodeExists(nodeId)) { + TransportNodesListGatewayStartedBatchShards.NodeGatewayStartedShard nodeShard = + new TransportNodesListGatewayStartedBatchShards.NodeGatewayStartedShard( + shardRouting.allocationId().getId(), + shardRouting.primary(), + getReplicationCheckpoint(shardId, nodeId) + ); + adaptedResponse.put(shardId, nodeShard); + shardsToIgnoreNodes.put(shardId, ignoreNodes); + } + foundShards.put( + currentNodes.get(nodeId), + new TransportNodesListGatewayStartedBatchShards.NodeGatewayStartedShardsBatch( + currentNodes.get(nodeId), + adaptedResponse + ) + ); + } + } + return new AsyncShardFetch.FetchResult<>(foundShards, shardsToIgnoreNodes); } }; @@ -111,7 +161,28 @@ private ReplicationCheckpoint getReplicationCheckpoint(ShardId shardId, String n protected AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation) { // for now, just pretend no node has data final ShardId shardId = shard.shardId(); - return new AsyncShardFetch.FetchResult<>(shardId, Collections.emptyMap(), allocation.getIgnoreNodes(shardId)); + return new AsyncShardFetch.FetchResult<>(Collections.emptyMap(), new HashMap<>() { + { + put(shardId, allocation.getIgnoreNodes(shardId)); + } + }); + } + + @Override + protected boolean hasInitiatedFetching(ShardRouting shard) { + return true; + } + }; + + ReplicaShardBatchAllocator replicaBatchShardAllocator = new ReplicaShardBatchAllocator() { + + @Override + protected AsyncShardFetch.FetchResult fetchData( + Set shardsEligibleForFetch, + Set inEligibleShards, + RoutingAllocation allocation + ) { + return new AsyncShardFetch.FetchResult<>(Collections.emptyMap(), Collections.emptyMap()); } @Override @@ -157,6 +228,12 @@ public void allocateUnassigned( innerAllocatedUnassigned(allocation, primaryShardAllocator, replicaShardAllocator, shardRouting, unassignedAllocationHandler); } + @Override + public void allocateUnassignedBatch(RoutingAllocation allocation, boolean primary) { + currentNodes = allocation.nodes(); + innerAllocateUnassignedBatch(allocation, primaryBatchShardAllocator, replicaBatchShardAllocator, primary); + } + /** * manually add a specific shard to the allocations the gateway keeps track of */ @@ -171,4 +248,34 @@ public String getReplicationCheckPointKey(ShardId shardId, String nodeName) { public void addReplicationCheckpoint(ShardId shardId, String nodeName, ReplicationCheckpoint replicationCheckpoint) { shardIdNodeToReplicationCheckPointMap.putIfAbsent(getReplicationCheckPointKey(shardId, nodeName), replicationCheckpoint); } + + public Set createAndUpdateBatches(RoutingAllocation allocation, boolean primary) { + return super.createAndUpdateBatches(allocation, primary); + } + + public void safelyRemoveShardFromBatch(ShardRouting shard) { + super.safelyRemoveShardFromBatch(shard); + } + + public void safelyRemoveShardFromBothBatch(ShardRouting shardRouting) { + super.safelyRemoveShardFromBothBatch(shardRouting); + } + + public String getBatchId(ShardRouting shard, boolean primary) { + return super.getBatchId(shard, primary); + } + + public Map getBatchIdToStartedShardBatch() { + return batchIdToStartedShardBatch; + } + + public Map getBatchIdToStoreShardBatch() { + return batchIdToStoreShardBatch; + } + + @Override + public AllocateUnassignedDecision explainUnassignedShardAllocation(ShardRouting unassignedShard, RoutingAllocation routingAllocation) { + setShardAllocators(primaryShardAllocator, replicaShardAllocator, primaryBatchShardAllocator, replicaBatchShardAllocator); + return super.explainUnassignedShardAllocation(unassignedShard, routingAllocation); + } }