From 0c48ba37a22a8b8f577bf2f67ee8f7ced1f53a28 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Wed, 25 Jan 2023 14:52:02 -0800 Subject: [PATCH] Fix flaky SegmentReplicationITs. This change fixes flakiness with segment replication ITs. It does this by updating the wait condition used to ensure replicas are up to date to wait until a searched docCount is reached instead of output of the Segments API that can change if there are concurrent refreshes. It also does this by updating the method used to assert segment stats to wait until the assertion holds true rather than at a point in time. This method is also updated to assert store metadata directly over API output. Signed-off-by: Marc Handalian --- .../replication/SegmentReplicationIT.java | 301 +++++++----------- .../opensearch/index/shard/IndexShard.java | 13 + .../replication/SegmentReplicationTarget.java | 16 +- .../SegmentReplicationTargetTests.java | 8 +- 4 files changed, 134 insertions(+), 204 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index bc5c5abb5386d..1dd69f696c7ed 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -15,6 +15,7 @@ import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; import org.opensearch.action.admin.indices.segments.ShardSegments; +import org.opensearch.action.search.SearchResponse; import org.opensearch.action.support.WriteRequest; import org.opensearch.action.update.UpdateResponse; import org.opensearch.client.Requests; @@ -32,8 +33,9 @@ import org.opensearch.index.Index; import org.opensearch.index.IndexModule; import org.opensearch.index.IndexService; -import org.opensearch.index.engine.Segment; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.IndicesService; import org.opensearch.indices.recovery.FileChunkRequest; import org.opensearch.indices.replication.common.ReplicationType; @@ -44,7 +46,6 @@ import org.opensearch.test.transport.MockTransportService; import org.opensearch.transport.TransportService; -import java.io.IOException; import java.util.Collection; import java.util.Arrays; import java.util.List; @@ -53,9 +54,9 @@ import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.function.Function; import java.util.stream.Collectors; +import static java.util.Arrays.asList; import static org.hamcrest.Matchers.equalTo; import static org.opensearch.index.query.QueryBuilders.matchQuery; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; @@ -71,7 +72,7 @@ public class SegmentReplicationIT extends OpenSearchIntegTestCase { @Override protected Collection> nodePlugins() { - return Arrays.asList(MockTransportService.TestPlugin.class); + return asList(MockTransportService.TestPlugin.class); } @Override @@ -110,11 +111,9 @@ public void ingestDocs(int docCount) throws Exception { indexer.start(docCount); waitForDocs(docCount, indexer); refresh(INDEX_NAME); - waitForReplicaUpdate(); } } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testPrimaryStopped_ReplicaPromoted() throws Exception { final String primary = internalCluster().startNode(); createIndex(INDEX_NAME); @@ -125,9 +124,7 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception { client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); refresh(INDEX_NAME); - waitForReplicaUpdate(); - assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1); - assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1); + waitForSearchableDocs(1, primary, replica); // index another doc but don't refresh, we will ensure this is searchable once replica is promoted. client().prepareIndex(INDEX_NAME).setId("2").setSource("bar", "baz").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); @@ -139,6 +136,7 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception { final ShardRouting replicaShardRouting = getShardRoutingForNodeName(replica); assertNotNull(replicaShardRouting); assertTrue(replicaShardRouting + " should be promoted as a primary", replicaShardRouting.primary()); + refresh(INDEX_NAME); assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); // assert we can index into the new primary. @@ -150,13 +148,10 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception { ensureGreen(INDEX_NAME); client().prepareIndex(INDEX_NAME).setId("4").setSource("baz", "baz").get(); refresh(INDEX_NAME); - waitForReplicaUpdate(); - assertHitCount(client(nodeC).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 4); - assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 4); - assertSegmentStats(REPLICA_COUNT); + waitForSearchableDocs(4, nodeC, replica); + assertIdenticalSegments(SHARD_COUNT, REPLICA_COUNT); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testRestartPrimary() throws Exception { final String primary = internalCluster().startNode(); createIndex(INDEX_NAME); @@ -170,8 +165,7 @@ public void testRestartPrimary() throws Exception { client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); refresh(INDEX_NAME); - waitForReplicaUpdate(); - assertDocCounts(initialDocCount, replica, primary); + waitForSearchableDocs(initialDocCount, replica, primary); internalCluster().restartNode(primary); ensureGreen(INDEX_NAME); @@ -179,13 +173,10 @@ public void testRestartPrimary() throws Exception { assertEquals(getNodeContainingPrimaryShard().getName(), replica); flushAndRefresh(INDEX_NAME); - waitForReplicaUpdate(); - - assertDocCounts(initialDocCount, replica, primary); - assertSegmentStats(REPLICA_COUNT); + waitForSearchableDocs(initialDocCount, replica, primary); + assertIdenticalSegments(SHARD_COUNT, REPLICA_COUNT); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testCancelPrimaryAllocation() throws Exception { // this test cancels allocation on the primary - promoting the new replica and recreating the former primary as a replica. final String primary = internalCluster().startNode(); @@ -199,8 +190,7 @@ public void testCancelPrimaryAllocation() throws Exception { client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); refresh(INDEX_NAME); - waitForReplicaUpdate(); - assertDocCounts(initialDocCount, replica, primary); + waitForSearchableDocs(initialDocCount, replica, primary); final IndexShard indexShard = getIndexShard(primary); client().admin() @@ -214,15 +204,13 @@ public void testCancelPrimaryAllocation() throws Exception { assertEquals(getNodeContainingPrimaryShard().getName(), replica); flushAndRefresh(INDEX_NAME); - waitForReplicaUpdate(); - - assertDocCounts(initialDocCount, replica, primary); - assertSegmentStats(REPLICA_COUNT); + waitForSearchableDocs(initialDocCount, replica, primary); + assertIdenticalSegments(SHARD_COUNT, REPLICA_COUNT); } /** * This test verfies that replica shard is not added to the cluster when doing a round of segment replication fails during peer recovery. - * + *

