Skip to content

Commit

Permalink
Evaluate and execute decisions for shards in a batch together (opense…
Browse files Browse the repository at this point in the history
…arch-project#13702)

Signed-off-by: Swetha Guptha <[email protected]>
  • Loading branch information
Swetha Guptha committed May 29, 2024
1 parent d26cd46 commit ce7373d
Show file tree
Hide file tree
Showing 6 changed files with 315 additions and 193 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing

int primariesInRecovery = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node.nodeId());

logger.debug( "ThrottlingAllocationDecider decision, throttle: [{}] primary recovery limit [{}],"
+ " primaries in recovery [{}] invoked for [{}] on node [{}]",
primariesInRecovery >= primariesInitialRecoveries, primariesInitialRecoveries,
primariesInRecovery, shardRouting, node.node() );
if (primariesInRecovery >= primariesInitialRecoveries) {
// TODO: Should index creation not be throttled for primary shards?
return allocation.decision(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.AllocationDecision;
Expand All @@ -46,9 +45,7 @@
import org.opensearch.cluster.routing.allocation.decider.Decision;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;

/**
* An abstract class that implements basic functionality for allocating
Expand Down Expand Up @@ -81,38 +78,7 @@ public void allocateUnassigned(
executeDecision(shardRouting, allocateUnassignedDecision, allocation, unassignedAllocationHandler);
}

/**
* Allocate Batch of unassigned shard to nodes where valid copies of the shard already exists
* @param shardRoutings the shards to allocate
* @param allocation the allocation state container object
*/
public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAllocation allocation) {
// make Allocation Decisions for all shards
HashMap<ShardRouting, AllocateUnassignedDecision> decisionMap = makeAllocationDecision(shardRoutings, allocation, logger);
assert shardRoutings.size() == decisionMap.size() : "make allocation decision didn't return allocation decision for "
+ "some shards";
// get all unassigned shards iterator
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();

while (iterator.hasNext()) {
ShardRouting shard = iterator.next();
try {
if (decisionMap.isEmpty() == false) {
if (decisionMap.containsKey(shard)) {
executeDecision(shard, decisionMap.remove(shard), allocation, iterator);
}
} else {
// no need to keep iterating the unassigned shards, if we don't have anything in decision map
break;
}
} catch (Exception e) {
logger.error("Failed to execute decision for shard {} while initializing {}", shard, e);
throw e;
}
}
}

private void executeDecision(
protected void executeDecision(
ShardRouting shardRouting,
AllocateUnassignedDecision allocateUnassignedDecision,
RoutingAllocation allocation,
Expand All @@ -135,8 +101,6 @@ private void executeDecision(
}
}

public void allocateUnassignedBatch(String batchId, RoutingAllocation allocation) {}

protected long getExpectedShardSize(ShardRouting shardRouting, RoutingAllocation allocation) {
if (shardRouting.primary()) {
if (shardRouting.recoverySource().getType() == RecoverySource.Type.SNAPSHOT) {
Expand Down Expand Up @@ -165,21 +129,6 @@ public abstract AllocateUnassignedDecision makeAllocationDecision(
Logger logger
);

public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
List<ShardRouting> unassignedShardBatch,
RoutingAllocation allocation,
Logger logger
) {

return (HashMap<ShardRouting, AllocateUnassignedDecision>) unassignedShardBatch.stream()
.collect(
Collectors.toMap(
unassignedShard -> unassignedShard,
unassignedShard -> makeAllocationDecision(unassignedShard, allocation, logger)
)
);
}

/**
* Builds decisions for all nodes in the cluster, so that the explain API can provide information on
* allocation decisions for each node, while still waiting to allocate the shard (e.g. due to fetching shard data).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.gateway.AsyncShardFetch.FetchResult;
import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayStartedShard;
import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.NodeGatewayStartedShard;
Expand All @@ -24,6 +25,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
* PrimaryShardBatchAllocator is similar to {@link org.opensearch.gateway.PrimaryShardAllocator} only difference is
Expand Down Expand Up @@ -61,50 +64,56 @@ protected FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedS

@Override
public AllocateUnassignedDecision makeAllocationDecision(ShardRouting unassignedShard, RoutingAllocation allocation, Logger logger) {
return makeAllocationDecision(Collections.singletonList(unassignedShard), allocation, logger).get(unassignedShard);
AllocateUnassignedDecision decision = getInEligibleShardDecision(unassignedShard, allocation);
if (decision != null) {
return decision;
}
final FetchResult<NodeGatewayStartedShardsBatch> shardsState = fetchData(List.of(unassignedShard), Collections.emptyList(), allocation);
List<NodeGatewayStartedShard> nodeGatewayStartedShards = adaptToNodeShardStates(unassignedShard, shardsState);
return getAllocationDecision(unassignedShard, allocation, nodeGatewayStartedShards, logger);
}

/**
* Build allocation decisions for all the shards present in the batch identified by batchId.
*
* @param shards set of shards given for allocation
* @param allocation current allocation of all the shards
* @param logger logger used for logging
* @return shard to allocation decision map
* Allocate Batch of unassigned shard to nodes where valid copies of the shard already exists
* @param shardRoutings the shards to allocate
* @param allocation the allocation state container object
*/
@Override
public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
List<ShardRouting> shards,
RoutingAllocation allocation,
Logger logger
) {
HashMap<ShardRouting, AllocateUnassignedDecision> shardAllocationDecisions = new HashMap<>();
public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAllocation allocation) {
HashMap<ShardId, AllocateUnassignedDecision> ineligibleShardAllocationDecisions = new HashMap<>();
List<ShardRouting> eligibleShards = new ArrayList<>();
List<ShardRouting> inEligibleShards = new ArrayList<>();
// identify ineligible shards
for (ShardRouting shard : shards) {
for (ShardRouting shard : shardRoutings) {
AllocateUnassignedDecision decision = getInEligibleShardDecision(shard, allocation);
if (decision != null) {
ineligibleShardAllocationDecisions.put(shard.shardId(), decision);
inEligibleShards.add(shard);
shardAllocationDecisions.put(shard, decision);
} else {
eligibleShards.add(shard);
}
}
// Do not call fetchData if there are no eligible shards
if (eligibleShards.isEmpty()) {
return shardAllocationDecisions;
}

// only fetch data for eligible shards
final FetchResult<NodeGatewayStartedShardsBatch> shardsState = fetchData(eligibleShards, inEligibleShards, allocation);

// process the received data
for (ShardRouting unassignedShard : eligibleShards) {
List<NodeGatewayStartedShard> nodeShardStates = adaptToNodeShardStates(unassignedShard, shardsState);
// get allocation decision for this shard
shardAllocationDecisions.put(unassignedShard, getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger));
Set<ShardId> shardIdsFromBatch = shardRoutings.stream()
.map(shardRouting -> shardRouting.shardId())
.collect(Collectors.toSet());
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
while (iterator.hasNext()) {
ShardRouting unassignedShard = iterator.next();
AllocateUnassignedDecision allocationDecision;

if (shardIdsFromBatch.contains(unassignedShard.shardId())) {
if (ineligibleShardAllocationDecisions.containsKey(unassignedShard.shardId())) {
allocationDecision = ineligibleShardAllocationDecisions.get(unassignedShard.shardId());
} else {
List<NodeGatewayStartedShard> nodeShardStates = adaptToNodeShardStates(unassignedShard, shardsState);
allocationDecision = getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger);
}
executeDecision(unassignedShard, allocationDecision, allocation, iterator);
}
}
return shardAllocationDecisions;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
Expand All @@ -29,6 +30,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Allocates replica shards in a batch mode
Expand Down Expand Up @@ -98,25 +100,43 @@ protected FetchResult<TransportNodesListShardStoreMetadata.NodeStoreFilesMetadat

@Override
public AllocateUnassignedDecision makeAllocationDecision(ShardRouting unassignedShard, RoutingAllocation allocation, Logger logger) {
return makeAllocationDecision(Collections.singletonList(unassignedShard), allocation, logger).get(unassignedShard);
if (isResponsibleFor(unassignedShard) == false) {
return AllocateUnassignedDecision.NOT_TAKEN;
}
Tuple<Decision, Map<String, NodeAllocationResult>> result = canBeAllocatedToAtLeastOneNode(unassignedShard, allocation);
Decision allocateDecision = result.v1();
if (allocateDecision.type() != Decision.Type.YES
&& (allocation.debugDecision() == false || hasInitiatedFetching(unassignedShard) == false)) {
// only return early if we are not in explain mode, or we are in explain mode but we have not
// yet attempted to fetch any shard data
logger.trace("{}: ignoring allocation, can't be allocated on any node", unassignedShard);
return AllocateUnassignedDecision.no(
UnassignedInfo.AllocationStatus.fromDecision(allocateDecision.type()),
result.v2() != null ? new ArrayList<>(result.v2().values()) : null
);
}

final FetchResult<NodeStoreFilesMetadataBatch> shardsState = fetchData(List.of(unassignedShard), Collections.emptyList(), allocation);
Map<DiscoveryNode, StoreFilesMetadata> nodeShardStores = convertToNodeStoreFilesMetadataMap(unassignedShard, shardsState);
return getAllocationDecision(unassignedShard, allocation, nodeShardStores, result, logger);
}

@Override
public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
List<ShardRouting> shards,
RoutingAllocation allocation,
Logger logger
) {
HashMap<ShardRouting, AllocateUnassignedDecision> shardAllocationDecisions = new HashMap<>();
/**
* Allocate Batch of unassigned shard to nodes where valid copies of the shard already exists
* @param shardRoutings the shards to allocate
* @param allocation the allocation state container object
*/
public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAllocation allocation) {
HashMap<ShardId, AllocateUnassignedDecision> ineligibleShardAllocationDecisions = new HashMap<>();
final boolean explain = allocation.debugDecision();
List<ShardRouting> eligibleShards = new ArrayList<>();
List<ShardRouting> ineligibleShards = new ArrayList<>();
HashMap<ShardRouting, Tuple<Decision, Map<String, NodeAllocationResult>>> nodeAllocationDecisions = new HashMap<>();
for (ShardRouting shard : shards) {
for (ShardRouting shard : shardRoutings) {
if (!isResponsibleFor(shard)) {
// this allocator n is not responsible for allocating this shard
ineligibleShards.add(shard);
shardAllocationDecisions.put(shard, AllocateUnassignedDecision.NOT_TAKEN);
ineligibleShardAllocationDecisions.put(shard.shardId(), AllocateUnassignedDecision.NOT_TAKEN);
continue;
}

Expand All @@ -126,8 +146,9 @@ public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
// only return early if we are not in explain mode, or we are in explain mode but we have not
// yet attempted to fetch any shard data
logger.trace("{}: ignoring allocation, can't be allocated on any node", shard);
shardAllocationDecisions.put(
shard,
ineligibleShards.add(shard);
ineligibleShardAllocationDecisions.put(
shard.shardId(),
AllocateUnassignedDecision.no(
UnassignedInfo.AllocationStatus.fromDecision(allocationDecision.type()),
result.v2() != null ? new ArrayList<>(result.v2().values()) : null
Expand All @@ -142,27 +163,34 @@ public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
eligibleShards.add(shard);
}

// Do not call fetchData if there are no eligible shards
if (eligibleShards.isEmpty()) {
return shardAllocationDecisions;
}
// only fetch data for eligible shards
final FetchResult<NodeStoreFilesMetadataBatch> shardsState = fetchData(eligibleShards, ineligibleShards, allocation);

for (ShardRouting unassignedShard : eligibleShards) {
Tuple<Decision, Map<String, NodeAllocationResult>> result = nodeAllocationDecisions.get(unassignedShard);
shardAllocationDecisions.put(
unassignedShard,
getAllocationDecision(
unassignedShard,
allocation,
convertToNodeStoreFilesMetadataMap(unassignedShard, shardsState),
result,
logger
)
);
List<ShardId> shardIdsFromBatch = shardRoutings.stream()
.map(shardRouting -> shardRouting.shardId())
.collect(Collectors.toList());
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
while (iterator.hasNext()) {
ShardRouting unassignedShard = iterator.next();

if (shardIdsFromBatch.contains(unassignedShard.shardId())) {
logger.info("Executing allocation decision for shard {}", shardIdsFromBatch);
AllocateUnassignedDecision allocateUnassignedDecision;
if (ineligibleShardAllocationDecisions.containsKey(unassignedShard.shardId())) {
allocateUnassignedDecision = ineligibleShardAllocationDecisions.get(unassignedShard.shardId());
} else {
Tuple<Decision, Map<String, NodeAllocationResult>> result = nodeAllocationDecisions.get(unassignedShard);
allocateUnassignedDecision = getAllocationDecision(
unassignedShard,
allocation,
convertToNodeStoreFilesMetadataMap(unassignedShard, shardsState),
result,
logger
);
}
executeDecision(unassignedShard, allocateUnassignedDecision, allocation, iterator);
}
}
return shardAllocationDecisions;
}

private Map<DiscoveryNode, StoreFilesMetadata> convertToNodeStoreFilesMetadataMap(
Expand Down
Loading

0 comments on commit ce7373d

Please sign in to comment.