Skip to content

Commit

Permalink
rename class to older style
Browse files Browse the repository at this point in the history
Signed-off-by: Aman Khare <[email protected]>
  • Loading branch information
Aman Khare committed Mar 18, 2024
1 parent f702715 commit d372c54
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayShardStarted;
import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayStartedShard;
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.MergePolicyProvider;
Expand Down Expand Up @@ -765,11 +765,11 @@ public void testSingleShardFetchUsingBatchAction() {
);
final Index index = resolveIndex(indexName);
final ShardId shardId = new ShardId(index, 0);
GatewayShardStarted gatewayShardStarted = response.getNodesMap()
GatewayStartedShard gatewayStartedShard = response.getNodesMap()
.get(searchShardsResponse.getNodes()[0].getId())
.getNodeGatewayStartedShardsBatch()
.get(shardId);
assertNodeGatewayStartedShardsHappyCase(gatewayShardStarted);
assertNodeGatewayStartedShardsHappyCase(gatewayStartedShard);
}

public void testShardFetchMultiNodeMultiIndexesUsingBatchAction() {
Expand All @@ -793,8 +793,8 @@ public void testShardFetchMultiNodeMultiIndexesUsingBatchAction() {
ShardId shardId = clusterSearchShardsGroup.getShardId();
assertEquals(1, clusterSearchShardsGroup.getShards().length);
String nodeId = clusterSearchShardsGroup.getShards()[0].currentNodeId();
GatewayShardStarted gatewayShardStarted = response.getNodesMap().get(nodeId).getNodeGatewayStartedShardsBatch().get(shardId);
assertNodeGatewayStartedShardsHappyCase(gatewayShardStarted);
GatewayStartedShard gatewayStartedShard = response.getNodesMap().get(nodeId).getNodeGatewayStartedShardsBatch().get(shardId);
assertNodeGatewayStartedShardsHappyCase(gatewayStartedShard);
}
}

Expand All @@ -814,13 +814,13 @@ public void testShardFetchCorruptedShardsUsingBatchAction() throws Exception {
new TransportNodesListGatewayStartedShardsBatch.Request(getDiscoveryNodes(), shardIdShardAttributesMap)
);
DiscoveryNode[] discoveryNodes = getDiscoveryNodes();
GatewayShardStarted gatewayShardStarted = response.getNodesMap()
GatewayStartedShard gatewayStartedShard = response.getNodesMap()
.get(discoveryNodes[0].getId())
.getNodeGatewayStartedShardsBatch()
.get(shardId);
assertNotNull(gatewayShardStarted.storeException());
assertNotNull(gatewayShardStarted.allocationId());
assertTrue(gatewayShardStarted.primary());
assertNotNull(gatewayStartedShard.storeException());
assertNotNull(gatewayStartedShard.allocationId());
assertTrue(gatewayStartedShard.primary());
}

public void testSingleShardStoreFetchUsingBatchAction() throws ExecutionException, InterruptedException {
Expand Down Expand Up @@ -948,10 +948,10 @@ private void assertNodeStoreFilesMetadataSuccessCase(
assertNotNull(storeFileMetadata.peerRecoveryRetentionLeases());
}

private void assertNodeGatewayStartedShardsHappyCase(GatewayShardStarted gatewayShardStarted) {
assertNull(gatewayShardStarted.storeException());
assertNotNull(gatewayShardStarted.allocationId());
assertTrue(gatewayShardStarted.primary());
private void assertNodeGatewayStartedShardsHappyCase(GatewayStartedShard gatewayStartedShard) {
assertNull(gatewayStartedShard.storeException());
assertNotNull(gatewayStartedShard.allocationId());
assertTrue(gatewayStartedShard.primary());
}

private void prepareIndex(String indexName, int numberOfPrimaryShards) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import org.opensearch.cluster.routing.allocation.decider.Decision.Type;
import org.opensearch.env.ShardLockObtainFailedException;
import org.opensearch.gateway.AsyncShardFetch.FetchResult;
import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.NodeGatewayShardStarted;
import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.NodeGatewayStartedShard;
import org.opensearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards;

import java.util.ArrayList;
Expand Down Expand Up @@ -126,22 +126,22 @@ public AllocateUnassignedDecision makeAllocationDecision(
return decision;
}
final FetchResult<NodeGatewayStartedShards> shardState = fetchData(unassignedShard, allocation);
List<NodeGatewayShardStarted> nodeShardStates = adaptToNodeStartedShardList(shardState);
List<NodeGatewayStartedShard> nodeShardStates = adaptToNodeStartedShardList(shardState);
return getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger);
}

