Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BaseGatewayShardAllocator changes for Assigning the batch of shards #8776

Merged
merged 9 commits into from
Mar 15, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.AllocationDecision;
Expand All @@ -46,6 +47,8 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;

/**
* An abstract class that implements basic functionality for allocating
Expand Down Expand Up @@ -74,7 +77,26 @@ public void allocateUnassigned(
ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler
) {
final AllocateUnassignedDecision allocateUnassignedDecision = makeAllocationDecision(shardRouting, allocation, logger);
executeDecision(shardRouting, allocateUnassignedDecision, allocation, unassignedAllocationHandler);
}

public void allocateUnassignedBatch(Set<ShardRouting> shards, RoutingAllocation allocation) {
// make Allocation Decisions for all shards
ConcurrentMap<ShardRouting, AllocateUnassignedDecision> decisionMap = makeAllocationDecision(shards, allocation, logger);
Gaurav614 marked this conversation as resolved.
Show resolved Hide resolved
// get all unassigned shards
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved
while (iterator.hasNext()){
ShardRouting shard = iterator.next();
if (shards.contains(shard)) {
executeDecision(shard, decisionMap.get(shard), allocation, iterator);
}
}
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved
}

private void executeDecision(ShardRouting shardRouting,
AllocateUnassignedDecision allocateUnassignedDecision,
RoutingAllocation allocation,
ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler) {
if (allocateUnassignedDecision.isDecisionTaken() == false) {
// no decision was taken by this allocator
return;
Expand All @@ -91,7 +113,6 @@ public void allocateUnassigned(
unassignedAllocationHandler.removeAndIgnore(allocateUnassignedDecision.getAllocationStatus(), allocation.changes());
}
}

protected long getExpectedShardSize(ShardRouting shardRouting, RoutingAllocation allocation) {
if (shardRouting.primary()) {
if (shardRouting.recoverySource().getType() == RecoverySource.Type.SNAPSHOT) {
Expand Down Expand Up @@ -120,6 +141,12 @@ public abstract AllocateUnassignedDecision makeAllocationDecision(
Logger logger
);

public abstract ConcurrentMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
Set<ShardRouting> shards,
RoutingAllocation allocation,
Logger logger
);

Gaurav614 marked this conversation as resolved.
Show resolved Hide resolved
/**
* Builds decisions for all nodes in the cluster, so that the explain API can provide information on
* allocation decisions for each node, while still waiting to allocate the shard (e.g. due to fetching shard data).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -457,6 +458,12 @@ private static NodesToAllocate buildNodesToAllocate(

protected abstract FetchResult<NodeGatewayStartedShards> fetchData(ShardRouting shard, RoutingAllocation allocation);

@Override
// to be override
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved
public ConcurrentMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(Set<ShardRouting> shards, RoutingAllocation allocation, Logger logger) {
return null;
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved
}

Gaurav614 marked this conversation as resolved.
Show resolved Hide resolved
private static class NodeShardsResult {
final List<NodeGatewayStartedShards> orderedAllocationCandidates;
final int allocationsFound;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;

import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;

Expand Down Expand Up @@ -495,6 +496,13 @@ private static boolean canPerformOperationBasedRecovery(

protected abstract AsyncShardFetch.FetchResult<NodeStoreFilesMetadata> fetchData(ShardRouting shard, RoutingAllocation allocation);

@Override
// to be override
public ConcurrentMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(Set<ShardRouting> shards, RoutingAllocation allocation, Logger logger) {
return null;
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved
}


/**
* Returns a boolean indicating whether fetching shard data has been triggered at any point for the given shard.
*/
Expand Down