Skip to content

Commit

Permalink
Fix unassigned batch allocation (opensearch-project#13748)
Browse files Browse the repository at this point in the history
Signed-off-by: Swetha Guptha <[email protected]>
  • Loading branch information
Swetha Guptha committed Jun 12, 2024
1 parent 2e13e9c commit 48f3402
Show file tree
Hide file tree
Showing 6 changed files with 505 additions and 181 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.apache.lucene.index.CorruptIndexException;
import org.opensearch.Version;
import org.opensearch.action.admin.cluster.allocation.ClusterAllocationExplainResponse;
import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsAction;
import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest;
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction;
Expand All @@ -55,7 +56,9 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.AllocationDecision;
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -98,15 +101,18 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.opensearch.cluster.coordination.ClusterBootstrapService.INITIAL_CLUSTER_MANAGER_NODES_SETTING;
import static org.opensearch.cluster.health.ClusterHealthStatus.GREEN;
import static org.opensearch.cluster.health.ClusterHealthStatus.RED;
import static org.opensearch.cluster.health.ClusterHealthStatus.YELLOW;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.gateway.GatewayRecoveryTestUtils.corruptShard;
import static org.opensearch.gateway.GatewayRecoveryTestUtils.getDiscoveryNodes;
Expand Down Expand Up @@ -753,6 +759,7 @@ public void testMessyElectionsStillMakeClusterGoGreen() throws Exception {
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
.build()
);
Expand Down Expand Up @@ -843,6 +850,87 @@ public void testBatchModeDisabled() throws Exception {
ensureGreen("test");
}

public void testMultipleReplicaShardAssignmentWithDelayedAllocationAndDifferentNodeStartTimeInBatchMode() throws Exception {
internalCluster().startClusterManagerOnlyNodes(
1,
Settings.builder().put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true).build()
);
internalCluster().startDataOnlyNodes(6);
createIndex(
"test",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 3)
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "60m")
.build()
);
ensureGreen("test");

List<String> nodesWithReplicaShards = findNodesWithShard(false);
Settings replicaNode0DataPathSettings = internalCluster().dataPathSettings(nodesWithReplicaShards.get(0));
Settings replicaNode1DataPathSettings = internalCluster().dataPathSettings(nodesWithReplicaShards.get(1));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodesWithReplicaShards.get(0)));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodesWithReplicaShards.get(1)));

ensureStableCluster(5);

logger.info("--> explicitly triggering reroute");
ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get();
assertTrue(clusterRerouteResponse.isAcknowledged());

ClusterHealthResponse health = client().admin().cluster().health(Requests.clusterHealthRequest().timeout("5m")).actionGet();
assertFalse(health.isTimedOut());
assertEquals(YELLOW, health.getStatus());
assertEquals(2, health.getUnassignedShards());
// shard should be unassigned because of Allocation_Delayed
ClusterAllocationExplainResponse allocationExplainResponse = client().admin()
.cluster()
.prepareAllocationExplain()
.setIndex("test")
.setShard(0)
.setPrimary(false)
.get();
assertEquals(
AllocationDecision.ALLOCATION_DELAYED,
allocationExplainResponse.getExplanation().getShardAllocationDecision().getAllocateDecision().getAllocationDecision()
);

logger.info("--> restarting the node 1");
internalCluster().startDataOnlyNode(
Settings.builder().put("node.name", nodesWithReplicaShards.get(0)).put(replicaNode0DataPathSettings).build()
);
clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get();
assertTrue(clusterRerouteResponse.isAcknowledged());
ensureStableCluster(6);
waitUntil(
() -> client().admin().cluster().health(Requests.clusterHealthRequest().timeout("5m")).actionGet().getInitializingShards() == 0
);

