diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index bc63c36ae838f..02da12904a6e7 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -16,6 +16,8 @@ import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; import org.opensearch.action.admin.indices.segments.ShardSegments; import org.opensearch.action.support.WriteRequest; +import org.opensearch.action.update.UpdateResponse; +import org.opensearch.client.Requests; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; @@ -55,15 +57,17 @@ import java.util.stream.Collectors; import static org.hamcrest.Matchers.equalTo; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.index.query.QueryBuilders.matchQuery; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchHits; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class SegmentReplicationIT extends OpenSearchIntegTestCase { - private static final String INDEX_NAME = "test-idx-1"; - private static final int SHARD_COUNT = 1; - private static final int REPLICA_COUNT = 1; + protected static final String INDEX_NAME = "test-idx-1"; + protected static final int SHARD_COUNT = 1; + protected static final int REPLICA_COUNT = 1; @Override protected Collection> nodePlugins() { @@ -91,6 +95,26 @@ protected Settings featureFlagSettings() { return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build(); } + public void ingestDocs(int docCount) throws Exception { + try ( + BackgroundIndexer indexer = new BackgroundIndexer( + INDEX_NAME, + "_doc", + client(), + -1, + RandomizedTest.scaledRandomIntBetween(2, 5), + false, + random() + ) + ) { + indexer.start(docCount); + waitForDocs(docCount, indexer); + refresh(INDEX_NAME); + waitForReplicaUpdate(); + } + } + + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testPrimaryStopped_ReplicaPromoted() throws Exception { final String primary = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME); @@ -266,6 +290,7 @@ public void testAddNewReplicaFailure() throws Exception { assertFalse(indicesService.hasIndex(resolveIndex(INDEX_NAME))); } + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { final String nodeA = internalCluster().startNode(featureFlagSettings()); final String nodeB = internalCluster().startNode(featureFlagSettings()); @@ -497,6 +522,7 @@ public void testCancellation() throws Exception { assertDocCounts(docCount, primaryNode); } + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception { final String primaryNode = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()); @@ -598,6 +624,61 @@ public void testDeleteOperations() throws Exception { } } + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") + public void testUpdateOperations() throws Exception { + final String primary = internalCluster().startNode(featureFlagSettings()); + createIndex(INDEX_NAME); + ensureYellow(INDEX_NAME); + final String replica = internalCluster().startNode(featureFlagSettings()); + + final int initialDocCount = scaledRandomIntBetween(0, 200); + try ( + BackgroundIndexer indexer = new BackgroundIndexer( + INDEX_NAME, + "_doc", + client(), + -1, + RandomizedTest.scaledRandomIntBetween(2, 5), + false, + random() + ) + ) { + indexer.start(initialDocCount); + waitForDocs(initialDocCount, indexer); + refresh(INDEX_NAME); + waitForReplicaUpdate(); + + // wait a short amount of time to give replication a chance to complete. + assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + + final int additionalDocCount = scaledRandomIntBetween(0, 200); + final int expectedHitCount = initialDocCount + additionalDocCount; + indexer.start(additionalDocCount); + waitForDocs(expectedHitCount, indexer); + waitForReplicaUpdate(); + + assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + + Set ids = indexer.getIds(); + String id = ids.toArray()[0].toString(); + UpdateResponse updateResponse = client(primary).prepareUpdate(INDEX_NAME, id) + .setDoc(Requests.INDEX_CONTENT_TYPE, "foo", "baz") + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .get(); + assertFalse("request shouldn't have forced a refresh", updateResponse.forcedRefresh()); + assertEquals(2, updateResponse.getVersion()); + + refresh(INDEX_NAME); + waitForReplicaUpdate(); + + assertSearchHits(client(primary).prepareSearch(INDEX_NAME).setQuery(matchQuery("foo", "baz")).get(), id); + assertSearchHits(client(replica).prepareSearch(INDEX_NAME).setQuery(matchQuery("foo", "baz")).get(), id); + + } + } + private void assertSegmentStats(int numberOfReplicas) throws IOException { final IndicesSegmentResponse indicesSegmentResponse = client().admin().indices().segments(new IndicesSegmentsRequest()).actionGet(); @@ -689,7 +770,7 @@ public void testDropPrimaryDuringReplication() throws Exception { /** * Waits until the replica is caught up to the latest primary segments gen. - * @throws Exception + * @throws Exception if assertion fails */ private void waitForReplicaUpdate() throws Exception { // wait until the replica has the latest segment generation. @@ -706,7 +787,7 @@ private void waitForReplicaUpdate() throws Exception { // if we don't have any segments yet, proceed. final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get(); logger.debug("Primary Segments: {}", primaryShardSegments.getSegments()); - if (primaryShardSegments.getSegments().isEmpty() == false) { + if (primaryShardSegments.getSegments().isEmpty() == false && replicaShardSegments != null) { final Map latestPrimarySegments = getLatestSegments(primaryShardSegments); final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get(); for (ShardSegments shardSegments : replicaShardSegments) {