* TODO: Ignoring this test as its flaky and needs separate fix */ @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") @@ -292,7 +280,6 @@ public void testAddNewReplicaFailure() throws Exception { assertFalse(indicesService.hasIndex(resolveIndex(INDEX_NAME))); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { final String nodeA = internalCluster().startNode(); final String nodeB = internalCluster().startNode(); @@ -314,10 +301,7 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { indexer.start(initialDocCount); waitForDocs(initialDocCount, indexer); refresh(INDEX_NAME); - waitForReplicaUpdate(); - - assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + waitForSearchableDocs(initialDocCount, nodeA, nodeB); final int additionalDocCount = scaledRandomIntBetween(0, 200); final int expectedHitCount = initialDocCount + additionalDocCount; @@ -325,12 +309,10 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { waitForDocs(expectedHitCount, indexer); flushAndRefresh(INDEX_NAME); - waitForReplicaUpdate(); - assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); - assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + waitForSearchableDocs(expectedHitCount, nodeA, nodeB); ensureGreen(INDEX_NAME); - assertSegmentStats(REPLICA_COUNT); + assertIdenticalSegments(SHARD_COUNT, REPLICA_COUNT); } } @@ -355,12 +337,8 @@ public void testIndexReopenClose() throws Exception { indexer.start(initialDocCount); waitForDocs(initialDocCount, indexer); flush(INDEX_NAME); - waitForReplicaUpdate(); + waitForSearchableDocs(initialDocCount, primary, replica); } - - assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - logger.info("--> Closing the index "); client().admin().indices().prepareClose(INDEX_NAME).get(); @@ -368,8 +346,8 @@ public void testIndexReopenClose() throws Exception { client().admin().indices().prepareOpen(INDEX_NAME).get(); ensureGreen(INDEX_NAME); - assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + waitForSearchableDocs(initialDocCount, primary, replica); + assertIdenticalSegments(SHARD_COUNT, REPLICA_COUNT); } public void testMultipleShards() throws Exception { @@ -400,10 +378,7 @@ public void testMultipleShards() throws Exception { indexer.start(initialDocCount); waitForDocs(initialDocCount, indexer); refresh(INDEX_NAME); - waitForReplicaUpdate(); - - assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + waitForSearchableDocs(initialDocCount, nodeA, nodeB); final int additionalDocCount = scaledRandomIntBetween(0, 200); final int expectedHitCount = initialDocCount + additionalDocCount; @@ -411,12 +386,10 @@ public void testMultipleShards() throws Exception { waitForDocs(expectedHitCount, indexer); flushAndRefresh(INDEX_NAME); - waitForReplicaUpdate(); - assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); - assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + waitForSearchableDocs(expectedHitCount, nodeA, nodeB); ensureGreen(INDEX_NAME); - assertSegmentStats(REPLICA_COUNT); + assertIdenticalSegments(3, REPLICA_COUNT); } } @@ -444,24 +417,17 @@ public void testReplicationAfterForceMerge() throws Exception { waitForDocs(initialDocCount, indexer); flush(INDEX_NAME); - waitForReplicaUpdate(); - // wait a short amount of time to give replication a chance to complete. - assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + waitForSearchableDocs(initialDocCount, nodeA, nodeB); // Index a second set of docs so we can merge into one segment. indexer.start(additionalDocCount); waitForDocs(expectedHitCount, indexer); + waitForSearchableDocs(expectedHitCount, nodeA, nodeB); // Force a merge here so that the in memory SegmentInfos does not reference old segments on disk. client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get(); refresh(INDEX_NAME); - waitForReplicaUpdate(); - assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); - assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); - - ensureGreen(INDEX_NAME); - assertSegmentStats(REPLICA_COUNT); + assertIdenticalSegments(SHARD_COUNT, REPLICA_COUNT); } } @@ -556,10 +522,10 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception { client().prepareIndex(INDEX_NAME).setId("3").setSource("foo", "bar").get(); refresh(INDEX_NAME); - waitForReplicaUpdate(); + waitForSearchableDocs(3, primaryNode, replicaNode); assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3); assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3); - assertSegmentStats(REPLICA_COUNT); + assertIdenticalSegments(SHARD_COUNT, REPLICA_COUNT); } public void testDeleteOperations() throws Exception { @@ -583,20 +549,13 @@ public void testDeleteOperations() throws Exception { indexer.start(initialDocCount); waitForDocs(initialDocCount, indexer); refresh(INDEX_NAME); - waitForReplicaUpdate(); - - // wait a short amount of time to give replication a chance to complete. - assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + waitForSearchableDocs(initialDocCount, nodeA, nodeB); final int additionalDocCount = scaledRandomIntBetween(0, 200); final int expectedHitCount = initialDocCount + additionalDocCount; indexer.start(additionalDocCount); waitForDocs(expectedHitCount, indexer); - waitForReplicaUpdate(); - - assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); - assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + waitForSearchableDocs(expectedHitCount, nodeA, nodeB); ensureGreen(INDEX_NAME); @@ -605,32 +564,18 @@ public void testDeleteOperations() throws Exception { client(nodeA).prepareDelete(INDEX_NAME, id).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); refresh(INDEX_NAME); - waitForReplicaUpdate(); - assertBusy(() -> { - final long nodeA_Count = client(nodeA).prepareSearch(INDEX_NAME) - .setSize(0) - .setPreference("_only_local") - .get() - .getHits() - .getTotalHits().value; - assertEquals(expectedHitCount - 1, nodeA_Count); - final long nodeB_Count = client(nodeB).prepareSearch(INDEX_NAME) - .setSize(0) - .setPreference("_only_local") - .get() - .getHits() - .getTotalHits().value; - assertEquals(expectedHitCount - 1, nodeB_Count); - }, 5, TimeUnit.SECONDS); + waitForSearchableDocs(expectedHitCount - 1, nodeA, nodeB); + assertIdenticalSegments(SHARD_COUNT, REPLICA_COUNT); } } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testUpdateOperations() throws Exception { - final String primary = internalCluster().startNode(); + internalCluster().startClusterManagerOnlyNode(); + final String primary = internalCluster().startDataOnlyNode(); createIndex(INDEX_NAME); ensureYellow(INDEX_NAME); - final String replica = internalCluster().startNode(); + final String replica = internalCluster().startDataOnlyNode(); + ensureGreen(INDEX_NAME); final int initialDocCount = scaledRandomIntBetween(0, 200); try ( @@ -647,20 +592,13 @@ public void testUpdateOperations() throws Exception { indexer.start(initialDocCount); waitForDocs(initialDocCount, indexer); refresh(INDEX_NAME); - waitForReplicaUpdate(); - - // wait a short amount of time to give replication a chance to complete. - assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + waitForSearchableDocs(initialDocCount, asList(primary, replica)); final int additionalDocCount = scaledRandomIntBetween(0, 200); final int expectedHitCount = initialDocCount + additionalDocCount; indexer.start(additionalDocCount); waitForDocs(expectedHitCount, indexer); - waitForReplicaUpdate(); - - assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); - assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + waitForSearchableDocs(expectedHitCount, asList(primary, replica)); Set ids = indexer.getIds(); String id = ids.toArray()[0].toString(); @@ -672,69 +610,80 @@ public void testUpdateOperations() throws Exception { assertEquals(2, updateResponse.getVersion()); refresh(INDEX_NAME); - waitForReplicaUpdate(); + assertIdenticalSegments(SHARD_COUNT, REPLICA_COUNT); assertSearchHits(client(primary).prepareSearch(INDEX_NAME).setQuery(matchQuery("foo", "baz")).get(), id); assertSearchHits(client(replica).prepareSearch(INDEX_NAME).setQuery(matchQuery("foo", "baz")).get(), id); - } } - private void assertSegmentStats(int numberOfReplicas) throws IOException { - final IndicesSegmentResponse indicesSegmentResponse = client().admin().indices().segments(new IndicesSegmentsRequest()).actionGet(); - - List segmentsByIndex = getShardSegments(indicesSegmentResponse); - - // There will be an entry in the list for each index. - for (ShardSegments[] replicationGroupSegments : segmentsByIndex) { - - // Separate Primary & replica shards ShardSegments. - final Map> segmentListMap = segmentsByShardType(replicationGroupSegments); - final List primaryShardSegmentsList = segmentListMap.get(true); - final List replicaShardSegments = segmentListMap.get(false); - - assertEquals("There should only be one primary in the replicationGroup", primaryShardSegmentsList.size(), 1); - final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get(); - final Map latestPrimarySegments = getLatestSegments(primaryShardSegments); - - assertEquals( - "There should be a ShardSegment entry for each replica in the replicationGroup", - numberOfReplicas, - replicaShardSegments.size() - ); - - for (ShardSegments shardSegment : replicaShardSegments) { - final Map latestReplicaSegments = getLatestSegments(shardSegment); - for (Segment replicaSegment : latestReplicaSegments.values()) { - final Segment primarySegment = latestPrimarySegments.get(replicaSegment.getName()); - assertEquals(replicaSegment.getGeneration(), primarySegment.getGeneration()); - assertEquals(replicaSegment.getNumDocs(), primarySegment.getNumDocs()); - assertEquals(replicaSegment.getDeletedDocs(), primarySegment.getDeletedDocs()); - assertEquals(replicaSegment.getSize(), primarySegment.getSize()); - } + private void assertIdenticalSegments(int numberOfShards, int numberOfReplicas) throws Exception { + assertBusy(() -> { + final IndicesSegmentResponse indicesSegmentResponse = client().admin() + .indices() + .segments(new IndicesSegmentsRequest()) + .actionGet(); + List segmentsByIndex = getShardSegments(indicesSegmentResponse); - // Fetch the IndexShard for this replica and try and build its SegmentInfos from the previous commit point. - // This ensures the previous commit point is not wiped. - final ShardRouting replicaShardRouting = shardSegment.getShardRouting(); - ClusterState state = client(internalCluster().getMasterName()).admin().cluster().prepareState().get().getState(); - final DiscoveryNode replicaNode = state.nodes().resolveNode(replicaShardRouting.currentNodeId()); - IndexShard indexShard = getIndexShard(replicaNode.getName()); - // calls to readCommit will fail if a valid commit point and all its segments are not in the store. - indexShard.store().readLastCommittedSegmentsInfo(); + // There will be an entry in the list for each index. + assertEquals("Expected a different number of shards in the index", numberOfShards, segmentsByIndex.size()); + for (ShardSegments[] replicationGroupSegments : segmentsByIndex) { + // Separate Primary & replica shards ShardSegments. + final Map> segmentListMap = segmentsByShardType(replicationGroupSegments); + final List primaryShardSegmentsList = segmentListMap.get(true); + final List replicaShardSegmentsList = segmentListMap.get(false); + assertEquals("There should only be one primary in the replicationGroup", 1, primaryShardSegmentsList.size()); + assertEquals( + "There should be a ShardSegment entry for each replica in the replicationGroup", + numberOfReplicas, + replicaShardSegmentsList.size() + ); + final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get(); + final IndexShard primaryShard = getIndexShard(primaryShardSegments); + final Map primarySegmentMetadata = primaryShard.getSegmentMetadataMap(); + for (ShardSegments replicaShardSegments : replicaShardSegmentsList) { + final IndexShard replicaShard = getIndexShard(replicaShardSegments); + final Store.RecoveryDiff recoveryDiff = Store.segmentReplicationDiff( + primarySegmentMetadata, + replicaShard.getSegmentMetadataMap() + ); + if (recoveryDiff.missing.isEmpty() == false || recoveryDiff.different.isEmpty() == false) { + fail( + "Expected no missing or different segments between primary and replica but diff was missing: " + + recoveryDiff.missing + + " Different: " + + recoveryDiff.different + + " Primary Replication Checkpoint : " + + primaryShard.getLatestReplicationCheckpoint() + + " Replica Replication Checkpoint: " + + replicaShard.getLatestReplicationCheckpoint() + ); + } + // calls to readCommit will fail if a valid commit point and all its segments are not in the store. + replicaShard.store().readLastCommittedSegmentsInfo(); + } } - } + }, 1, TimeUnit.MINUTES); + } + + private IndexShard getIndexShard(ShardSegments shardSegments) { + final ShardRouting replicaShardRouting = shardSegments.getShardRouting(); + ClusterState state = client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState(); + final DiscoveryNode replicaNode = state.nodes().resolveNode(replicaShardRouting.currentNodeId()); + return getIndexShard(replicaNode.getName()); } public void testDropPrimaryDuringReplication() throws Exception { + int replica_count = 6; final Settings settings = Settings.builder() .put(indexSettings()) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 6) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, replica_count) .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .build(); final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(); final String primaryNode = internalCluster().startDataOnlyNode(); createIndex(INDEX_NAME, settings); - internalCluster().startDataOnlyNodes(6); + final List dataNodes = internalCluster().startDataOnlyNodes(6); ensureGreen(INDEX_NAME); int initialDocCount = scaledRandomIntBetween(100, 200); @@ -757,50 +706,39 @@ public void testDropPrimaryDuringReplication() throws Exception { ensureYellow(INDEX_NAME); // start another replica. - internalCluster().startDataOnlyNode(); + dataNodes.add(internalCluster().startDataOnlyNode()); ensureGreen(INDEX_NAME); // index another doc and refresh - without this the new replica won't catch up. - client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").get(); + String docId = String.valueOf(initialDocCount + 1); + client().prepareIndex(INDEX_NAME).setId(docId).setSource("foo", "bar").get(); flushAndRefresh(INDEX_NAME); - waitForReplicaUpdate(); - assertSegmentStats(6); + waitForSearchableDocs(initialDocCount + 1, dataNodes); + assertIdenticalSegments(SHARD_COUNT, replica_count); } } /** - * Waits until the replica is caught up to the latest primary segments gen. - * @throws Exception if assertion fails + * Waits until all given nodes have at least the expected docCount. + * + * @param docCount - Expected Doc count. + * @param nodes - List of node names. */ - private void waitForReplicaUpdate() throws Exception { + private void waitForSearchableDocs(long docCount, List nodes) throws Exception { // wait until the replica has the latest segment generation. assertBusy(() -> { - final IndicesSegmentResponse indicesSegmentResponse = client().admin() - .indices() - .segments(new IndicesSegmentsRequest()) - .actionGet(); - List segmentsByIndex = getShardSegments(indicesSegmentResponse); - for (ShardSegments[] replicationGroupSegments : segmentsByIndex) { - final Map> segmentListMap = segmentsByShardType(replicationGroupSegments); - final List primaryShardSegmentsList = segmentListMap.get(true); - final List replicaShardSegments = segmentListMap.get(false); - // if we don't have any segments yet, proceed. - final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get(); - logger.debug("Primary Segments: {}", primaryShardSegments.getSegments()); - if (primaryShardSegments.getSegments().isEmpty() == false && replicaShardSegments != null) { - final Map latestPrimarySegments = getLatestSegments(primaryShardSegments); - final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get(); - for (ShardSegments shardSegments : replicaShardSegments) { - logger.debug("Replica {} Segments: {}", shardSegments.getShardRouting(), shardSegments.getSegments()); - final boolean isReplicaCaughtUpToPrimary = shardSegments.getSegments() - .stream() - .anyMatch(segment -> segment.getGeneration() == latestPrimaryGen); - assertTrue(isReplicaCaughtUpToPrimary); - } + for (String node : nodes) { + final SearchResponse response = client(node).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(); + if (response.getHits().getTotalHits().value < docCount) { + fail("Expected search hits on node: " + node + " to be at least " + docCount); } } - }); + }, 1, TimeUnit.MINUTES); + } + + private void waitForSearchableDocs(long docCount, String... nodes) throws Exception { + waitForSearchableDocs(docCount, Arrays.stream(nodes).collect(Collectors.toList())); } private IndexShard getIndexShard(String node) { @@ -820,15 +758,6 @@ private List getShardSegments(IndicesSegmentResponse indicesSeg .collect(Collectors.toList()); } - private Map getLatestSegments(ShardSegments segments) { - final Optional generation = segments.getSegments().stream().map(Segment::getGeneration).max(Long::compare); - final Long latestPrimaryGen = generation.get(); - return segments.getSegments() - .stream() - .filter(s -> s.getGeneration() == latestPrimaryGen) - .collect(Collectors.toMap(Segment::getName, Function.identity())); - } - private Map> segmentsByShardType(ShardSegments[] replicationGroupSegments) { return Arrays.stream(replicationGroupSegments).collect(Collectors.groupingBy(s -> s.getShardRouting().primary())); } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 96a54ab65d268..f312d23b34af9 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1580,6 +1580,19 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException { } } + /** + * Fetch a map of StoreFileMetadata for each segment from the latest SegmentInfos. + * This is used to compute diffs for segment replication. + * + * @return - Map of Segment Filename to its {@link StoreFileMetadata} + * @throws IOException - When there is an error loading metadata from the store. + */ + public Map getSegmentMetadataMap() throws IOException { + try (final GatedCloseable snapshot = getSegmentInfosSnapshot()) { + return store.getSegmentMetadataMap(snapshot.get()); + } + } + /** * Fails the shard and marks the shard store as corrupted if * e is caused by index corruption diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index aadcb577f6174..3ecdbdfd7be6e 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -22,7 +22,6 @@ import org.opensearch.action.StepListener; import org.opensearch.common.UUIDs; import org.opensearch.common.bytes.BytesReference; -import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lucene.Lucene; import org.opensearch.common.util.CancellableThreads; import org.opensearch.index.shard.IndexShard; @@ -38,8 +37,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; -import java.util.Collections; -import java.util.Map; /** * Represents the target of a replication event. @@ -173,8 +170,8 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener getMetadataMap() throws IOException { - if (indexShard.getSegmentInfosSnapshot() == null) { - return Collections.emptyMap(); - } - try (final GatedCloseable snapshot = indexShard.getSegmentInfosSnapshot()) { - return store.getSegmentMetadataMap(snapshot.get()); - } - } - @Override protected void onCancel(String reason) { cancellableThreads.cancel(reason); diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java index ffea4aaf6b7c4..5900ac3ad29f3 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java @@ -363,8 +363,8 @@ public void getSegmentFiles( SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( SegmentReplicationTargetService.SegmentReplicationListener.class ); - segrepTarget = spy(new SegmentReplicationTarget(repCheckpoint, indexShard, segrepSource, segRepListener)); - when(segrepTarget.getMetadataMap()).thenReturn(SI_SNAPSHOT_DIFFERENT); + segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener); + when(spyIndexShard.getSegmentMetadataMap()).thenReturn(SI_SNAPSHOT_DIFFERENT); segrepTarget.startReplication(new ActionListener() { @Override public void onResponse(Void replicationResponse) { @@ -415,8 +415,8 @@ public void getSegmentFiles( SegmentReplicationTargetService.SegmentReplicationListener.class ); - segrepTarget = spy(new SegmentReplicationTarget(repCheckpoint, indexShard, segrepSource, segRepListener)); - when(segrepTarget.getMetadataMap()).thenReturn(storeMetadataSnapshots.get(0).asMap()); + segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener); + when(spyIndexShard.getSegmentMetadataMap()).thenReturn(storeMetadataSnapshots.get(0).asMap()); segrepTarget.startReplication(new ActionListener() { @Override public void onResponse(Void replicationResponse) {