Skip to content

Commit

Permalink
Clean up in SegmentReplicationIT.
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 committed Aug 8, 2022
1 parent 9887564 commit 8eedb4a
Showing 1 changed file with 28 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,37 +76,34 @@ protected boolean addMockInternalEngine() {
}

public void testPrimaryStopped_ReplicaPromoted() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
final String primary = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
ensureGreen(INDEX_NAME);

client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
refresh(INDEX_NAME);

waitForReplicaUpdate();
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);

final DiscoveryNode primaryDiscoveryNode = getNodeContainingPrimaryShard();
final String primaryNodeName = primaryDiscoveryNode.getName();
final String replicaNodeName = nodeA.equals(primaryNodeName) ? nodeB : nodeA;
assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);

// index another doc but don't refresh, we will ensure this is searchable once replica is promoted.
client().prepareIndex(INDEX_NAME).setId("2").setSource("bar", "baz").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();

// stop the primary node - we only have one shard on here.
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary));
ensureYellowAndNoInitializingShards(INDEX_NAME);

final ShardRouting replicaShardRouting = getShardRoutingForNodeName(replicaNodeName);
final ShardRouting replicaShardRouting = getShardRoutingForNodeName(replica);
assertNotNull(replicaShardRouting);
assertTrue(replicaShardRouting + " should be promoted as a primary", replicaShardRouting.primary());
assertHitCount(client(replicaNodeName).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);

// assert we can index into the new primary.
client().prepareIndex(INDEX_NAME).setId("3").setSource("bar", "baz").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
assertHitCount(client(replicaNodeName).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);

// start another node, index another doc and replicate.
String nodeC = internalCluster().startNode();
Expand All @@ -115,44 +112,35 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception {
refresh(INDEX_NAME);
waitForReplicaUpdate();
assertHitCount(client(nodeC).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 4);
assertHitCount(client(replicaNodeName).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 4);
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 4);
assertSegmentStats(REPLICA_COUNT);
}

private DiscoveryNode getNodeContainingPrimaryShard() {
final ClusterState state = client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState();
final ShardRouting primaryShard = state.routingTable().index(INDEX_NAME).shard(0).primaryShard();
return state.nodes().resolveNode(primaryShard.currentNodeId());
}

public void testRestartPrimary() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
final String primary = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
ensureGreen(INDEX_NAME);

final DiscoveryNode primaryDiscoveryNode = getNodeContainingPrimaryShard();
final String primaryNodeName = primaryDiscoveryNode.getName();
final String replicaNodeName = nodeA.equals(primaryNodeName) ? nodeB : nodeA;
assertEquals(getNodeContainingPrimaryShard().getName(), primary);

final int initialDocCount = 1;

client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
refresh(INDEX_NAME);

waitForReplicaUpdate();
assertDocCounts(initialDocCount, replicaNodeName, primaryNodeName);
assertDocCounts(initialDocCount, replica, primary);

internalCluster().restartNode(primaryNodeName);
internalCluster().restartNode(primary);
ensureGreen(INDEX_NAME);

final DiscoveryNode newPrimaryNode = getNodeContainingPrimaryShard();
assertEquals(newPrimaryNode.getName(), replicaNodeName);
assertEquals(getNodeContainingPrimaryShard().getName(), replica);

flushAndRefresh(INDEX_NAME);
waitForReplicaUpdate();

assertDocCounts(initialDocCount, replicaNodeName, primaryNodeName);
assertDocCounts(initialDocCount, replica, primary);
assertSegmentStats(REPLICA_COUNT);
}

Expand Down Expand Up @@ -181,8 +169,7 @@ public void testCancelPrimaryAllocation() throws Exception {
.actionGet();
ensureGreen(INDEX_NAME);

final DiscoveryNode newPrimaryNode = getNodeContainingPrimaryShard();
assertEquals(newPrimaryNode.getName(), replica);
assertEquals(getNodeContainingPrimaryShard().getName(), replica);

flushAndRefresh(INDEX_NAME);
waitForReplicaUpdate();
Expand Down Expand Up @@ -383,6 +370,7 @@ private void waitForReplicaUpdate() throws Exception {
final List<ShardSegments> replicaShardSegments = segmentListMap.get(false);

final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get();
assertFalse(primaryShardSegments.getSegments().isEmpty());
final Map<String, Segment> latestPrimarySegments = getLatestSegments(primaryShardSegments);
final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get();
for (ShardSegments shardSegments : replicaShardSegments) {
Expand Down Expand Up @@ -413,7 +401,8 @@ private List<ShardSegments[]> getShardSegments(IndicesSegmentResponse indicesSeg
}

private Map<String, Segment> getLatestSegments(ShardSegments segments) {
final Long latestPrimaryGen = segments.getSegments().stream().map(Segment::getGeneration).max(Long::compare).get();
final Optional<Long> generation = segments.getSegments().stream().map(Segment::getGeneration).max(Long::compare);
final Long latestPrimaryGen = generation.get();
return segments.getSegments()
.stream()
.filter(s -> s.getGeneration() == latestPrimaryGen)
Expand Down Expand Up @@ -444,4 +433,10 @@ private void assertDocCounts(int expectedDocCount, String... nodeNames) {
assertHitCount(client(node).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedDocCount);
}
}

private DiscoveryNode getNodeContainingPrimaryShard() {
final ClusterState state = client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState();
final ShardRouting primaryShard = state.routingTable().index(INDEX_NAME).shard(0).primaryShard();
return state.nodes().resolveNode(primaryShard.currentNodeId());
}
}

0 comments on commit 8eedb4a

Please sign in to comment.