diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index ad66ccca5c042..3ae172ae7ad4b 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -22,6 +22,8 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand; @@ -149,7 +151,7 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception { client().prepareIndex(INDEX_NAME).setId("4").setSource("baz", "baz").get(); refresh(INDEX_NAME); waitForSearchableDocs(4, nodeC, replica); - assertIdenticalSegments(SHARD_COUNT, REPLICA_COUNT); + verifyStoreContent(); } public void testRestartPrimary() throws Exception { @@ -174,7 +176,7 @@ public void testRestartPrimary() throws Exception { flushAndRefresh(INDEX_NAME); waitForSearchableDocs(initialDocCount, replica, primary); - assertIdenticalSegments(SHARD_COUNT, REPLICA_COUNT); + verifyStoreContent(); } public void testCancelPrimaryAllocation() throws Exception { @@ -192,7 +194,7 @@ public void testCancelPrimaryAllocation() throws Exception { waitForSearchableDocs(initialDocCount, replica, primary); - final IndexShard indexShard = getIndexShard(primary); + final IndexShard indexShard = getIndexShard(primary, INDEX_NAME); client().admin() .cluster() .prepareReroute() @@ -205,7 +207,7 @@ public void testCancelPrimaryAllocation() throws Exception { 
flushAndRefresh(INDEX_NAME); waitForSearchableDocs(initialDocCount, replica, primary); - assertIdenticalSegments(SHARD_COUNT, REPLICA_COUNT); + verifyStoreContent(); } /** @@ -213,7 +215,7 @@ public void testCancelPrimaryAllocation() throws Exception { *

* TODO: Ignoring this test as its flaky and needs separate fix */ - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") +// @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testAddNewReplicaFailure() throws Exception { logger.info("--> starting [Primary Node] ..."); final String primaryNode = internalCluster().startNode(); @@ -221,7 +223,10 @@ public void testAddNewReplicaFailure() throws Exception { logger.info("--> creating test index ..."); prepareCreate( INDEX_NAME, - Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ).get(); logger.info("--> index 10 docs"); @@ -312,7 +317,7 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { waitForSearchableDocs(expectedHitCount, nodeA, nodeB); ensureGreen(INDEX_NAME); - assertIdenticalSegments(SHARD_COUNT, REPLICA_COUNT); + verifyStoreContent(); } } @@ -347,7 +352,7 @@ public void testIndexReopenClose() throws Exception { ensureGreen(INDEX_NAME); waitForSearchableDocs(initialDocCount, primary, replica); - assertIdenticalSegments(SHARD_COUNT, REPLICA_COUNT); + verifyStoreContent(); } public void testMultipleShards() throws Exception { @@ -389,7 +394,7 @@ public void testMultipleShards() throws Exception { waitForSearchableDocs(expectedHitCount, nodeA, nodeB); ensureGreen(INDEX_NAME); - assertIdenticalSegments(3, REPLICA_COUNT); + verifyStoreContent(); } } @@ -427,7 +432,7 @@ public void testReplicationAfterForceMerge() throws Exception { // Force a merge here so that the in memory SegmentInfos does not reference old segments on disk. 
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get(); refresh(INDEX_NAME); - assertIdenticalSegments(SHARD_COUNT, REPLICA_COUNT); + verifyStoreContent(); } } @@ -442,7 +447,7 @@ public void testCancellation() throws Exception { SegmentReplicationSourceService.class, primaryNode ); - final IndexShard primaryShard = getIndexShard(primaryNode); + final IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME); CountDownLatch latch = new CountDownLatch(1); @@ -525,7 +530,7 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception { waitForSearchableDocs(3, primaryNode, replicaNode); assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3); assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3); - assertIdenticalSegments(SHARD_COUNT, REPLICA_COUNT); + verifyStoreContent(); } public void testDeleteOperations() throws Exception { @@ -565,7 +570,7 @@ public void testDeleteOperations() throws Exception { refresh(INDEX_NAME); waitForSearchableDocs(expectedHitCount - 1, nodeA, nodeB); - assertIdenticalSegments(SHARD_COUNT, REPLICA_COUNT); + verifyStoreContent(); } } @@ -611,70 +616,14 @@ public void testUpdateOperations() throws Exception { refresh(INDEX_NAME); - assertIdenticalSegments(SHARD_COUNT, REPLICA_COUNT); + verifyStoreContent(); assertSearchHits(client(primary).prepareSearch(INDEX_NAME).setQuery(matchQuery("foo", "baz")).get(), id); assertSearchHits(client(replica).prepareSearch(INDEX_NAME).setQuery(matchQuery("foo", "baz")).get(), id); } } - private void assertIdenticalSegments(int numberOfShards, int numberOfReplicas) throws Exception { - assertBusy(() -> { - final IndicesSegmentResponse indicesSegmentResponse = client().admin() - .indices() - .segments(new IndicesSegmentsRequest()) - .actionGet(); - List segmentsByIndex = getShardSegments(indicesSegmentResponse); - - // There will be an 
entry in the list for each index. - assertEquals("Expected a different number of shards in the index", numberOfShards, segmentsByIndex.size()); - for (ShardSegments[] replicationGroupSegments : segmentsByIndex) { - // Separate Primary & replica shards ShardSegments. - final Map> segmentListMap = segmentsByShardType(replicationGroupSegments); - final List primaryShardSegmentsList = segmentListMap.get(true); - final List replicaShardSegmentsList = segmentListMap.get(false); - assertEquals("There should only be one primary in the replicationGroup", 1, primaryShardSegmentsList.size()); - assertEquals( - "There should be a ShardSegment entry for each replica in the replicationGroup", - numberOfReplicas, - replicaShardSegmentsList.size() - ); - final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get(); - final IndexShard primaryShard = getIndexShard(primaryShardSegments); - final Map primarySegmentMetadata = primaryShard.getSegmentMetadataMap(); - for (ShardSegments replicaShardSegments : replicaShardSegmentsList) { - final IndexShard replicaShard = getIndexShard(replicaShardSegments); - final Store.RecoveryDiff recoveryDiff = Store.segmentReplicationDiff( - primarySegmentMetadata, - replicaShard.getSegmentMetadataMap() - ); - if (recoveryDiff.missing.isEmpty() == false || recoveryDiff.different.isEmpty() == false) { - fail( - "Expected no missing or different segments between primary and replica but diff was missing: " - + recoveryDiff.missing - + " Different: " - + recoveryDiff.different - + " Primary Replication Checkpoint : " - + primaryShard.getLatestReplicationCheckpoint() - + " Replica Replication Checkpoint: " - + replicaShard.getLatestReplicationCheckpoint() - ); - } - // calls to readCommit will fail if a valid commit point and all its segments are not in the store. 
- replicaShard.store().readLastCommittedSegmentsInfo(); - } - } - }, 1, TimeUnit.MINUTES); - } - - private IndexShard getIndexShard(ShardSegments shardSegments) { - final ShardRouting replicaShardRouting = shardSegments.getShardRouting(); - ClusterState state = client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState(); - final DiscoveryNode replicaNode = state.nodes().resolveNode(replicaShardRouting.currentNodeId()); - return getIndexShard(replicaNode.getName()); - } - public void testDropPrimaryDuringReplication() throws Exception { - int replica_count = 6; + final int replica_count = 6; final Settings settings = Settings.builder() .put(indexSettings()) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, replica_count) @@ -715,7 +664,7 @@ public void testDropPrimaryDuringReplication() throws Exception { flushAndRefresh(INDEX_NAME); waitForSearchableDocs(initialDocCount + 1, dataNodes); - assertIdenticalSegments(SHARD_COUNT, replica_count); + verifyStoreContent(); } } @@ -742,30 +691,58 @@ private void waitForSearchableDocs(long docCount, String... 
nodes) throws Except waitForSearchableDocs(docCount, Arrays.stream(nodes).collect(Collectors.toList())); } - private IndexShard getIndexShard(String node) { - final Index index = resolveIndex(INDEX_NAME); - IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); - IndexService indexService = indicesService.indexServiceSafe(index); - final Optional shardId = indexService.shardIds().stream().findFirst(); - return indexService.getShard(shardId.get()); + + private void verifyStoreContent() throws Exception { + assertBusy(() -> { + final ClusterState clusterState = getClusterState(); + for (IndexRoutingTable indexRoutingTable: clusterState.routingTable()) { + for (IndexShardRoutingTable shardRoutingTable: indexRoutingTable) { + final ShardRouting primaryRouting = shardRoutingTable.primaryShard(); + final String indexName = primaryRouting.getIndexName(); + final List replicaRouting = shardRoutingTable.replicaShards(); + final IndexShard primaryShard = getIndexShard(clusterState, primaryRouting, indexName); + final Map primarySegmentMetadata = primaryShard.getSegmentMetadataMap(); + for(ShardRouting replica: replicaRouting) { + IndexShard replicaShard = getIndexShard(clusterState, replica, indexName); + final Store.RecoveryDiff recoveryDiff = Store.segmentReplicationDiff( + primarySegmentMetadata, + replicaShard.getSegmentMetadataMap() + ); + if (recoveryDiff.missing.isEmpty() == false || recoveryDiff.different.isEmpty() == false) { + fail( + "Expected no missing or different segments between primary and replica but diff was missing: " + + recoveryDiff.missing + + " Different: " + + recoveryDiff.different + + " Primary Replication Checkpoint : " + + primaryShard.getLatestReplicationCheckpoint() + + " Replica Replication Checkpoint: " + + replicaShard.getLatestReplicationCheckpoint() + ); + } + // calls to readCommit will fail if a valid commit point and all its segments are not in the store. 
+ replicaShard.store().readLastCommittedSegmentsInfo(); + } + } + } + }, 1, TimeUnit.MINUTES); } - private List getShardSegments(IndicesSegmentResponse indicesSegmentResponse) { - return indicesSegmentResponse.getIndices() - .values() - .stream() // get list of IndexSegments - .flatMap(is -> is.getShards().values().stream()) // Map to shard replication group - .map(IndexShardSegments::getShards) // get list of segments across replication group - .collect(Collectors.toList()); + private IndexShard getIndexShard(ClusterState state, ShardRouting routing, String indexName) { + return getIndexShard(state.nodes().get(routing.currentNodeId()).getName(), indexName); } - private Map> segmentsByShardType(ShardSegments[] replicationGroupSegments) { - return Arrays.stream(replicationGroupSegments).collect(Collectors.groupingBy(s -> s.getShardRouting().primary())); + private IndexShard getIndexShard(String node, String indexName) { + final Index index = resolveIndex(indexName); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); + IndexService indexService = indicesService.indexServiceSafe(index); + final Optional shardId = indexService.shardIds().stream().findFirst(); + return indexService.getShard(shardId.get()); } @Nullable private ShardRouting getShardRoutingForNodeName(String nodeName) { - final ClusterState state = client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState(); + final ClusterState state = getClusterState(); for (IndexShardRoutingTable shardRoutingTable : state.routingTable().index(INDEX_NAME)) { for (ShardRouting shardRouting : shardRoutingTable.activeShards()) { final String nodeId = shardRouting.currentNodeId(); @@ -784,8 +761,12 @@ private void assertDocCounts(int expectedDocCount, String... 
nodeNames) { } } + /** Fetches the cluster state through a client for the node named by {@code internalCluster().getClusterManagerName()}. */ private ClusterState getClusterState() { + return client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState(); + } + /** Resolves the node whose id is the current node id of the primary of shard 0 of {@code INDEX_NAME}. */ private DiscoveryNode getNodeContainingPrimaryShard() { - final ClusterState state = client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState(); + final ClusterState state = getClusterState(); final ShardRouting primaryShard = state.routingTable().index(INDEX_NAME).shard(0).primaryShard(); return state.nodes().resolveNode(primaryShard.currentNodeId()); }