Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Add PrimaryShardBatchAllocator to take allocation decisions for a batch of shards #8916

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayShardStarted;
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.MergePolicyProvider;
Expand Down Expand Up @@ -720,11 +721,11 @@ public Settings onNodeStopped(String nodeName) throws Exception {
);

assertThat(response.getNodes(), hasSize(1));
assertThat(response.getNodes().get(0).allocationId(), notNullValue());
assertThat(response.getNodes().get(0).getGatewayShardStarted().allocationId(), notNullValue());
if (corrupt) {
assertThat(response.getNodes().get(0).storeException(), notNullValue());
assertThat(response.getNodes().get(0).getGatewayShardStarted().storeException(), notNullValue());
} else {
assertThat(response.getNodes().get(0).storeException(), nullValue());
assertThat(response.getNodes().get(0).getGatewayShardStarted().storeException(), nullValue());
}

// start another node so cluster consistency checks won't time out due to the lack of state
Expand Down Expand Up @@ -764,11 +765,11 @@ public void testSingleShardFetchUsingBatchAction() {
);
final Index index = resolveIndex(indexName);
final ShardId shardId = new ShardId(index, 0);
TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard nodeGatewayStartedShards = response.getNodesMap()
GatewayShardStarted gatewayShardStarted = response.getNodesMap()
.get(searchShardsResponse.getNodes()[0].getId())
.getNodeGatewayStartedShardsBatch()
.get(shardId);
assertNodeGatewayStartedShardsHappyCase(nodeGatewayStartedShards);
assertNodeGatewayStartedShardsHappyCase(gatewayShardStarted);
}

public void testShardFetchMultiNodeMultiIndexesUsingBatchAction() {
Expand All @@ -792,11 +793,8 @@ public void testShardFetchMultiNodeMultiIndexesUsingBatchAction() {
ShardId shardId = clusterSearchShardsGroup.getShardId();
assertEquals(1, clusterSearchShardsGroup.getShards().length);
String nodeId = clusterSearchShardsGroup.getShards()[0].currentNodeId();
TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard nodeGatewayStartedShards = response.getNodesMap()
.get(nodeId)
.getNodeGatewayStartedShardsBatch()
.get(shardId);
assertNodeGatewayStartedShardsHappyCase(nodeGatewayStartedShards);
GatewayShardStarted gatewayShardStarted = response.getNodesMap().get(nodeId).getNodeGatewayStartedShardsBatch().get(shardId);
assertNodeGatewayStartedShardsHappyCase(gatewayShardStarted);
}
}

Expand All @@ -816,13 +814,13 @@ public void testShardFetchCorruptedShardsUsingBatchAction() throws Exception {
new TransportNodesListGatewayStartedShardsBatch.Request(getDiscoveryNodes(), shardIdShardAttributesMap)
);
DiscoveryNode[] discoveryNodes = getDiscoveryNodes();
TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard nodeGatewayStartedShards = response.getNodesMap()
GatewayShardStarted gatewayShardStarted = response.getNodesMap()
.get(discoveryNodes[0].getId())
.getNodeGatewayStartedShardsBatch()
.get(shardId);
assertNotNull(nodeGatewayStartedShards.storeException());
assertNotNull(nodeGatewayStartedShards.allocationId());
assertTrue(nodeGatewayStartedShards.primary());
assertNotNull(gatewayShardStarted.storeException());
assertNotNull(gatewayShardStarted.allocationId());
assertTrue(gatewayShardStarted.primary());
}