/**
* Transforms {@link FetchResult} of {@link NodeGatewayStartedShards} to {@link List} of {@link NodeGatewayShardStarted}
* Transforms {@link FetchResult} of {@link NodeGatewayStartedShards} to {@link List} of {@link NodeGatewayStartedShard}
* Returns null if {@link FetchResult} does not have any data.
*/
private static List<NodeGatewayShardStarted> adaptToNodeStartedShardList(FetchResult<NodeGatewayStartedShards> shardsState) {
private static List<NodeGatewayStartedShard> adaptToNodeStartedShardList(FetchResult<NodeGatewayStartedShards> shardsState) {
if (!shardsState.hasData()) {
return null;
}
List<NodeGatewayShardStarted> nodeShardStates = new ArrayList<>();
List<NodeGatewayStartedShard> nodeShardStates = new ArrayList<>();
shardsState.getData().forEach((node, nodeGatewayStartedShard) -> {
nodeShardStates.add(
new NodeGatewayShardStarted(
new NodeGatewayStartedShard(
nodeGatewayStartedShard.getGatewayShardStarted().allocationId(),
nodeGatewayStartedShard.getGatewayShardStarted().primary(),
nodeGatewayStartedShard.getGatewayShardStarted().replicationCheckpoint(),
Expand All @@ -156,7 +156,7 @@ private static List<NodeGatewayShardStarted> adaptToNodeStartedShardList(FetchRe
protected AllocateUnassignedDecision getAllocationDecision(
ShardRouting unassignedShard,
RoutingAllocation allocation,
List<TransportNodesGatewayStartedShardHelper.NodeGatewayShardStarted> shardState,
List<NodeGatewayStartedShard> shardState,
Logger logger
) {
final boolean explain = allocation.debugDecision();
Expand Down Expand Up @@ -247,7 +247,7 @@ protected AllocateUnassignedDecision getAllocationDecision(
nodesToAllocate = buildNodesToAllocate(allocation, nodeShardsResult.orderedAllocationCandidates, unassignedShard, true);
if (nodesToAllocate.yesNodeShards.isEmpty() == false) {
final DecidedNode decidedNode = nodesToAllocate.yesNodeShards.get(0);
final NodeGatewayShardStarted nodeShardState = decidedNode.nodeShardState;
final NodeGatewayStartedShard nodeShardState = decidedNode.nodeShardState;
logger.debug(
"[{}][{}]: allocating [{}] to [{}] on forced primary allocation",
unassignedShard.index(),
Expand Down Expand Up @@ -307,11 +307,11 @@ protected AllocateUnassignedDecision getAllocationDecision(
*/
private static List<NodeAllocationResult> buildNodeDecisions(
NodesToAllocate nodesToAllocate,
List<TransportNodesGatewayStartedShardHelper.NodeGatewayShardStarted> fetchedShardData,
List<NodeGatewayStartedShard> fetchedShardData,
Set<String> inSyncAllocationIds
) {
List<NodeAllocationResult> nodeResults = new ArrayList<>();
Collection<NodeGatewayShardStarted> ineligibleShards = new ArrayList<>();
Collection<NodeGatewayStartedShard> ineligibleShards = new ArrayList<>();
if (nodesToAllocate != null) {
final Set<DiscoveryNode> discoNodes = new HashSet<>();
nodeResults.addAll(
Expand Down Expand Up @@ -345,21 +345,21 @@ private static List<NodeAllocationResult> buildNodeDecisions(
return nodeResults;
}

private static ShardStoreInfo shardStoreInfo(NodeGatewayShardStarted nodeShardState, Set<String> inSyncAllocationIds) {
private static ShardStoreInfo shardStoreInfo(NodeGatewayStartedShard nodeShardState, Set<String> inSyncAllocationIds) {
final Exception storeErr = nodeShardState.storeException();
final boolean inSync = nodeShardState.allocationId() != null && inSyncAllocationIds.contains(nodeShardState.allocationId());
return new ShardStoreInfo(nodeShardState.allocationId(), inSync, storeErr);
}

private static final Comparator<NodeGatewayShardStarted> NO_STORE_EXCEPTION_FIRST_COMPARATOR = Comparator.comparing(
(NodeGatewayShardStarted state) -> state.storeException() == null
private static final Comparator<NodeGatewayStartedShard> NO_STORE_EXCEPTION_FIRST_COMPARATOR = Comparator.comparing(
(NodeGatewayStartedShard state) -> state.storeException() == null
).reversed();
private static final Comparator<NodeGatewayShardStarted> PRIMARY_FIRST_COMPARATOR = Comparator.comparing(
NodeGatewayShardStarted::primary
private static final Comparator<NodeGatewayStartedShard> PRIMARY_FIRST_COMPARATOR = Comparator.comparing(
NodeGatewayStartedShard::primary
).reversed();

private static final Comparator<NodeGatewayShardStarted> HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR = Comparator.comparing(
NodeGatewayShardStarted::replicationCheckpoint,
private static final Comparator<NodeGatewayStartedShard> HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR = Comparator.comparing(
NodeGatewayStartedShard::replicationCheckpoint,
Comparator.nullsLast(Comparator.naturalOrder())
);

Expand All @@ -373,12 +373,12 @@ protected static NodeShardsResult buildNodeShardsResult(
boolean matchAnyShard,
Set<String> ignoreNodes,
Set<String> inSyncAllocationIds,
List<NodeGatewayShardStarted> shardState,
List<NodeGatewayStartedShard> shardState,
Logger logger
) {
List<NodeGatewayShardStarted> nodeShardStates = new ArrayList<>();
List<NodeGatewayStartedShard> nodeShardStates = new ArrayList<>();
int numberOfAllocationsFound = 0;
for (NodeGatewayShardStarted nodeShardState : shardState) {
for (NodeGatewayStartedShard nodeShardState : shardState) {
DiscoveryNode node = nodeShardState.getNode();
String allocationId = nodeShardState.allocationId();

Expand Down Expand Up @@ -443,18 +443,18 @@ protected static NodeShardsResult buildNodeShardsResult(
return new NodeShardsResult(nodeShardStates, numberOfAllocationsFound);
}

private static Comparator<NodeGatewayShardStarted> createActiveShardComparator(boolean matchAnyShard, Set<String> inSyncAllocationIds) {
private static Comparator<NodeGatewayStartedShard> createActiveShardComparator(boolean matchAnyShard, Set<String> inSyncAllocationIds) {
/**
* Orders the active shards copies based on below comparators
* 1. No store exception i.e. shard copy is readable
* 2. Prefer previous primary shard
* 3. Prefer shard copy with the highest replication checkpoint. It is NO-OP for doc rep enabled indices.
*/
final Comparator<NodeGatewayShardStarted> comparator; // allocation preference
final Comparator<NodeGatewayStartedShard> comparator; // allocation preference
if (matchAnyShard) {
// prefer shards with matching allocation ids
Comparator<NodeGatewayShardStarted> matchingAllocationsFirst = Comparator.comparing(
(NodeGatewayShardStarted state) -> inSyncAllocationIds.contains(state.allocationId())
Comparator<NodeGatewayStartedShard> matchingAllocationsFirst = Comparator.comparing(
(NodeGatewayStartedShard state) -> inSyncAllocationIds.contains(state.allocationId())
).reversed();
comparator = matchingAllocationsFirst.thenComparing(NO_STORE_EXCEPTION_FIRST_COMPARATOR)
.thenComparing(PRIMARY_FIRST_COMPARATOR)
Expand All @@ -472,14 +472,14 @@ private static Comparator<NodeGatewayShardStarted> createActiveShardComparator(b
*/
private static NodesToAllocate buildNodesToAllocate(
RoutingAllocation allocation,
List<NodeGatewayShardStarted> nodeShardStates,
List<NodeGatewayStartedShard> nodeShardStates,
ShardRouting shardRouting,
boolean forceAllocate
) {
List<DecidedNode> yesNodeShards = new ArrayList<>();
List<DecidedNode> throttledNodeShards = new ArrayList<>();
List<DecidedNode> noNodeShards = new ArrayList<>();
for (NodeGatewayShardStarted nodeShardState : nodeShardStates) {
for (NodeGatewayStartedShard nodeShardState : nodeShardStates) {
RoutingNode node = allocation.routingNodes().node(nodeShardState.getNode().getId());
if (node == null) {
continue;
Expand Down Expand Up @@ -510,10 +510,10 @@ private static NodesToAllocate buildNodesToAllocate(
* This class encapsulates the result of a call to {@link #buildNodeShardsResult}
*/
static class NodeShardsResult {
final List<NodeGatewayShardStarted> orderedAllocationCandidates;
final List<NodeGatewayStartedShard> orderedAllocationCandidates;
final int allocationsFound;

NodeShardsResult(List<NodeGatewayShardStarted> orderedAllocationCandidates, int allocationsFound) {
NodeShardsResult(List<NodeGatewayStartedShard> orderedAllocationCandidates, int allocationsFound) {
this.orderedAllocationCandidates = orderedAllocationCandidates;
this.allocationsFound = allocationsFound;
}
Expand All @@ -539,10 +539,10 @@ protected static class NodesToAllocate {
* by the allocator for allocating to the node that holds the shard copy.
*/
private static class DecidedNode {
final NodeGatewayShardStarted nodeShardState;
final NodeGatewayStartedShard nodeShardState;
final Decision decision;

private DecidedNode(NodeGatewayShardStarted nodeShardState, Decision decision) {
private DecidedNode(NodeGatewayStartedShard nodeShardState, Decision decision) {
this.nodeShardState = nodeShardState;
this.decision = decision;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.gateway.AsyncShardFetch.FetchResult;
import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.NodeGatewayShardStarted;
import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.NodeGatewayStartedShard;
import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch;

import java.util.ArrayList;
Expand Down Expand Up @@ -99,7 +99,7 @@ public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(

// process the received data
for (ShardRouting unassignedShard : eligibleShards) {
List<NodeGatewayShardStarted> nodeShardStates = adaptToNodeShardStates(unassignedShard, shardsState);
List<NodeGatewayStartedShard> nodeShardStates = adaptToNodeShardStates(unassignedShard, shardsState);
// get allocation decision for this shard
shardAllocationDecisions.put(unassignedShard, getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger));
}
Expand All @@ -120,23 +120,23 @@ public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
* @param shardsState fetch data result for the whole batch
* @return shard state returned from each node
*/
private static List<NodeGatewayShardStarted> adaptToNodeShardStates(
private static List<NodeGatewayStartedShard> adaptToNodeShardStates(
ShardRouting unassignedShard,
FetchResult<NodeGatewayStartedShardsBatch> shardsState
) {
if (!shardsState.hasData()) {
return null;
}
List<NodeGatewayShardStarted> nodeShardStates = new ArrayList<>();
List<NodeGatewayStartedShard> nodeShardStates = new ArrayList<>();
Map<DiscoveryNode, NodeGatewayStartedShardsBatch> nodeResponses = shardsState.getData();

// build data for a shard from all the nodes
nodeResponses.forEach((node, nodeGatewayStartedShardsBatch) -> {
TransportNodesGatewayStartedShardHelper.GatewayShardStarted shardData = nodeGatewayStartedShardsBatch
TransportNodesGatewayStartedShardHelper.GatewayStartedShard shardData = nodeGatewayStartedShardsBatch
.getNodeGatewayStartedShardsBatch()
.get(unassignedShard.shardId());
nodeShardStates.add(
new NodeGatewayShardStarted(
new NodeGatewayStartedShard(
shardData.allocationId(),
shardData.primary(),
shardData.replicationCheckpoint(),
Expand Down
Loading

0 comments on commit d372c54

Please sign in to comment.