Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add PrimaryShardBatchAllocator to take allocation decisions for a batch of shards #8916

Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.gateway;

import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.NodeAllocationResult;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.gateway.AsyncShardFetch.FetchResult;
import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch;
import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShards;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* PrimaryShardBatchAllocator is similar to {@link org.opensearch.gateway.PrimaryShardAllocator} only difference is
amkhar marked this conversation as resolved.
Show resolved Hide resolved
* that it can allocate multiple unassigned primary shards wherein PrimaryShardAllocator can only allocate single
* unassigned shard.
* The primary shard batch allocator allocates multiple unassigned primary shards to nodes that hold
* valid copies of the unassigned primaries. It does this by iterating over all unassigned
* primary shards in the routing table and fetching shard metadata from each node in the cluster
* that holds a copy of the shard. The shard metadata from each node is compared against the
* set of valid allocation IDs and for all valid shard copies (if any), the primary shard batch allocator
* executes the allocation deciders to chose a copy to assign the primary shard to.
* <p>
* Note that the PrimaryShardBatchAllocator does *not* allocate primaries on index creation
* (see {@link org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator}),
* nor does it allocate primaries when a primary shard failed and there is a valid replica
* copy that can immediately be promoted to primary, as this takes place in {@link RoutingNodes#failShard}.
*
* @opensearch.internal
*/
public abstract class PrimaryShardBatchAllocator extends PrimaryShardAllocator {

abstract protected FetchResult<NodeGatewayStartedShardsBatch> fetchData(
amkhar marked this conversation as resolved.
Show resolved Hide resolved
Set<ShardRouting> shardsEligibleForFetch,
Set<ShardRouting> inEligibleShards,
amkhar marked this conversation as resolved.
Show resolved Hide resolved
RoutingAllocation allocation
);

protected FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> fetchData(
ShardRouting shard,
RoutingAllocation allocation
) {
return null;
amkhar marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public AllocateUnassignedDecision makeAllocationDecision(ShardRouting unassignedShard, RoutingAllocation allocation, Logger logger) {

return makeAllocationDecision(new HashSet<>(Collections.singletonList(unassignedShard)), allocation, logger).get(unassignedShard);
amkhar marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Build allocation decisions for all the shards given to this allocator..
amkhar marked this conversation as resolved.
Show resolved Hide resolved
*
* @param shards set of shards given for allocation
* @param allocation current allocation of all the shards
* @param logger logger used for logging
* @return shard to allocation decision map
*/
@Override
public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
Set<ShardRouting> shards,
RoutingAllocation allocation,
Logger logger
) {
HashMap<ShardRouting, AllocateUnassignedDecision> shardAllocationDecisions = new HashMap<>();
final boolean explain = allocation.debugDecision();
Set<ShardRouting> shardsEligibleForFetch = new HashSet<>();
Set<ShardRouting> shardsNotEligibleForFetch = new HashSet<>();
// identify ineligible shards
for (ShardRouting shard : shards) {
AllocateUnassignedDecision decision = getInEligibleShardDecision(shard, allocation);
if (decision != null) {
amkhar marked this conversation as resolved.
Show resolved Hide resolved
shardsNotEligibleForFetch.add(shard);
shardAllocationDecisions.put(shard, decision);
} else {
shardsEligibleForFetch.add(shard);
}
}
// Do not call fetchData if there are no eligible shards
if (shardsEligibleForFetch.size() == 0) {
return shardAllocationDecisions;
}
// only fetch data for eligible shards
final FetchResult<NodeGatewayStartedShardsBatch> shardsState = fetchData(
shardsEligibleForFetch,
shardsNotEligibleForFetch,
allocation
);

// process the received data
for (ShardRouting unassignedShard : shardsEligibleForFetch) {
if (shardsState.hasData() == false) {
amkhar marked this conversation as resolved.
Show resolved Hide resolved
// if fetching is not done, add that no decision in the resultant map
allocation.setHasPendingAsyncFetch();
List<NodeAllocationResult> nodeDecisions = null;
if (explain) {
nodeDecisions = buildDecisionsForAllNodes(unassignedShard, allocation);
}
shardAllocationDecisions.put(
unassignedShard,
AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, nodeDecisions)
);
} else {

NodeShardStates nodeShardStates = adaptToNodeShardStates(unassignedShard, shardsState);
// get allocation decision for this shard
shardAllocationDecisions.put(unassignedShard, getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger));
}
}
return shardAllocationDecisions;
}

/**
* shardsState contain the Data, there key is DiscoveryNode but value is Map<ShardId,
* NodeGatewayStartedShardsBatch> so to get one shard level data (from all the nodes), we'll traverse the map
* and construct the nodeShardState along the way before making any allocation decision. As metadata for a
* particular shard is needed from all the discovery nodes.
*
* @param unassignedShard unassigned shard
* @param shardsState fetch data result for the whole batch
* @return shard state returned from each node
*/
private static NodeShardStates adaptToNodeShardStates(
amkhar marked this conversation as resolved.
Show resolved Hide resolved
ShardRouting unassignedShard,
FetchResult<NodeGatewayStartedShardsBatch> shardsState
) {
NodeShardStates nodeShardStates = new NodeShardStates();
Map<DiscoveryNode, NodeGatewayStartedShardsBatch> nodeResponses = shardsState.getData();

// build data for a shard from all the nodes
nodeResponses.forEach((node, nodeGatewayStartedShardsBatch) -> {
NodeGatewayStartedShards shardData = nodeGatewayStartedShardsBatch.getNodeGatewayStartedShardsBatch()
.get(unassignedShard.shardId());
nodeShardStates.getNodeShardStates()
.add(
new NodeShardState(
node,
shardData.allocationId(),
shardData.primary(),
shardData.replicationCheckpoint(),
shardData.storeException()
)
);
});
return nodeShardStates;
}
}
Loading
Loading