Skip to content

Commit

Permalink
Refactor PSA to use NodeGatewayStartedShards, removed NodeShardState
Browse files Browse the repository at this point in the history
  • Loading branch information
shiv0408 committed Dec 11, 2023
1 parent 2c03e65 commit 6120e0f
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 151 deletions.
30 changes: 15 additions & 15 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

184 changes: 53 additions & 131 deletions server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,40 +126,41 @@ public AllocateUnassignedDecision makeAllocationDecision(
return decision;
}
final FetchResult<NodeGatewayStartedShards> shardState = fetchData(unassignedShard, allocation);
if (shardState.hasData() == false) {
allocation.setHasPendingAsyncFetch();
List<NodeAllocationResult> nodeDecisions = null;
if (allocation.debugDecision()) {
nodeDecisions = buildDecisionsForAllNodes(unassignedShard, allocation);
}
return AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, nodeDecisions);
}
List<NodeShardState> nodeShardStates = adaptToNodeShardStates(shardState);
List<NodeGatewayStartedShards> nodeShardStates = adaptToNodeShardStates(shardState);
return getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger);
}

private static List<NodeShardState> adaptToNodeShardStates(FetchResult<NodeGatewayStartedShards> shardsState) {
List<NodeShardState> nodeShardStates = new ArrayList();
/**
* Transforms {@link FetchResult} of {@link NodeGatewayStartedShards} to {@link List} of {@link NodeGatewayStartedShards}
* Returns null if {@link FetchResult} does not have any data.
*
* @param shardsState {@link FetchResult<NodeGatewayStartedShards>}
* */
private static List<NodeGatewayStartedShards> adaptToNodeShardStates(FetchResult<NodeGatewayStartedShards> shardsState) {
if (!shardsState.hasData()){
return null;
}
List<NodeGatewayStartedShards> nodeShardStates = new ArrayList<>();
shardsState.getData().forEach((node, nodeGatewayStartedShard) -> {
nodeShardStates.add(
new NodeShardState(
node,
nodeGatewayStartedShard.allocationId(),
nodeGatewayStartedShard.primary(),
nodeGatewayStartedShard.replicationCheckpoint(),
nodeGatewayStartedShard.storeException()
)
);
nodeShardStates.add(nodeGatewayStartedShard);
});
return nodeShardStates;
}

protected AllocateUnassignedDecision getAllocationDecision(
ShardRouting unassignedShard,
RoutingAllocation allocation,
List<NodeShardState> shardState,
List<NodeGatewayStartedShards> shardState,
Logger logger
) {
if (shardState == null) {
allocation.setHasPendingAsyncFetch();
List<NodeAllocationResult> nodeDecisions = null;
if (allocation.debugDecision()) {
nodeDecisions = buildDecisionsForAllNodes(unassignedShard, allocation);
}
return AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, nodeDecisions);
}
final boolean explain = allocation.debugDecision();
// don't create a new IndexSetting object for every shard as this could cause a lot of garbage
// on cluster restart if we allocate a boat load of shards
Expand Down Expand Up @@ -240,7 +241,7 @@ protected AllocateUnassignedDecision getAllocationDecision(
nodesToAllocate = buildNodesToAllocate(allocation, nodeShardsResult.orderedAllocationCandidates, unassignedShard, true);
if (nodesToAllocate.yesNodeShards.isEmpty() == false) {
final DecidedNode decidedNode = nodesToAllocate.yesNodeShards.get(0);
final NodeShardState nodeShardState = decidedNode.nodeShardState;
final NodeGatewayStartedShards nodeShardState = decidedNode.nodeShardState;
logger.debug(
"[{}][{}]: allocating [{}] to [{}] on forced primary allocation",
unassignedShard.index(),
Expand Down Expand Up @@ -300,11 +301,11 @@ protected AllocateUnassignedDecision getAllocationDecision(
*/
private static List<NodeAllocationResult> buildNodeDecisions(
NodesToAllocate nodesToAllocate,
List<NodeShardState> fetchedShardData,
List<NodeGatewayStartedShards> fetchedShardData,
Set<String> inSyncAllocationIds
) {
List<NodeAllocationResult> nodeResults = new ArrayList<>();
Collection<NodeShardState> ineligibleShards = new ArrayList<>();
Collection<NodeGatewayStartedShards> ineligibleShards = new ArrayList<>();
if (nodesToAllocate != null) {
final Set<DiscoveryNode> discoNodes = new HashSet<>();
nodeResults.addAll(
Expand Down Expand Up @@ -338,21 +339,23 @@ private static List<NodeAllocationResult> buildNodeDecisions(
return nodeResults;
}

private static ShardStoreInfo shardStoreInfo(NodeShardState nodeShardState, Set<String> inSyncAllocationIds) {
private static ShardStoreInfo shardStoreInfo(NodeGatewayStartedShards nodeShardState, Set<String> inSyncAllocationIds) {
final Exception storeErr = nodeShardState.storeException();
final boolean inSync = nodeShardState.allocationId() != null && inSyncAllocationIds.contains(nodeShardState.allocationId());
return new ShardStoreInfo(nodeShardState.allocationId(), inSync, storeErr);
}

private static final Comparator<NodeShardState> NO_STORE_EXCEPTION_FIRST_COMPARATOR = Comparator.comparing(
(NodeShardState state) -> state.storeException() == null
private static final Comparator<NodeGatewayStartedShards> NO_STORE_EXCEPTION_FIRST_COMPARATOR = Comparator.comparing(
(NodeGatewayStartedShards state) -> state.storeException() == null
).reversed();
private static final Comparator<NodeShardState> PRIMARY_FIRST_COMPARATOR = Comparator.comparing(NodeShardState::primary).reversed();
private static final Comparator<NodeGatewayStartedShards> PRIMARY_FIRST_COMPARATOR = Comparator.comparing(
NodeGatewayStartedShards::primary).reversed();

private static final Comparator<NodeShardState> HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR = Comparator.comparing(
NodeShardState::replicationCheckpoint,
Comparator.nullsLast(Comparator.naturalOrder())
);
private static final Comparator<NodeGatewayStartedShards> HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR =
Comparator.comparing(
NodeGatewayStartedShards::replicationCheckpoint,
Comparator.nullsLast(Comparator.naturalOrder())
);

/**
* Builds a list of nodes. If matchAnyShard is set to false, only nodes that have an allocation id matching
Expand All @@ -364,12 +367,12 @@ protected static NodeShardsResult buildNodeShardsResult(
boolean matchAnyShard,
Set<String> ignoreNodes,
Set<String> inSyncAllocationIds,
List<NodeShardState> shardState,
List<NodeGatewayStartedShards> shardState,
Logger logger
) {
List<NodeShardState> nodeShardStates = new ArrayList<>();
List<NodeGatewayStartedShards> nodeShardStates = new ArrayList<>();
int numberOfAllocationsFound = 0;
for (NodeShardState nodeShardState : shardState) {
for (NodeGatewayStartedShards nodeShardState : shardState) {
DiscoveryNode node = nodeShardState.getNode();
String allocationId = nodeShardState.allocationId();

Expand Down Expand Up @@ -434,18 +437,18 @@ protected static NodeShardsResult buildNodeShardsResult(
return new NodeShardsResult(nodeShardStates, numberOfAllocationsFound);
}

private static Comparator<NodeShardState> createActiveShardComparator(boolean matchAnyShard, Set<String> inSyncAllocationIds) {
/*
Orders the active shards copies based on below comparators
1. No store exception i.e. shard copy is readable
2. Prefer previous primary shard
3. Prefer shard copy with the highest replication checkpoint. It is NO-OP for doc rep enabled indices.
private static Comparator<NodeGatewayStartedShards> createActiveShardComparator(boolean matchAnyShard, Set<String> inSyncAllocationIds) {
/**
* Orders the active shards copies based on below comparators
* 1. No store exception i.e. shard copy is readable
* 2. Prefer previous primary shard
* 3. Prefer shard copy with the highest replication checkpoint. It is NO-OP for doc rep enabled indices.
*/
final Comparator<NodeShardState> comparator; // allocation preference
final Comparator<NodeGatewayStartedShards> comparator; // allocation preference
if (matchAnyShard) {
// prefer shards with matching allocation ids
Comparator<NodeShardState> matchingAllocationsFirst = Comparator.comparing(
(NodeShardState state) -> inSyncAllocationIds.contains(state.allocationId())
Comparator<NodeGatewayStartedShards> matchingAllocationsFirst = Comparator.comparing(
(NodeGatewayStartedShards state) -> inSyncAllocationIds.contains(state.allocationId())
).reversed();
comparator = matchingAllocationsFirst.thenComparing(NO_STORE_EXCEPTION_FIRST_COMPARATOR)
.thenComparing(PRIMARY_FIRST_COMPARATOR)
Expand All @@ -463,14 +466,14 @@ private static Comparator<NodeShardState> createActiveShardComparator(boolean ma
*/
private static NodesToAllocate buildNodesToAllocate(
RoutingAllocation allocation,
List<NodeShardState> nodeShardStates,
List<NodeGatewayStartedShards> nodeShardStates,
ShardRouting shardRouting,
boolean forceAllocate
) {
List<DecidedNode> yesNodeShards = new ArrayList<>();
List<DecidedNode> throttledNodeShards = new ArrayList<>();
List<DecidedNode> noNodeShards = new ArrayList<>();
for (NodeShardState nodeShardState : nodeShardStates) {
for (NodeGatewayStartedShards nodeShardState : nodeShardStates) {
RoutingNode node = allocation.routingNodes().node(nodeShardState.getNode().getId());
if (node == null) {
continue;
Expand Down Expand Up @@ -501,10 +504,10 @@ private static NodesToAllocate buildNodesToAllocate(
* This class encapsulates the result of a call to {@link #buildNodeShardsResult}
*/
static class NodeShardsResult {
final List<NodeShardState> orderedAllocationCandidates;
final List<NodeGatewayStartedShards> orderedAllocationCandidates;
final int allocationsFound;

NodeShardsResult(List<NodeShardState> orderedAllocationCandidates, int allocationsFound) {
NodeShardsResult(List<NodeGatewayStartedShards> orderedAllocationCandidates, int allocationsFound) {
this.orderedAllocationCandidates = orderedAllocationCandidates;
this.allocationsFound = allocationsFound;
}
Expand All @@ -530,93 +533,12 @@ protected static class NodesToAllocate {
* by the allocator for allocating to the node that holds the shard copy.
*/
private static class DecidedNode {
final NodeShardState nodeShardState;
final NodeGatewayStartedShards nodeShardState;
final Decision decision;

private DecidedNode(NodeShardState nodeShardState, Decision decision) {
private DecidedNode(NodeGatewayStartedShards nodeShardState, Decision decision) {
this.nodeShardState = nodeShardState;
this.decision = decision;
}
}

/**
* The NodeShardState class represents the state of a node shard in a distributed system.
* It includes several key data points about the shard state, such as its allocation ID,
* whether it's a primary shard, any store exception, the replication checkpoint, and the
* DiscoveryNode it belongs to.
*/
protected static class NodeShardState {
// Allocation ID of the shard
private final String allocationId;
// Whether the shard is primary
private final boolean primary;
// Any store exception associated with the shard
private final Exception storeException;
// The replication checkpoint of the shard
private final ReplicationCheckpoint replicationCheckpoint;
// The DiscoveryNode the shard belongs to
private final DiscoveryNode node;

/**
* Constructs a new NodeShardState with the given parameters.
* @param node The DiscoveryNode the shard belongs to.
* @param allocationId The allocation ID of the shard.
* @param primary Whether the shard is a primary shard.
* @param replicationCheckpoint The replication checkpoint of the shard.
* @param storeException Any store exception associated with the shard.
*/
public NodeShardState(
DiscoveryNode node,
String allocationId,
boolean primary,
ReplicationCheckpoint replicationCheckpoint,
Exception storeException
) {
this.node = node;
this.allocationId = allocationId;
this.primary = primary;
this.replicationCheckpoint = replicationCheckpoint;
this.storeException = storeException;
}

/**
* Returns the allocation ID of the shard.
* @return The allocation ID of the shard.
*/
public String allocationId() {
return this.allocationId;
}

/**
* Returns whether the shard is a primary shard.
* @return True if the shard is a primary shard, false otherwise.
*/
public boolean primary() {
return this.primary;
}

/**
* Returns the replication checkpoint of the shard.
* @return The replication checkpoint of the shard.
*/
public ReplicationCheckpoint replicationCheckpoint() {
return this.replicationCheckpoint;
}

/**
* Returns any store exception associated with the shard.
* @return The store exception associated with the shard, or null if there isn't one.
*/
public Exception storeException() {
return this.storeException;
}

/**
* Returns the DiscoveryNode the shard belongs to.
* @return The DiscoveryNode the shard belongs to.
*/
public DiscoveryNode getNode() {
return this.node;
}
}
}
Loading

0 comments on commit 6120e0f

Please sign in to comment.