Skip to content

Commit

Permalink
add test to valiate request state persistence
Browse files Browse the repository at this point in the history
Signed-off-by: Anshu Agarwal <[email protected]>
  • Loading branch information
Anshu Agarwal committed Jan 9, 2023
1 parent 7ca22c5 commit f376eab
Showing 1 changed file with 70 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -698,4 +698,74 @@ public void testWeightedRoutingInMemoryStore() {
terminate(threadPool);
}
}

/**
* Test to validate that shard routing state is persistent across requests
*/
public void testWeightedRoutingShardState() {
TestThreadPool threadPool = null;
try {
Settings.Builder settings = Settings.builder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put("cluster.routing.allocation.awareness.attributes", "zone");
AllocationService strategy = createAllocationService(settings.build());

Metadata metadata = Metadata.builder()
.put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2))
.build();

RoutingTable routingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build();

ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metadata(metadata)
.routingTable(routingTable)
.build();

threadPool = new TestThreadPool("testThatOnlyNodesSupport");
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);

Map<String, String> node1Attributes = new HashMap<>();
node1Attributes.put("zone", "zone1");
Map<String, String> node2Attributes = new HashMap<>();
node2Attributes.put("zone", "zone2");
Map<String, String> node3Attributes = new HashMap<>();
node3Attributes.put("zone", "zone3");
clusterState = ClusterState.builder(clusterState)
.nodes(
DiscoveryNodes.builder()
.add(newNode("node1", unmodifiableMap(node1Attributes)))
.add(newNode("node2", unmodifiableMap(node2Attributes)))
.add(newNode("node3", unmodifiableMap(node3Attributes)))
.localNodeId("node1")
)
.build();
clusterState = strategy.reroute(clusterState, "reroute");

clusterState = startInitializingShardsAndReroute(strategy, clusterState);
clusterState = startInitializingShardsAndReroute(strategy, clusterState);

Map<String, Double> weights = Map.of("zone1", 3.0, "zone2", 2.0, "zone3", 0.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);

Map<String, Integer> requestCount = new HashMap<>();

for (int i = 0; i < 5; i++) {
ShardIterator shardIterator = clusterState.routingTable()
.index("test")
.shard(0)
.activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true);

assertEquals(3, shardIterator.size());
ShardRouting shardRouting;
shardRouting = shardIterator.nextOrNull();
assertNotNull(shardRouting);
requestCount.put(shardRouting.currentNodeId(), requestCount.getOrDefault(shardRouting.currentNodeId(), 0) + 1);
}
assertEquals(3, requestCount.get("node1").intValue());
assertEquals(2, requestCount.get("node2").intValue());

} finally {
terminate(threadPool);
}
}
}

0 comments on commit f376eab

Please sign in to comment.