From 522e31650803677da7a2d2c7cb2a805c8eb48a27 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Tue, 14 Feb 2023 10:38:47 -0800 Subject: [PATCH] Backport/backport 6280 to 2.x (#6288) * [Segment Replication] Fix flaky testSnapshotOnSegRep_RestoreOnSegRepDuringIngestion test (#6280) * [Segment Replication] Fix flaky testSnapshotOnSegRep_RestoreOnSegRepDuringIngestion test Signed-off-by: Suraj Singh * Fix spotless failures Signed-off-by: Suraj Singh --------- Signed-off-by: Suraj Singh * Spotless fix Signed-off-by: Suraj Singh --------- Signed-off-by: Suraj Singh --- .../replication/SegmentReplicationBaseIT.java | 7 ++- .../SegmentReplicationSnapshotIT.java | 57 ++++++++----------- 2 files changed, 30 insertions(+), 34 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java index 5f7433126db57..dfffeaf860734 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java @@ -113,10 +113,15 @@ protected DiscoveryNode getNodeContainingPrimaryShard() { * @param nodes - List of node names. */ protected void waitForSearchableDocs(long docCount, List nodes) throws Exception { + // wait until the replica has the latest segment generation. + waitForSearchableDocs(INDEX_NAME, docCount, nodes); + } + + public static void waitForSearchableDocs(String indexName, long docCount, List nodes) throws Exception { // wait until the replica has the latest segment generation. assertBusy(() -> { for (String node : nodes) { - final SearchResponse response = client(node).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(); + final SearchResponse response = client(node).prepareSearch(indexName).setSize(0).setPreference("_only_local").get(); final long hits = response.getHits().getTotalHits().value; if (hits < docCount) { fail("Expected search hits on node: " + node + " to be at least " + docCount + " but was: " + hits); diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java index 4e27e6a5cbd89..0a0f49a8bd3cf 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java @@ -8,7 +8,6 @@ package org.opensearch.snapshots; -import com.carrotsearch.randomizedtesting.RandomizedTest; import org.junit.BeforeClass; import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequestBuilder; @@ -23,15 +22,15 @@ import org.opensearch.index.query.QueryBuilders; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.rest.RestStatus; -import org.opensearch.test.BackgroundIndexer; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; import java.nio.file.Path; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; -import static org.hamcrest.Matchers.equalTo; +import static org.opensearch.indices.replication.SegmentReplicationBaseIT.waitForSearchableDocs; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; @@ -80,20 +79,8 @@ protected boolean addMockInternalEngine() { } public void ingestData(int docCount, String indexName) throws Exception { - try ( - BackgroundIndexer indexer = new BackgroundIndexer( - indexName, - "_doc", - client(), - -1, - RandomizedTest.scaledRandomIntBetween(2, 5), - false, - random() - ) - ) { - indexer.start(docCount); - waitForDocs(docCount, indexer); - refresh(indexName); + for (int i = 0; i < docCount; i++) { + client().prepareIndex(indexName).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); } } @@ -124,11 +111,8 @@ public void createSnapshot() { .setWaitForCompletion(true) .setIndices(INDEX_NAME) .get(); - assertThat( - createSnapshotResponse.getSnapshotInfo().successfulShards(), - equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()) - ); - assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS)); + assertEquals(createSnapshotResponse.getSnapshotInfo().successfulShards(), createSnapshotResponse.getSnapshotInfo().totalShards()); + assertEquals(createSnapshotResponse.getSnapshotInfo().state(), SnapshotState.SUCCESS); } public RestoreSnapshotResponse restoreSnapshotWithSettings(Settings indexSettings) { @@ -155,7 +139,7 @@ public void testRestoreOnSegRep() throws Exception { RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(null); // Assertions - assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); + assertEquals(restoreSnapshotResponse.status(), RestStatus.ACCEPTED); ensureGreen(RESTORED_INDEX_NAME); GetSettingsResponse settingsResponse = client().admin() .indices() @@ -166,9 +150,9 @@ public void testRestoreOnSegRep() throws Exception { assertHitCount(resp, DOC_COUNT); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testSnapshotOnSegRep_RestoreOnSegRepDuringIngestion() throws Exception { - startClusterWithSettings(segRepEnableIndexSettings(), 1); + List nodes = startClusterWithSettings(segRepEnableIndexSettings(), 1); + waitForSearchableDocs(INDEX_NAME, DOC_COUNT, nodes); createSnapshot(); // Delete index assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); @@ -177,16 +161,23 @@ public void testSnapshotOnSegRep_RestoreOnSegRepDuringIngestion() throws Excepti RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(null); // Assertions - assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); - ingestData(5000, RESTORED_INDEX_NAME); - ensureGreen(RESTORED_INDEX_NAME); + assertEquals(restoreSnapshotResponse.status(), RestStatus.ACCEPTED); + assertBusy(() -> ensureGreen(RESTORED_INDEX_NAME), 60, TimeUnit.SECONDS); + final int docCountPostRestore = 1001; + final int totalDocCount = DOC_COUNT + docCountPostRestore; + for (int i = DOC_COUNT; i < totalDocCount; i++) { + client().prepareIndex(RESTORED_INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + } + flushAndRefresh(RESTORED_INDEX_NAME); + assertBusy(() -> ensureGreen(RESTORED_INDEX_NAME), 60, TimeUnit.SECONDS); + waitForSearchableDocs(RESTORED_INDEX_NAME, totalDocCount, nodes); GetSettingsResponse settingsResponse = client().admin() .indices() .getSettings(new GetSettingsRequest().indices(RESTORED_INDEX_NAME)) .get(); assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, "index.replication.type"), "SEGMENT"); SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get(); - assertHitCount(resp, DOC_COUNT + 5000); + assertHitCount(resp, totalDocCount); } public void testSnapshotOnDocRep_RestoreOnSegRep() throws Exception { @@ -198,7 +189,7 @@ public void testSnapshotOnDocRep_RestoreOnSegRep() throws Exception { RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(restoreIndexSegRepSettings()); // Assertions - assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); + assertEquals(restoreSnapshotResponse.status(), RestStatus.ACCEPTED); ensureGreen(RESTORED_INDEX_NAME); GetSettingsResponse settingsResponse = client().admin() .indices() @@ -220,7 +211,7 @@ public void testSnapshotOnSegRep_RestoreOnDocRep() throws Exception { RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(restoreIndexDocRepSettings()); // Assertions - assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); + assertEquals(restoreSnapshotResponse.status(), RestStatus.ACCEPTED); ensureGreen(RESTORED_INDEX_NAME); GetSettingsResponse settingsResponse = client().admin() .indices() @@ -240,7 +231,7 @@ public void testSnapshotOnDocRep_RestoreOnDocRep() throws Exception { RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(restoreIndexDocRepSettings()); // Assertions - assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); + assertEquals(restoreSnapshotResponse.status(), RestStatus.ACCEPTED); ensureGreen(RESTORED_INDEX_NAME); GetSettingsResponse settingsResponse = client().admin() .indices() @@ -266,7 +257,7 @@ public void testRestoreOnReplicaNode() throws Exception { RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(null); // Assertions - assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); + assertEquals(restoreSnapshotResponse.status(), RestStatus.ACCEPTED); internalCluster().startNode(); ensureGreen(RESTORED_INDEX_NAME); GetSettingsResponse settingsResponse = client().admin()