public void testSingleShardStoreFetchUsingBatchAction() throws ExecutionException, InterruptedException {
Expand Down Expand Up @@ -950,12 +948,10 @@ private void assertNodeStoreFilesMetadataSuccessCase(
assertNotNull(storeFileMetadata.peerRecoveryRetentionLeases());
}

// Shared happy-case assertions for one fetched shard-state entry: it must carry no store
// exception, a non-null allocation id, and have its primary flag set.
// NOTE(review): this span looks like a rendered diff with two versions interleaved. The first
// signature/body takes TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard and
// the second takes GatewayShardStarted; both share the single closing brace. Confirm which
// version is the one present in the real file.
private void assertNodeGatewayStartedShardsHappyCase(
TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard nodeGatewayStartedShards
) {
assertNull(nodeGatewayStartedShards.storeException());
assertNotNull(nodeGatewayStartedShards.allocationId());
assertTrue(nodeGatewayStartedShards.primary());
private void assertNodeGatewayStartedShardsHappyCase(GatewayShardStarted gatewayShardStarted) {
assertNull(gatewayShardStarted.storeException());
assertNotNull(gatewayShardStarted.allocationId());
assertTrue(gatewayShardStarted.primary());
}

private void prepareIndex(String indexName, int numberOfPrimaryShards) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,9 +258,9 @@
storeStatuses.add(
new IndicesShardStoresResponse.StoreStatus(
response.getNode(),
response.allocationId(),
response.getGatewayShardStarted().allocationId(),

Check warning on line 261 in server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java#L261

Added line #L261 was not covered by tests
allocationStatus,
response.storeException()
response.getGatewayShardStarted().storeException()

Check warning on line 263 in server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java#L263

Added line #L263 was not covered by tests
)
);
}
Expand Down Expand Up @@ -308,7 +308,8 @@
* A shard exists/existed in a node only if shard state file exists in the node
*/
private boolean shardExistsInNode(final NodeGatewayStartedShards response) {
// True when the response reports either a store exception or an allocation id.
// NOTE(review): two return statements appear here because this is a rendered diff. The first
// reads the values directly from the response, the second reads them through
// response.getGatewayShardStarted(). Confirm which one the real file keeps.
return response.storeException() != null || response.allocationId() != null;
return response.getGatewayShardStarted().storeException() != null
|| response.getGatewayShardStarted().allocationId() != null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.opensearch.cluster.routing.allocation.decider.Decision.Type;
import org.opensearch.env.ShardLockObtainFailedException;
import org.opensearch.gateway.AsyncShardFetch.FetchResult;
import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.NodeGatewayShardStarted;
import org.opensearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards;

import java.util.ArrayList;
Expand Down Expand Up @@ -125,27 +126,37 @@
return decision;
}
final FetchResult<NodeGatewayStartedShards> shardState = fetchData(unassignedShard, allocation);
List<NodeGatewayStartedShards> nodeShardStates = adaptToNodeStartedShardList(shardState);
List<NodeGatewayShardStarted> nodeShardStates = adaptToNodeStartedShardList(shardState);
return getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger);
}