health = client().admin().cluster().health(Requests.clusterHealthRequest().timeout("5m")).actionGet();
assertFalse(health.isTimedOut());
assertEquals(YELLOW, health.getStatus());
assertEquals(1, health.getUnassignedShards());
assertEquals(1, health.getDelayedUnassignedShards());
allocationExplainResponse = client().admin()
.cluster()
.prepareAllocationExplain()
.setIndex("test")
.setShard(0)
.setPrimary(false)
.get();
assertEquals(
AllocationDecision.ALLOCATION_DELAYED,
allocationExplainResponse.getExplanation().getShardAllocationDecision().getAllocateDecision().getAllocationDecision()
);

logger.info("--> restarting the node 0");
internalCluster().startDataOnlyNode(
Settings.builder().put("node.name", nodesWithReplicaShards.get(1)).put(replicaNode1DataPathSettings).build()
);
ensureStableCluster(7);
ensureGreen("test");
}

public void testNBatchesCreationAndAssignment() throws Exception {
// we will reduce batch size to 5 to make sure we have enough batches to test assignment
// Total number of primary shards = 50 (50 indices*1)
Expand Down Expand Up @@ -1293,4 +1381,14 @@ private void prepareIndex(String indexName, int numberOfPrimaryShards) {
index(indexName, "type", "1", Collections.emptyMap());
flush(indexName);
}

private List<String> findNodesWithShard(final boolean primary) {
ClusterState state = client().admin().cluster().prepareState().get().getState();
List<ShardRouting> startedShards = state.routingTable().shardsWithState(ShardRoutingState.STARTED);
List<ShardRouting> requiredStartedShards = startedShards.stream()
.filter(startedShard -> startedShard.primary() == primary)
.collect(Collectors.toList());
Collections.shuffle(requiredStartedShards, random());
return requiredStartedShards.stream().map(shard -> state.nodes().get(shard.currentNodeId()).getName()).collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.AllocationDecision;
Expand All @@ -46,9 +45,7 @@
import org.opensearch.cluster.routing.allocation.decider.Decision;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;

/**
* An abstract class that implements basic functionality for allocating
Expand Down Expand Up @@ -81,38 +78,7 @@ public void allocateUnassigned(
executeDecision(shardRouting, allocateUnassignedDecision, allocation, unassignedAllocationHandler);
}

/**
* Allocate Batch of unassigned shard to nodes where valid copies of the shard already exists
* @param shardRoutings the shards to allocate
* @param allocation the allocation state container object
*/
public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAllocation allocation) {
// make Allocation Decisions for all shards
HashMap<ShardRouting, AllocateUnassignedDecision> decisionMap = makeAllocationDecision(shardRoutings, allocation, logger);
assert shardRoutings.size() == decisionMap.size() : "make allocation decision didn't return allocation decision for "
+ "some shards";
// get all unassigned shards iterator
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();

while (iterator.hasNext()) {
ShardRouting shard = iterator.next();
try {
if (decisionMap.isEmpty() == false) {
if (decisionMap.containsKey(shard)) {
executeDecision(shard, decisionMap.remove(shard), allocation, iterator);
}
} else {
// no need to keep iterating the unassigned shards, if we don't have anything in decision map
break;
}
} catch (Exception e) {
logger.error("Failed to execute decision for shard {} while initializing {}", shard, e);
throw e;
}
}
}

private void executeDecision(
protected void executeDecision(
ShardRouting shardRouting,
AllocateUnassignedDecision allocateUnassignedDecision,
RoutingAllocation allocation,
Expand All @@ -135,8 +101,6 @@ private void executeDecision(
}
}

public void allocateUnassignedBatch(String batchId, RoutingAllocation allocation) {}

protected long getExpectedShardSize(ShardRouting shardRouting, RoutingAllocation allocation) {
if (shardRouting.primary()) {
if (shardRouting.recoverySource().getType() == RecoverySource.Type.SNAPSHOT) {
Expand Down Expand Up @@ -165,21 +129,6 @@ public abstract AllocateUnassignedDecision makeAllocationDecision(
Logger logger
);

public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
List<ShardRouting> unassignedShardBatch,
RoutingAllocation allocation,
Logger logger
) {

return (HashMap<ShardRouting, AllocateUnassignedDecision>) unassignedShardBatch.stream()
.collect(
Collectors.toMap(
unassignedShard -> unassignedShard,
unassignedShard -> makeAllocationDecision(unassignedShard, allocation, logger)
)
);
}

/**
* Builds decisions for all nodes in the cluster, so that the explain API can provide information on
* allocation decisions for each node, while still waiting to allocate the shard (e.g. due to fetching shard data).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.gateway.AsyncShardFetch.FetchResult;
import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayStartedShard;
import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.NodeGatewayStartedShard;
Expand Down Expand Up @@ -61,50 +62,59 @@ protected FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedS

@Override
public AllocateUnassignedDecision makeAllocationDecision(ShardRouting unassignedShard, RoutingAllocation allocation, Logger logger) {
return makeAllocationDecision(Collections.singletonList(unassignedShard), allocation, logger).get(unassignedShard);
AllocateUnassignedDecision decision = getInEligibleShardDecision(unassignedShard, allocation);
if (decision != null) {
return decision;
}
final FetchResult<NodeGatewayStartedShardsBatch> shardsState = fetchData(
List.of(unassignedShard),
Collections.emptyList(),
allocation
);
List<NodeGatewayStartedShard> nodeGatewayStartedShards = adaptToNodeShardStates(unassignedShard, shardsState);
return getAllocationDecision(unassignedShard, allocation, nodeGatewayStartedShards, logger);
}

/**
* Build allocation decisions for all the shards present in the batch identified by batchId.
* Allocate Batch of unassigned shard to nodes where valid copies of the shard already exists
*
* @param shards set of shards given for allocation
* @param allocation current allocation of all the shards
* @param logger logger used for logging
* @return shard to allocation decision map
* @param shardRoutings the shards to allocate
* @param allocation the allocation state container object
*/
@Override
public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
List<ShardRouting> shards,
RoutingAllocation allocation,
Logger logger
) {
HashMap<ShardRouting, AllocateUnassignedDecision> shardAllocationDecisions = new HashMap<>();
public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAllocation allocation) {
HashMap<ShardId, AllocateUnassignedDecision> ineligibleShardAllocationDecisions = new HashMap<>();
List<ShardRouting> eligibleShards = new ArrayList<>();
List<ShardRouting> inEligibleShards = new ArrayList<>();
// identify ineligible shards
for (ShardRouting shard : shards) {
for (ShardRouting shard : shardRoutings) {
AllocateUnassignedDecision decision = getInEligibleShardDecision(shard, allocation);
if (decision != null) {
ineligibleShardAllocationDecisions.put(shard.shardId(), decision);
inEligibleShards.add(shard);
shardAllocationDecisions.put(shard, decision);
} else {
eligibleShards.add(shard);
}
}
// Do not call fetchData if there are no eligible shards
if (eligibleShards.isEmpty()) {
return shardAllocationDecisions;
}

// only fetch data for eligible shards
final FetchResult<NodeGatewayStartedShardsBatch> shardsState = fetchData(eligibleShards, inEligibleShards, allocation);

// process the received data
for (ShardRouting unassignedShard : eligibleShards) {
List<NodeGatewayStartedShard> nodeShardStates = adaptToNodeShardStates(unassignedShard, shardsState);
// get allocation decision for this shard
shardAllocationDecisions.put(unassignedShard, getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger));
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
while (iterator.hasNext()) {
ShardRouting unassignedShard = iterator.next();
AllocateUnassignedDecision allocationDecision;

if (shardRoutings.contains(unassignedShard)) {
assert unassignedShard.primary();
if (ineligibleShardAllocationDecisions.containsKey(unassignedShard.shardId())) {
allocationDecision = ineligibleShardAllocationDecisions.get(unassignedShard.shardId());
} else {
List<NodeGatewayStartedShard> nodeShardStates = adaptToNodeShardStates(unassignedShard, shardsState);
allocationDecision = getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger);
}
executeDecision(unassignedShard, allocationDecision, allocation, iterator);
}
}
return shardAllocationDecisions;
}

/**
Expand Down
Loading

0 comments on commit 48f3402

Please sign in to comment.