diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
index bc5c5abb5386d..1dd69f696c7ed 100644
--- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
@@ -15,6 +15,7 @@
 import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
 import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest;
 import org.opensearch.action.admin.indices.segments.ShardSegments;
+import org.opensearch.action.search.SearchResponse;
 import org.opensearch.action.support.WriteRequest;
 import org.opensearch.action.update.UpdateResponse;
 import org.opensearch.client.Requests;
@@ -32,8 +33,9 @@
 import org.opensearch.index.Index;
 import org.opensearch.index.IndexModule;
 import org.opensearch.index.IndexService;
-import org.opensearch.index.engine.Segment;
 import org.opensearch.index.shard.IndexShard;
+import org.opensearch.index.store.Store;
+import org.opensearch.index.store.StoreFileMetadata;
 import org.opensearch.indices.IndicesService;
 import org.opensearch.indices.recovery.FileChunkRequest;
 import org.opensearch.indices.replication.common.ReplicationType;
@@ -44,7 +46,6 @@
 import org.opensearch.test.transport.MockTransportService;
 import org.opensearch.transport.TransportService;
 
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Arrays;
 import java.util.List;
@@ -53,9 +54,9 @@
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static java.util.Arrays.asList;
 import static org.hamcrest.Matchers.equalTo;
 import static org.opensearch.index.query.QueryBuilders.matchQuery;
 import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
@@ -71,7 +72,7 @@ public class SegmentReplicationIT extends OpenSearchIntegTestCase {
 
     @Override
     protected Collection<Class<? extends Plugin>> nodePlugins() {
-        return Arrays.asList(MockTransportService.TestPlugin.class);
+        return asList(MockTransportService.TestPlugin.class);
     }
 
     @Override
@@ -110,11 +111,9 @@ public void ingestDocs(int docCount) throws Exception {
             indexer.start(docCount);
             waitForDocs(docCount, indexer);
             refresh(INDEX_NAME);
-            waitForReplicaUpdate();
         }
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
     public void testPrimaryStopped_ReplicaPromoted() throws Exception {
         final String primary = internalCluster().startNode();
         createIndex(INDEX_NAME);
@@ -125,9 +124,7 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception {
 
         client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
         refresh(INDEX_NAME);
-        waitForReplicaUpdate();
-        assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);
-        assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);
+        waitForSearchableDocs(1, primary, replica);
 
         // index another doc but don't refresh, we will ensure this is searchable once replica is promoted.
         client().prepareIndex(INDEX_NAME).setId("2").setSource("bar", "baz").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
@@ -139,6 +136,7 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception {
         final ShardRouting replicaShardRouting = getShardRoutingForNodeName(replica);
         assertNotNull(replicaShardRouting);
         assertTrue(replicaShardRouting + " should be promoted as a primary", replicaShardRouting.primary());
+        refresh(INDEX_NAME);
         assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);
 
         // assert we can index into the new primary.
@@ -150,13 +148,10 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception {
         ensureGreen(INDEX_NAME);
         client().prepareIndex(INDEX_NAME).setId("4").setSource("baz", "baz").get();
         refresh(INDEX_NAME);
-        waitForReplicaUpdate();
-        assertHitCount(client(nodeC).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 4);
-        assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 4);
-        assertSegmentStats(REPLICA_COUNT);
+        waitForSearchableDocs(4, nodeC, replica);
+        assertIdenticalSegments(SHARD_COUNT, REPLICA_COUNT);
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
     public void testRestartPrimary() throws Exception {
         final String primary = internalCluster().startNode();
         createIndex(INDEX_NAME);
@@ -170,8 +165,7 @@ public void testRestartPrimary() throws Exception {
 
         client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
         refresh(INDEX_NAME);
-        waitForReplicaUpdate();
-        assertDocCounts(initialDocCount, replica, primary);
+        waitForSearchableDocs(initialDocCount, replica, primary);
 
         internalCluster().restartNode(primary);
         ensureGreen(INDEX_NAME);
@@ -179,13 +173,10 @@ public void testRestartPrimary() throws Exception {
         assertEquals(getNodeContainingPrimaryShard().getName(), replica);
 
         flushAndRefresh(INDEX_NAME);
-        waitForReplicaUpdate();
-
-        assertDocCounts(initialDocCount, replica, primary);
-        assertSegmentStats(REPLICA_COUNT);
+        waitForSearchableDocs(initialDocCount, replica, primary);
+        assertIdenticalSegments(SHARD_COUNT, REPLICA_COUNT);
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
    public void testCancelPrimaryAllocation() throws Exception {
        // this test cancels allocation on the primary - promoting the new replica and recreating the former primary as a replica.
         final String primary = internalCluster().startNode();
@@ -199,8 +190,7 @@ public void testCancelPrimaryAllocation() throws Exception {
 
         client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
         refresh(INDEX_NAME);
-        waitForReplicaUpdate();
-        assertDocCounts(initialDocCount, replica, primary);
+        waitForSearchableDocs(initialDocCount, replica, primary);
 
         final IndexShard indexShard = getIndexShard(primary);
         client().admin()
@@ -214,15 +204,13 @@ public void testCancelPrimaryAllocation() throws Exception {
         assertEquals(getNodeContainingPrimaryShard().getName(), replica);
 
         flushAndRefresh(INDEX_NAME);
-        waitForReplicaUpdate();
-
-        assertDocCounts(initialDocCount, replica, primary);
-        assertSegmentStats(REPLICA_COUNT);
+        waitForSearchableDocs(initialDocCount, replica, primary);
+        assertIdenticalSegments(SHARD_COUNT, REPLICA_COUNT);
     }
 
     /**
      * This test verfies that replica shard is not added to the cluster when doing a round of segment replication fails during peer recovery.
-     *
+     * <p>
      * TODO: Ignoring this test as its flaky and needs separate fix
      */
     @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
@@ -292,7 +280,6 @@ public void testAddNewReplicaFailure() throws Exception {
         assertFalse(indicesService.hasIndex(resolveIndex(INDEX_NAME)));
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
     public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
         final String nodeA = internalCluster().startNode();
         final String nodeB = internalCluster().startNode();
@@ -314,10 +301,7 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
             indexer.start(initialDocCount);
             waitForDocs(initialDocCount, indexer);
             refresh(INDEX_NAME);
-            waitForReplicaUpdate();
-
-            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
-            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
+            waitForSearchableDocs(initialDocCount, nodeA, nodeB);
 
             final int additionalDocCount = scaledRandomIntBetween(0, 200);
             final int expectedHitCount = initialDocCount + additionalDocCount;
@@ -325,12 +309,10 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
             waitForDocs(expectedHitCount, indexer);
 
             flushAndRefresh(INDEX_NAME);
-            waitForReplicaUpdate();
-            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
-            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
+            waitForSearchableDocs(expectedHitCount, nodeA, nodeB);
 
             ensureGreen(INDEX_NAME);
-            assertSegmentStats(REPLICA_COUNT);
+            assertIdenticalSegments(SHARD_COUNT, REPLICA_COUNT);
         }
     }
 
@@ -355,12 +337,8 @@ public void testIndexReopenClose() throws Exception {
             indexer.start(initialDocCount);
             waitForDocs(initialDocCount, indexer);
             flush(INDEX_NAME);
-            waitForReplicaUpdate();
+            waitForSearchableDocs(initialDocCount, primary, replica);
         }
-
-        assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
-        assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
-
         logger.info("--> Closing the index ");
         client().admin().indices().prepareClose(INDEX_NAME).get();
@@ -368,8 +346,8 @@ public void testIndexReopenClose() throws Exception {
         client().admin().indices().prepareOpen(INDEX_NAME).get();
         ensureGreen(INDEX_NAME);
-        assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
-        assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
+        waitForSearchableDocs(initialDocCount, primary, replica);
+        assertIdenticalSegments(SHARD_COUNT, REPLICA_COUNT);
     }
 
     public void testMultipleShards() throws Exception {
@@ -400,10 +378,7 @@ public void testMultipleShards() throws Exception {
             indexer.start(initialDocCount);
             waitForDocs(initialDocCount, indexer);
             refresh(INDEX_NAME);
-            waitForReplicaUpdate();
-
-            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
-            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
+            waitForSearchableDocs(initialDocCount, nodeA, nodeB);
 
             final int additionalDocCount = scaledRandomIntBetween(0, 200);
             final int expectedHitCount = initialDocCount + additionalDocCount;
@@ -411,12 +386,10 @@ public void testMultipleShards() throws Exception {
             waitForDocs(expectedHitCount, indexer);
 
             flushAndRefresh(INDEX_NAME);
-            waitForReplicaUpdate();
-            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
-            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
+            waitForSearchableDocs(expectedHitCount, nodeA, nodeB);
 
             ensureGreen(INDEX_NAME);
-            assertSegmentStats(REPLICA_COUNT);
+            assertIdenticalSegments(3, REPLICA_COUNT);
         }
     }
 
@@ -444,24 +417,17 @@ public void testReplicationAfterForceMerge() throws Exception {
             waitForDocs(initialDocCount, indexer);
             flush(INDEX_NAME);
-            waitForReplicaUpdate();
-            // wait a short amount of time to give replication a chance to complete.
-            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
-            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
+            waitForSearchableDocs(initialDocCount, nodeA, nodeB);
 
             // Index a second set of docs so we can merge into one segment.
             indexer.start(additionalDocCount);
             waitForDocs(expectedHitCount, indexer);
+            waitForSearchableDocs(expectedHitCount, nodeA, nodeB);
 
             // Force a merge here so that the in memory SegmentInfos does not reference old segments on disk.
             client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get();
             refresh(INDEX_NAME);
-            waitForReplicaUpdate();
-            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
-            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
-
-            ensureGreen(INDEX_NAME);
-            assertSegmentStats(REPLICA_COUNT);
+            assertIdenticalSegments(SHARD_COUNT, REPLICA_COUNT);
         }
     }
 
@@ -556,10 +522,10 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
         client().prepareIndex(INDEX_NAME).setId("3").setSource("foo", "bar").get();
         refresh(INDEX_NAME);
-        waitForReplicaUpdate();
+        waitForSearchableDocs(3, primaryNode, replicaNode);
         assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);
         assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);
-        assertSegmentStats(REPLICA_COUNT);
+        assertIdenticalSegments(SHARD_COUNT, REPLICA_COUNT);
     }
 
     public void testDeleteOperations() throws Exception {
@@ -583,20 +549,13 @@ public void testDeleteOperations() throws Exception {
             indexer.start(initialDocCount);
             waitForDocs(initialDocCount, indexer);
             refresh(INDEX_NAME);
-            waitForReplicaUpdate();
-
-            // wait a short amount of time to give replication a chance to complete.
-            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
-            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
+            waitForSearchableDocs(initialDocCount, nodeA, nodeB);
 
             final int additionalDocCount = scaledRandomIntBetween(0, 200);
             final int expectedHitCount = initialDocCount + additionalDocCount;
             indexer.start(additionalDocCount);
             waitForDocs(expectedHitCount, indexer);
-            waitForReplicaUpdate();
-
-            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
-            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
+            waitForSearchableDocs(expectedHitCount, nodeA, nodeB);
 
             ensureGreen(INDEX_NAME);
 
@@ -605,32 +564,18 @@ public void testDeleteOperations() throws Exception {
             client(nodeA).prepareDelete(INDEX_NAME, id).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
             refresh(INDEX_NAME);
-            waitForReplicaUpdate();
-            assertBusy(() -> {
-                final long nodeA_Count = client(nodeA).prepareSearch(INDEX_NAME)
-                    .setSize(0)
-                    .setPreference("_only_local")
-                    .get()
-                    .getHits()
-                    .getTotalHits().value;
-                assertEquals(expectedHitCount - 1, nodeA_Count);
-                final long nodeB_Count = client(nodeB).prepareSearch(INDEX_NAME)
-                    .setSize(0)
-                    .setPreference("_only_local")
-                    .get()
-                    .getHits()
-                    .getTotalHits().value;
-                assertEquals(expectedHitCount - 1, nodeB_Count);
-            }, 5, TimeUnit.SECONDS);
+            waitForSearchableDocs(expectedHitCount - 1, nodeA, nodeB);
+            assertIdenticalSegments(SHARD_COUNT, REPLICA_COUNT);
         }
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
     public void testUpdateOperations() throws Exception {
-        final String primary = internalCluster().startNode();
+        internalCluster().startClusterManagerOnlyNode();
+        final String primary = internalCluster().startDataOnlyNode();
         createIndex(INDEX_NAME);
         ensureYellow(INDEX_NAME);
-        final String replica = internalCluster().startNode();
+        final String replica = internalCluster().startDataOnlyNode();
+        ensureGreen(INDEX_NAME);
 
         final int initialDocCount = scaledRandomIntBetween(0, 200);
         try (
@@ -647,20 +592,13 @@ public void testUpdateOperations() throws Exception {
             indexer.start(initialDocCount);
             waitForDocs(initialDocCount, indexer);
             refresh(INDEX_NAME);
-            waitForReplicaUpdate();
-
-            // wait a short amount of time to give replication a chance to complete.
-            assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
-            assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
+            waitForSearchableDocs(initialDocCount, asList(primary, replica));
 
             final int additionalDocCount = scaledRandomIntBetween(0, 200);
             final int expectedHitCount = initialDocCount + additionalDocCount;
             indexer.start(additionalDocCount);
             waitForDocs(expectedHitCount, indexer);
-            waitForReplicaUpdate();
-
-            assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
-            assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
+            waitForSearchableDocs(expectedHitCount, asList(primary, replica));
 
             Set<String> ids = indexer.getIds();
             String id = ids.toArray()[0].toString();
@@ -672,69 +610,80 @@ public void testUpdateOperations() throws Exception {
             assertEquals(2, updateResponse.getVersion());
 
             refresh(INDEX_NAME);
-            waitForReplicaUpdate();
+            assertIdenticalSegments(SHARD_COUNT, REPLICA_COUNT);
 
             assertSearchHits(client(primary).prepareSearch(INDEX_NAME).setQuery(matchQuery("foo", "baz")).get(), id);
             assertSearchHits(client(replica).prepareSearch(INDEX_NAME).setQuery(matchQuery("foo", "baz")).get(), id);
-
         }
     }
 
-    private void assertSegmentStats(int numberOfReplicas) throws IOException {
-        final IndicesSegmentResponse indicesSegmentResponse = client().admin().indices().segments(new IndicesSegmentsRequest()).actionGet();
-
-        List<ShardSegments[]> segmentsByIndex = getShardSegments(indicesSegmentResponse);
-
-        // There will be an entry in the list for each index.
-        for (ShardSegments[] replicationGroupSegments : segmentsByIndex) {
-
-            // Separate Primary & replica shards ShardSegments.
-            final Map<Boolean, List<ShardSegments>> segmentListMap = segmentsByShardType(replicationGroupSegments);
-            final List<ShardSegments> primaryShardSegmentsList = segmentListMap.get(true);
-            final List<ShardSegments> replicaShardSegments = segmentListMap.get(false);
-
-            assertEquals("There should only be one primary in the replicationGroup", primaryShardSegmentsList.size(), 1);
-            final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get();
-            final Map<String, Segment> latestPrimarySegments = getLatestSegments(primaryShardSegments);
-
-            assertEquals(
-                "There should be a ShardSegment entry for each replica in the replicationGroup",
-                numberOfReplicas,
-                replicaShardSegments.size()
-            );
-
-            for (ShardSegments shardSegment : replicaShardSegments) {
-                final Map<String, Segment> latestReplicaSegments = getLatestSegments(shardSegment);
-                for (Segment replicaSegment : latestReplicaSegments.values()) {
-                    final Segment primarySegment = latestPrimarySegments.get(replicaSegment.getName());
-                    assertEquals(replicaSegment.getGeneration(), primarySegment.getGeneration());
-                    assertEquals(replicaSegment.getNumDocs(), primarySegment.getNumDocs());
-                    assertEquals(replicaSegment.getDeletedDocs(), primarySegment.getDeletedDocs());
-                    assertEquals(replicaSegment.getSize(), primarySegment.getSize());
-                }
+    private void assertIdenticalSegments(int numberOfShards, int numberOfReplicas) throws Exception {
+        assertBusy(() -> {
+            final IndicesSegmentResponse indicesSegmentResponse = client().admin()
+                .indices()
+                .segments(new IndicesSegmentsRequest())
+                .actionGet();
+            List<ShardSegments[]> segmentsByIndex = getShardSegments(indicesSegmentResponse);
 
-                // Fetch the IndexShard for this replica and try and build its SegmentInfos from the previous commit point.
-                // This ensures the previous commit point is not wiped.
-                final ShardRouting replicaShardRouting = shardSegment.getShardRouting();
-                ClusterState state = client(internalCluster().getMasterName()).admin().cluster().prepareState().get().getState();
-                final DiscoveryNode replicaNode = state.nodes().resolveNode(replicaShardRouting.currentNodeId());
-                IndexShard indexShard = getIndexShard(replicaNode.getName());
-                // calls to readCommit will fail if a valid commit point and all its segments are not in the store.
-                indexShard.store().readLastCommittedSegmentsInfo();
+            // There will be an entry in the list for each index.
+            assertEquals("Expected a different number of shards in the index", numberOfShards, segmentsByIndex.size());
+            for (ShardSegments[] replicationGroupSegments : segmentsByIndex) {
+                // Separate Primary & replica shards ShardSegments.
+                final Map<Boolean, List<ShardSegments>> segmentListMap = segmentsByShardType(replicationGroupSegments);
+                final List<ShardSegments> primaryShardSegmentsList = segmentListMap.get(true);
+                final List<ShardSegments> replicaShardSegmentsList = segmentListMap.get(false);
+                assertEquals("There should only be one primary in the replicationGroup", 1, primaryShardSegmentsList.size());
+                assertEquals(
+                    "There should be a ShardSegment entry for each replica in the replicationGroup",
+                    numberOfReplicas,
+                    replicaShardSegmentsList.size()
+                );
+                final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get();
+                final IndexShard primaryShard = getIndexShard(primaryShardSegments);
+                final Map<String, StoreFileMetadata> primarySegmentMetadata = primaryShard.getSegmentMetadataMap();
+                for (ShardSegments replicaShardSegments : replicaShardSegmentsList) {
+                    final IndexShard replicaShard = getIndexShard(replicaShardSegments);
+                    final Store.RecoveryDiff recoveryDiff = Store.segmentReplicationDiff(
+                        primarySegmentMetadata,
+                        replicaShard.getSegmentMetadataMap()
+                    );
+                    if (recoveryDiff.missing.isEmpty() == false || recoveryDiff.different.isEmpty() == false) {
+                        fail(
+                            "Expected no missing or different segments between primary and replica but diff was missing: "
+                                + recoveryDiff.missing
+                                + " Different: "
+                                + recoveryDiff.different
+                                + " Primary Replication Checkpoint : "
+                                + primaryShard.getLatestReplicationCheckpoint()
+                                + " Replica Replication Checkpoint: "
+                                + replicaShard.getLatestReplicationCheckpoint()
+                        );
+                    }
+                    // calls to readCommit will fail if a valid commit point and all its segments are not in the store.
+                    replicaShard.store().readLastCommittedSegmentsInfo();
+                }
             }
-    }
+        }, 1, TimeUnit.MINUTES);
+    }
+
+    private IndexShard getIndexShard(ShardSegments shardSegments) {
+        final ShardRouting replicaShardRouting = shardSegments.getShardRouting();
+        ClusterState state = client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState();
+        final DiscoveryNode replicaNode = state.nodes().resolveNode(replicaShardRouting.currentNodeId());
+        return getIndexShard(replicaNode.getName());
     }
 
     public void testDropPrimaryDuringReplication() throws Exception {
+        int replica_count = 6;
         final Settings settings = Settings.builder()
             .put(indexSettings())
-            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 6)
+            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, replica_count)
             .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
             .build();
         final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode();
         final String primaryNode = internalCluster().startDataOnlyNode();
         createIndex(INDEX_NAME, settings);
-        internalCluster().startDataOnlyNodes(6);
+        final List<String> dataNodes = internalCluster().startDataOnlyNodes(6);
         ensureGreen(INDEX_NAME);
 
         int initialDocCount = scaledRandomIntBetween(100, 200);
@@ -757,50 +706,39 @@ public void testDropPrimaryDuringReplication() throws Exception {
             ensureYellow(INDEX_NAME);
 
             // start another replica.
-            internalCluster().startDataOnlyNode();
+            dataNodes.add(internalCluster().startDataOnlyNode());
             ensureGreen(INDEX_NAME);
 
             // index another doc and refresh - without this the new replica won't catch up.
-            client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").get();
+            String docId = String.valueOf(initialDocCount + 1);
+            client().prepareIndex(INDEX_NAME).setId(docId).setSource("foo", "bar").get();
             flushAndRefresh(INDEX_NAME);
-            waitForReplicaUpdate();
-            assertSegmentStats(6);
+            waitForSearchableDocs(initialDocCount + 1, dataNodes);
+            assertIdenticalSegments(SHARD_COUNT, replica_count);
         }
     }
 
     /**
-     * Waits until the replica is caught up to the latest primary segments gen.
-     * @throws Exception if assertion fails
+     * Waits until all given nodes have at least the expected docCount.
+     *
+     * @param docCount - Expected Doc count.
+     * @param nodes    - List of node names.
      */
-    private void waitForReplicaUpdate() throws Exception {
+    private void waitForSearchableDocs(long docCount, List<String> nodes) throws Exception {
         // wait until the replica has the latest segment generation.
         assertBusy(() -> {
-            final IndicesSegmentResponse indicesSegmentResponse = client().admin()
-                .indices()
-                .segments(new IndicesSegmentsRequest())
-                .actionGet();
-            List<ShardSegments[]> segmentsByIndex = getShardSegments(indicesSegmentResponse);
-            for (ShardSegments[] replicationGroupSegments : segmentsByIndex) {
-                final Map<Boolean, List<ShardSegments>> segmentListMap = segmentsByShardType(replicationGroupSegments);
-                final List<ShardSegments> primaryShardSegmentsList = segmentListMap.get(true);
-                final List<ShardSegments> replicaShardSegments = segmentListMap.get(false);
-                // if we don't have any segments yet, proceed.
-                final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get();
-                logger.debug("Primary Segments: {}", primaryShardSegments.getSegments());
-                if (primaryShardSegments.getSegments().isEmpty() == false && replicaShardSegments != null) {
-                    final Map<String, Segment> latestPrimarySegments = getLatestSegments(primaryShardSegments);
-                    final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get();
-                    for (ShardSegments shardSegments : replicaShardSegments) {
-                        logger.debug("Replica {} Segments: {}", shardSegments.getShardRouting(), shardSegments.getSegments());
-                        final boolean isReplicaCaughtUpToPrimary = shardSegments.getSegments()
-                            .stream()
-                            .anyMatch(segment -> segment.getGeneration() == latestPrimaryGen);
-                        assertTrue(isReplicaCaughtUpToPrimary);
-                    }
+            for (String node : nodes) {
+                final SearchResponse response = client(node).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get();
+                if (response.getHits().getTotalHits().value < docCount) {
+                    fail("Expected search hits on node: " + node + " to be at least " + docCount);
                 }
             }
-        });
+        }, 1, TimeUnit.MINUTES);
+    }
+
+    private void waitForSearchableDocs(long docCount, String... nodes) throws Exception {
+        waitForSearchableDocs(docCount, Arrays.stream(nodes).collect(Collectors.toList()));
     }
 
     private IndexShard getIndexShard(String node) {
@@ -820,15 +758,6 @@ private List getShardSegments(IndicesSegmentResponse indicesSeg
             .collect(Collectors.toList());
     }
 
-    private Map<String, Segment> getLatestSegments(ShardSegments segments) {
-        final Optional<Long> generation = segments.getSegments().stream().map(Segment::getGeneration).max(Long::compare);
-        final Long latestPrimaryGen = generation.get();
-        return segments.getSegments()
-            .stream()
-            .filter(s -> s.getGeneration() == latestPrimaryGen)
-            .collect(Collectors.toMap(Segment::getName, Function.identity()));
-    }
-
     private Map<Boolean, List<ShardSegments>> segmentsByShardType(ShardSegments[] replicationGroupSegments) {
         return Arrays.stream(replicationGroupSegments).collect(Collectors.groupingBy(s -> s.getShardRouting().primary()));
     }
diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
index 96a54ab65d268..f312d23b34af9 100644
--- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -1580,6 +1580,19 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
         }
     }
 
+    /**
+     * Fetch a map of StoreFileMetadata for each segment from the latest SegmentInfos.
+     * This is used to compute diffs for segment replication.
+     *
+     * @return - Map of Segment Filename to its {@link StoreFileMetadata}
+     * @throws IOException - When there is an error loading metadata from the store.
+     */
+    public Map<String, StoreFileMetadata> getSegmentMetadataMap() throws IOException {
+        try (final GatedCloseable<SegmentInfos> snapshot = getSegmentInfosSnapshot()) {
+            return store.getSegmentMetadataMap(snapshot.get());
+        }
+    }
+
     /**
      * Fails the shard and marks the shard store as corrupted if
      * e is caused by index corruption
diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
index aadcb577f6174..3ecdbdfd7be6e 100644
--- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
+++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
@@ -22,7 +22,6 @@
 import org.opensearch.action.StepListener;
 import org.opensearch.common.UUIDs;
 import org.opensearch.common.bytes.BytesReference;
-import org.opensearch.common.concurrent.GatedCloseable;
 import org.opensearch.common.lucene.Lucene;
 import org.opensearch.common.util.CancellableThreads;
 import org.opensearch.index.shard.IndexShard;
@@ -38,8 +37,6 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.Map;
 
 /**
  * Represents the target of a replication event.
@@ -173,8 +170,8 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener
-    private Map<String, StoreFileMetadata> getMetadataMap() throws IOException {
-        if (indexShard.getSegmentInfosSnapshot() == null) {
-            return Collections.emptyMap();
-        }
-        try (final GatedCloseable<SegmentInfos> snapshot = indexShard.getSegmentInfosSnapshot()) {
-            return store.getSegmentMetadataMap(snapshot.get());
-        }
-    }
-
     @Override
     protected void onCancel(String reason) {
         cancellableThreads.cancel(reason);
diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
index ffea4aaf6b7c4..5900ac3ad29f3 100644
--- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
+++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
@@ -363,8 +363,8 @@ public void getSegmentFiles(
         SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock(
             SegmentReplicationTargetService.SegmentReplicationListener.class
         );
-        segrepTarget = spy(new SegmentReplicationTarget(repCheckpoint, indexShard, segrepSource, segRepListener));
-        when(segrepTarget.getMetadataMap()).thenReturn(SI_SNAPSHOT_DIFFERENT);
+        segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener);
+        when(spyIndexShard.getSegmentMetadataMap()).thenReturn(SI_SNAPSHOT_DIFFERENT);
         segrepTarget.startReplication(new ActionListener<Void>() {
             @Override
             public void onResponse(Void replicationResponse) {
@@ -415,8 +415,8 @@ public void getSegmentFiles(
             SegmentReplicationTargetService.SegmentReplicationListener.class
         );
-        segrepTarget = spy(new SegmentReplicationTarget(repCheckpoint, indexShard, segrepSource, segRepListener));
-        when(segrepTarget.getMetadataMap()).thenReturn(storeMetadataSnapshots.get(0).asMap());
+        segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener);
+        when(spyIndexShard.getSegmentMetadataMap()).thenReturn(storeMetadataSnapshots.get(0).asMap());
         segrepTarget.startReplication(new ActionListener<Void>() {
             @Override
             public void onResponse(Void replicationResponse) {
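
Reviewer note: the sketch below condenses the segment-parity check that assertIdenticalSegments performs in the patch above, using only APIs this change introduces or relies on (IndexShard#getSegmentMetadataMap, Store#segmentReplicationDiff, Store#readLastCommittedSegmentsInfo). It is illustrative only and not part of the change; the class and method names and the way the two IndexShard instances are obtained are assumed to come from the surrounding test infrastructure.

// Illustrative sketch (not part of this patch): verify a replica's on-disk segments match the primary.
import java.io.IOException;
import java.util.Map;

import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;

final class ReplicaParityCheck {
    static void assertReplicaIsCaughtUp(IndexShard primaryShard, IndexShard replicaShard) throws IOException {
        final Map<String, StoreFileMetadata> primaryMetadata = primaryShard.getSegmentMetadataMap();
        final Map<String, StoreFileMetadata> replicaMetadata = replicaShard.getSegmentMetadataMap();
        // segmentReplicationDiff buckets files into identical, different and missing relative to the primary.
        final Store.RecoveryDiff diff = Store.segmentReplicationDiff(primaryMetadata, replicaMetadata);
        if (diff.missing.isEmpty() == false || diff.different.isEmpty() == false) {
            throw new AssertionError("Replica out of sync. Missing: " + diff.missing + " Different: " + diff.different);
        }
        // Throws if the last commit point, or any file it references, is no longer in the store.
        replicaShard.store().readLastCommittedSegmentsInfo();
    }
}

Comparing StoreFileMetadata (file names, lengths and checksums) is a stricter notion of "caught up" than the generation and doc-count comparison the old assertSegmentStats used, which is the motivation for the rewritten assertion in the integration test.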