Skip to content

Commit

Permalink
Use List instead of Set
Browse files Browse the repository at this point in the history
Signed-off-by: Aman Khare <[email protected]>
  • Loading branch information
Aman Khare committed Feb 27, 2024
1 parent dc7eb43 commit e0c4943
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,14 @@
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.gateway.AsyncShardFetch.FetchResult;
import org.opensearch.gateway.TransportNodesListGatewayStartedBatchShards.NodeGatewayStartedShard;
import org.opensearch.gateway.TransportNodesListGatewayStartedBatchShards.NodeGatewayStartedShardsBatch;
import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard;
import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* PrimaryShardBatchAllocator is similar to {@link org.opensearch.gateway.PrimaryShardAllocator} only difference is
Expand All @@ -47,8 +45,8 @@
public abstract class PrimaryShardBatchAllocator extends PrimaryShardAllocator {

abstract protected FetchResult<NodeGatewayStartedShardsBatch> fetchData(
Set<ShardRouting> shardsEligibleForFetch,
Set<ShardRouting> inEligibleShards,
List<ShardRouting> eligibleShards,
List<ShardRouting> inEligibleShards,
RoutingAllocation allocation
);

Expand All @@ -62,7 +60,7 @@ protected FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedS

@Override
public AllocateUnassignedDecision makeAllocationDecision(ShardRouting unassignedShard, RoutingAllocation allocation, Logger logger) {
return makeAllocationDecision(new HashSet<>(Collections.singletonList(unassignedShard)), allocation, logger).get(unassignedShard);
return makeAllocationDecision(Collections.singletonList(unassignedShard), allocation, logger).get(unassignedShard);
}

/**
Expand All @@ -75,13 +73,13 @@ public AllocateUnassignedDecision makeAllocationDecision(ShardRouting unassigned
*/
@Override
public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
Set<ShardRouting> shards,
List<ShardRouting> shards,
RoutingAllocation allocation,
Logger logger
) {
HashMap<ShardRouting, AllocateUnassignedDecision> shardAllocationDecisions = new HashMap<>();
Set<ShardRouting> eligibleShards = new HashSet<>();
Set<ShardRouting> inEligibleShards = new HashSet<>();
List<ShardRouting> eligibleShards = new ArrayList<>();
List<ShardRouting> inEligibleShards = new ArrayList<>();
// identify ineligible shards
for (ShardRouting shard : shards) {
AllocateUnassignedDecision decision = getInEligibleShardDecision(shard, allocation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
package org.opensearch.gateway;

import org.apache.lucene.codecs.Codec;
import org.junit.Before;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
Expand All @@ -34,7 +33,9 @@
import org.opensearch.index.codec.CodecService;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.test.IndexSettingsModule;
import org.junit.Before;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -76,7 +77,7 @@ private void allocateAllUnassigned(final RoutingAllocation allocation) {

private void allocateAllUnassignedBatch(final RoutingAllocation allocation) {
final RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
Set<ShardRouting> shardsToBatch = new HashSet<>();
List<ShardRouting> shardsToBatch = new ArrayList<>();
while (iterator.hasNext()) {
shardsToBatch.add(iterator.next());
}
Expand All @@ -86,40 +87,40 @@ private void allocateAllUnassignedBatch(final RoutingAllocation allocation) {
public void testMakeAllocationDecisionDataFetching() {
final RoutingAllocation allocation = routingAllocationWithOnePrimary(noAllocationDeciders(), CLUSTER_RECOVERED, "allocId1");

Set<ShardRouting> shards = new HashSet<>();
List<ShardRouting> shards = new ArrayList<>();
allocateAllUnassignedBatch(allocation);
ShardRouting shard = allocation.routingTable().getIndicesRouting().get("test").shard(shardId.id()).primaryShard();
shards.add(shard);
HashMap<ShardRouting, AllocateUnassignedDecision> allDecisions = batchAllocator.makeAllocationDecision(shards, allocation, logger);
// verify we get decisions for all the shards
assertEquals(shards.size(), allDecisions.size());
assertEquals(shards, allDecisions.keySet());
assertEquals(shards, new ArrayList<>(allDecisions.keySet()));
assertEquals(AllocationDecision.AWAITING_INFO, allDecisions.get(shard).getAllocationDecision());
}

public void testMakeAllocationDecisionForReplicaShard() {
final RoutingAllocation allocation = routingAllocationWithOnePrimary(noAllocationDeciders(), CLUSTER_RECOVERED, "allocId1");

List<ShardRouting> replicaShards = allocation.routingTable().getIndicesRouting().get("test").shard(shardId.id()).replicaShards();
Set<ShardRouting> shards = new HashSet<>(replicaShards);
List<ShardRouting> shards = new ArrayList<>(replicaShards);
HashMap<ShardRouting, AllocateUnassignedDecision> allDecisions = batchAllocator.makeAllocationDecision(shards, allocation, logger);
// verify we get decisions for all the shards
assertEquals(shards.size(), allDecisions.size());
assertEquals(shards, allDecisions.keySet());
assertEquals(false, allDecisions.get(replicaShards.get(0)).isDecisionTaken());
assertEquals(shards, new ArrayList<>(allDecisions.keySet()));
assertFalse(allDecisions.get(replicaShards.get(0)).isDecisionTaken());
}

public void testMakeAllocationDecisionDataFetched() {
final RoutingAllocation allocation = routingAllocationWithOnePrimary(noAllocationDeciders(), CLUSTER_RECOVERED, "allocId1");

Set<ShardRouting> shards = new HashSet<>();
List<ShardRouting> shards = new ArrayList<>();
ShardRouting shard = allocation.routingTable().getIndicesRouting().get("test").shard(shardId.id()).primaryShard();
shards.add(shard);
batchAllocator.addData(node1, "allocId1", true, new ReplicationCheckpoint(shardId, 20, 101, 1, Codec.getDefault().getName()));
HashMap<ShardRouting, AllocateUnassignedDecision> allDecisions = batchAllocator.makeAllocationDecision(shards, allocation, logger);
// verify we get decisions for all the shards
assertEquals(shards.size(), allDecisions.size());
assertEquals(shards, allDecisions.keySet());
assertEquals(shards, new ArrayList<>(allDecisions.keySet()));
assertEquals(AllocationDecision.YES, allDecisions.get(shard).getAllocationDecision());
}

Expand All @@ -133,7 +134,7 @@ public void testMakeAllocationDecisionDataFetchedMultipleShards() {
"allocId-0",
"allocId-1"
);
Set<ShardRouting> shards = new HashSet<>();
List<ShardRouting> shards = new ArrayList<>();
for (ShardId shardId : shardsInBatch) {
ShardRouting shard = allocation.routingTable().getIndicesRouting().get("test").shard(shardId.id()).primaryShard();
allocation.routingTable().getIndicesRouting().get("test").shard(shardId.id()).primaryShard().recoverySource();
Expand All @@ -150,7 +151,7 @@ public void testMakeAllocationDecisionDataFetchedMultipleShards() {
HashMap<ShardRouting, AllocateUnassignedDecision> allDecisions = batchAllocator.makeAllocationDecision(shards, allocation, logger);
// verify we get decisions for all the shards
assertEquals(shards.size(), allDecisions.size());
assertEquals(shards, allDecisions.keySet());
assertEquals(new HashSet<>(shards), allDecisions.keySet());
for (ShardRouting shard : shards) {
assertEquals(AllocationDecision.YES, allDecisions.get(shard).getAllocationDecision());
}
Expand Down Expand Up @@ -185,7 +186,7 @@ private RoutingAllocation routingAllocationWithOnePrimary(
default:
throw new IllegalArgumentException("can't do " + reason + " for you. teach me");
}
ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
ClusterState state = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metadata(metadata)
.routingTable(routingTableBuilder.build())
.nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3))
Expand Down Expand Up @@ -288,9 +289,9 @@ public TestBatchAllocator addData(
if (data == null) {
data = new HashMap<>();
}
Map<ShardId, TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShards> shardData = Map.of(
Map<ShardId, TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard> shardData = Map.of(
shardId,
new TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShards(
new TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard(
allocationId,
primary,
replicationCheckpoint,
Expand All @@ -312,10 +313,10 @@ public TestBatchAllocator addShardData(
if (data == null) {
data = new HashMap<>();
}
Map<ShardId, TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShards> shardData = new HashMap<>();
Map<ShardId, TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard> shardData = new HashMap<>();
shardData.put(
shardId,
new TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShards(
new TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard(
allocationId,
primary,
replicationCheckpoint,
Expand All @@ -329,8 +330,8 @@ public TestBatchAllocator addShardData(

@Override
protected AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch> fetchData(
Set<ShardRouting> shardsEligibleForFetch,
Set<ShardRouting> inEligibleShards,
List<ShardRouting> shardsEligibleForFetch,
List<ShardRouting> inEligibleShards,
RoutingAllocation allocation
) {
return new AsyncShardFetch.FetchResult<>(data, Collections.<ShardId, Set<String>>emptyMap());
Expand Down

0 comments on commit e0c4943

Please sign in to comment.