/**
* Transforms {@link FetchResult} of {@link NodeGatewayStartedShards} to {@link List} of {@link NodeGatewayStartedShards}
* Transforms {@link FetchResult} of {@link NodeGatewayStartedShards} to {@link List} of {@link NodeGatewayShardStarted}
* Returns null if {@link FetchResult} does not have any data.
*/
// NOTE(review): this span looks like a rendered diff with two versions interleaved (two Javadoc
// summary lines, two signatures, two list declarations, two forEach bodies). The first version
// returns the fetched NodeGatewayStartedShards values unchanged; the second builds a new
// NodeGatewayShardStarted per entry. Confirm which version the real file keeps.
private static List<NodeGatewayStartedShards> adaptToNodeStartedShardList(FetchResult<NodeGatewayStartedShards> shardsState) {
private static List<NodeGatewayShardStarted> adaptToNodeStartedShardList(FetchResult<NodeGatewayStartedShards> shardsState) {
if (!shardsState.hasData()) {
// No fetched data: the result is null, not an empty list.
return null;
}
List<NodeGatewayStartedShards> nodeShardStates = new ArrayList<>();
shardsState.getData().forEach((node, nodeGatewayStartedShard) -> { nodeShardStates.add(nodeGatewayStartedShard); });
List<NodeGatewayShardStarted> nodeShardStates = new ArrayList<>();
shardsState.getData().forEach((node, nodeGatewayStartedShard) -> {
// Copy the four shard-state values out of the wrapped GatewayShardStarted and pair them with
// the entry's key (node), which is passed as the last constructor argument.
nodeShardStates.add(
new NodeGatewayShardStarted(
nodeGatewayStartedShard.getGatewayShardStarted().allocationId(),
nodeGatewayStartedShard.getGatewayShardStarted().primary(),
nodeGatewayStartedShard.getGatewayShardStarted().replicationCheckpoint(),
nodeGatewayStartedShard.getGatewayShardStarted().storeException(),
node
)
);
});
return nodeShardStates;
}

protected AllocateUnassignedDecision getAllocationDecision(
ShardRouting unassignedShard,
RoutingAllocation allocation,
List<NodeGatewayStartedShards> shardState,
List<TransportNodesGatewayStartedShardHelper.NodeGatewayShardStarted> shardState,
Logger logger
) {
final boolean explain = allocation.debugDecision();
Expand Down Expand Up @@ -236,7 +247,7 @@
nodesToAllocate = buildNodesToAllocate(allocation, nodeShardsResult.orderedAllocationCandidates, unassignedShard, true);
if (nodesToAllocate.yesNodeShards.isEmpty() == false) {
final DecidedNode decidedNode = nodesToAllocate.yesNodeShards.get(0);
final NodeGatewayStartedShards nodeShardState = decidedNode.nodeShardState;
final NodeGatewayShardStarted nodeShardState = decidedNode.nodeShardState;
logger.debug(
"[{}][{}]: allocating [{}] to [{}] on forced primary allocation",
unassignedShard.index(),
Expand Down Expand Up @@ -296,11 +307,11 @@
*/
private static List<NodeAllocationResult> buildNodeDecisions(
NodesToAllocate nodesToAllocate,
List<NodeGatewayStartedShards> fetchedShardData,
List<TransportNodesGatewayStartedShardHelper.NodeGatewayShardStarted> fetchedShardData,
Set<String> inSyncAllocationIds
) {
List<NodeAllocationResult> nodeResults = new ArrayList<>();
Collection<NodeGatewayStartedShards> ineligibleShards = new ArrayList<>();
Collection<NodeGatewayShardStarted> ineligibleShards = new ArrayList<>();
if (nodesToAllocate != null) {
final Set<DiscoveryNode> discoNodes = new HashSet<>();
nodeResults.addAll(
Expand Down Expand Up @@ -334,21 +345,21 @@
return nodeResults;
}

// Builds a ShardStoreInfo from a node's shard state: its allocation id, whether that id is
// in-sync, and any store exception.
// NOTE(review): two signatures appear because this is a rendered diff; they differ only in the
// parameter type (NodeGatewayStartedShards vs NodeGatewayShardStarted). Confirm which one the
// real file keeps.
private static ShardStoreInfo shardStoreInfo(NodeGatewayStartedShards nodeShardState, Set<String> inSyncAllocationIds) {
private static ShardStoreInfo shardStoreInfo(NodeGatewayShardStarted nodeShardState, Set<String> inSyncAllocationIds) {
final Exception storeErr = nodeShardState.storeException();
// In-sync only when an allocation id is present and it is a member of inSyncAllocationIds.
final boolean inSync = nodeShardState.allocationId() != null && inSyncAllocationIds.contains(nodeShardState.allocationId());
return new ShardStoreInfo(nodeShardState.allocationId(), inSync, storeErr);
}

// NOTE(review): each constant below appears with two declaration lines (element type
// NodeGatewayStartedShards, then NodeGatewayShardStarted) because this is a rendered diff;
// confirm which element type the real file keeps.

// Sort key is "storeException() == null"; the comparator is reversed, so entries whose key is
// true (no store exception) sort before entries with a store exception.
private static final Comparator<NodeGatewayStartedShards> NO_STORE_EXCEPTION_FIRST_COMPARATOR = Comparator.comparing(
(NodeGatewayStartedShards state) -> state.storeException() == null
private static final Comparator<NodeGatewayShardStarted> NO_STORE_EXCEPTION_FIRST_COMPARATOR = Comparator.comparing(
(NodeGatewayShardStarted state) -> state.storeException() == null
).reversed();
// Sort key is primary(), reversed. Presumably primary() is a boolean so that primaries sort
// first; verify against the state class.
private static final Comparator<NodeGatewayStartedShards> PRIMARY_FIRST_COMPARATOR = Comparator.comparing(
NodeGatewayStartedShards::primary
private static final Comparator<NodeGatewayShardStarted> PRIMARY_FIRST_COMPARATOR = Comparator.comparing(
NodeGatewayShardStarted::primary
).reversed();

// Sort key is replicationCheckpoint(), compared by its natural order with null checkpoints last.
// NOTE(review): this comparator is not reversed, so "highest first" holds only if the
// checkpoint type's compareTo already orders higher checkpoints first; confirm against the
// checkpoint class.
private static final Comparator<NodeGatewayStartedShards> HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR = Comparator.comparing(
NodeGatewayStartedShards::replicationCheckpoint,
private static final Comparator<NodeGatewayShardStarted> HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR = Comparator.comparing(
NodeGatewayShardStarted::replicationCheckpoint,
Comparator.nullsLast(Comparator.naturalOrder())
);

Expand All @@ -362,12 +373,12 @@
boolean matchAnyShard,
Set<String> ignoreNodes,
Set<String> inSyncAllocationIds,
List<NodeGatewayStartedShards> shardState,
List<NodeGatewayShardStarted> shardState,
Logger logger
) {
List<NodeGatewayStartedShards> nodeShardStates = new ArrayList<>();
List<NodeGatewayShardStarted> nodeShardStates = new ArrayList<>();
int numberOfAllocationsFound = 0;
for (NodeGatewayStartedShards nodeShardState : shardState) {
for (NodeGatewayShardStarted nodeShardState : shardState) {
DiscoveryNode node = nodeShardState.getNode();
String allocationId = nodeShardState.allocationId();

Expand Down Expand Up @@ -432,21 +443,18 @@
return new NodeShardsResult(nodeShardStates, numberOfAllocationsFound);
}

private static Comparator<NodeGatewayStartedShards> createActiveShardComparator(
boolean matchAnyShard,
Set<String> inSyncAllocationIds
) {
private static Comparator<NodeGatewayShardStarted> createActiveShardComparator(boolean matchAnyShard, Set<String> inSyncAllocationIds) {
/**
* Orders the active shards copies based on below comparators
* 1. No store exception i.e. shard copy is readable
* 2. Prefer previous primary shard
* 3. Prefer shard copy with the highest replication checkpoint. It is NO-OP for doc rep enabled indices.
*/
final Comparator<NodeGatewayStartedShards> comparator; // allocation preference
final Comparator<NodeGatewayShardStarted> comparator; // allocation preference
if (matchAnyShard) {
// prefer shards with matching allocation ids
Comparator<NodeGatewayStartedShards> matchingAllocationsFirst = Comparator.comparing(
(NodeGatewayStartedShards state) -> inSyncAllocationIds.contains(state.allocationId())
Comparator<NodeGatewayShardStarted> matchingAllocationsFirst = Comparator.comparing(
(NodeGatewayShardStarted state) -> inSyncAllocationIds.contains(state.allocationId())

Check warning on line 457 in server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java#L457

Added line #L457 was not covered by tests
).reversed();
comparator = matchingAllocationsFirst.thenComparing(NO_STORE_EXCEPTION_FIRST_COMPARATOR)
.thenComparing(PRIMARY_FIRST_COMPARATOR)
Expand All @@ -464,14 +472,14 @@
*/
private static NodesToAllocate buildNodesToAllocate(
RoutingAllocation allocation,
List<NodeGatewayStartedShards> nodeShardStates,
List<NodeGatewayShardStarted> nodeShardStates,
ShardRouting shardRouting,
boolean forceAllocate
) {
List<DecidedNode> yesNodeShards = new ArrayList<>();
List<DecidedNode> throttledNodeShards = new ArrayList<>();
List<DecidedNode> noNodeShards = new ArrayList<>();
for (NodeGatewayStartedShards nodeShardState : nodeShardStates) {
for (NodeGatewayShardStarted nodeShardState : nodeShardStates) {
RoutingNode node = allocation.routingNodes().node(nodeShardState.getNode().getId());
if (node == null) {
continue;
Expand Down Expand Up @@ -502,10 +510,10 @@
* This class encapsulates the result of a call to {@link #buildNodeShardsResult}
*/
static class NodeShardsResult {
final List<NodeGatewayStartedShards> orderedAllocationCandidates;
final List<NodeGatewayShardStarted> orderedAllocationCandidates;
final int allocationsFound;

// Stores the candidate list and the allocation count as given; the list reference is kept
// without copying.
// NOTE(review): two constructor signatures appear because this is a rendered diff; they differ
// only in the list element type. Confirm which one the real file keeps.
NodeShardsResult(List<NodeGatewayStartedShards> orderedAllocationCandidates, int allocationsFound) {
NodeShardsResult(List<NodeGatewayShardStarted> orderedAllocationCandidates, int allocationsFound) {
this.orderedAllocationCandidates = orderedAllocationCandidates;
this.allocationsFound = allocationsFound;
}
Expand All @@ -531,10 +539,10 @@
* by the allocator for allocating to the node that holds the shard copy.
*/
private static class DecidedNode {
final NodeGatewayStartedShards nodeShardState;
final NodeGatewayShardStarted nodeShardState;
final Decision decision;

// Pairs a node's shard state with the Decision supplied alongside it.
// NOTE(review): two constructor signatures appear because this is a rendered diff; they differ
// only in the type of nodeShardState. Confirm which one the real file keeps.
private DecidedNode(NodeGatewayStartedShards nodeShardState, Decision decision) {
private DecidedNode(NodeGatewayShardStarted nodeShardState, Decision decision) {
this.nodeShardState = nodeShardState;
this.decision = decision;
}
Expand Down
Loading
Loading