Skip to content

Commit

Permalink
Backport/backport 6280 to 2.x (#6288)
Browse files Browse the repository at this point in the history
* [Segment Replication] Fix flaky testSnapshotOnSegRep_RestoreOnSegRepDuringIngestion test (#6280)

* [Segment Replication] Fix flaky testSnapshotOnSegRep_RestoreOnSegRepDuringIngestion test

Signed-off-by: Suraj Singh <[email protected]>

* Fix spotless failures

Signed-off-by: Suraj Singh <[email protected]>

---------

Signed-off-by: Suraj Singh <[email protected]>

* Spotless fix

Signed-off-by: Suraj Singh <[email protected]>

---------

Signed-off-by: Suraj Singh <[email protected]>
  • Loading branch information
dreamer-89 authored Feb 14, 2023
1 parent 4ddae92 commit 522e316
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,15 @@ protected DiscoveryNode getNodeContainingPrimaryShard() {
* @param nodes - List of node names.
*/
protected void waitForSearchableDocs(long docCount, List<String> nodes) throws Exception {
// wait until the replica has the latest segment generation.
waitForSearchableDocs(INDEX_NAME, docCount, nodes);
}

public static void waitForSearchableDocs(String indexName, long docCount, List<String> nodes) throws Exception {
// wait until the replica has the latest segment generation.
assertBusy(() -> {
for (String node : nodes) {
final SearchResponse response = client(node).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get();
final SearchResponse response = client(node).prepareSearch(indexName).setSize(0).setPreference("_only_local").get();
final long hits = response.getHits().getTotalHits().value;
if (hits < docCount) {
fail("Expected search hits on node: " + node + " to be at least " + docCount + " but was: " + hits);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.snapshots;

import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.junit.BeforeClass;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequestBuilder;
Expand All @@ -23,15 +22,15 @@
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.rest.RestStatus;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.equalTo;
import static org.opensearch.indices.replication.SegmentReplicationBaseIT.waitForSearchableDocs;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

Expand Down Expand Up @@ -80,20 +79,8 @@ protected boolean addMockInternalEngine() {
}

public void ingestData(int docCount, String indexName) throws Exception {
try (
BackgroundIndexer indexer = new BackgroundIndexer(
indexName,
"_doc",
client(),
-1,
RandomizedTest.scaledRandomIntBetween(2, 5),
false,
random()
)
) {
indexer.start(docCount);
waitForDocs(docCount, indexer);
refresh(indexName);
for (int i = 0; i < docCount; i++) {
client().prepareIndex(indexName).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
}

Expand Down Expand Up @@ -124,11 +111,8 @@ public void createSnapshot() {
.setWaitForCompletion(true)
.setIndices(INDEX_NAME)
.get();
assertThat(
createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())
);
assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));
assertEquals(createSnapshotResponse.getSnapshotInfo().successfulShards(), createSnapshotResponse.getSnapshotInfo().totalShards());
assertEquals(createSnapshotResponse.getSnapshotInfo().state(), SnapshotState.SUCCESS);
}

public RestoreSnapshotResponse restoreSnapshotWithSettings(Settings indexSettings) {
Expand All @@ -155,7 +139,7 @@ public void testRestoreOnSegRep() throws Exception {
RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(null);

// Assertions
assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED));
assertEquals(restoreSnapshotResponse.status(), RestStatus.ACCEPTED);
ensureGreen(RESTORED_INDEX_NAME);
GetSettingsResponse settingsResponse = client().admin()
.indices()
Expand All @@ -166,9 +150,9 @@ public void testRestoreOnSegRep() throws Exception {
assertHitCount(resp, DOC_COUNT);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testSnapshotOnSegRep_RestoreOnSegRepDuringIngestion() throws Exception {
startClusterWithSettings(segRepEnableIndexSettings(), 1);
List<String> nodes = startClusterWithSettings(segRepEnableIndexSettings(), 1);
waitForSearchableDocs(INDEX_NAME, DOC_COUNT, nodes);
createSnapshot();
// Delete index
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get());
Expand All @@ -177,16 +161,23 @@ public void testSnapshotOnSegRep_RestoreOnSegRepDuringIngestion() throws Excepti
RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(null);

// Assertions
assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED));
ingestData(5000, RESTORED_INDEX_NAME);
ensureGreen(RESTORED_INDEX_NAME);
assertEquals(restoreSnapshotResponse.status(), RestStatus.ACCEPTED);
assertBusy(() -> ensureGreen(RESTORED_INDEX_NAME), 60, TimeUnit.SECONDS);
final int docCountPostRestore = 1001;
final int totalDocCount = DOC_COUNT + docCountPostRestore;
for (int i = DOC_COUNT; i < totalDocCount; i++) {
client().prepareIndex(RESTORED_INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
flushAndRefresh(RESTORED_INDEX_NAME);
assertBusy(() -> ensureGreen(RESTORED_INDEX_NAME), 60, TimeUnit.SECONDS);
waitForSearchableDocs(RESTORED_INDEX_NAME, totalDocCount, nodes);
GetSettingsResponse settingsResponse = client().admin()
.indices()
.getSettings(new GetSettingsRequest().indices(RESTORED_INDEX_NAME))
.get();
assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, "index.replication.type"), "SEGMENT");
SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get();
assertHitCount(resp, DOC_COUNT + 5000);
assertHitCount(resp, totalDocCount);
}

public void testSnapshotOnDocRep_RestoreOnSegRep() throws Exception {
Expand All @@ -198,7 +189,7 @@ public void testSnapshotOnDocRep_RestoreOnSegRep() throws Exception {
RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(restoreIndexSegRepSettings());

// Assertions
assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED));
assertEquals(restoreSnapshotResponse.status(), RestStatus.ACCEPTED);
ensureGreen(RESTORED_INDEX_NAME);
GetSettingsResponse settingsResponse = client().admin()
.indices()
Expand All @@ -220,7 +211,7 @@ public void testSnapshotOnSegRep_RestoreOnDocRep() throws Exception {
RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(restoreIndexDocRepSettings());

// Assertions
assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED));
assertEquals(restoreSnapshotResponse.status(), RestStatus.ACCEPTED);
ensureGreen(RESTORED_INDEX_NAME);
GetSettingsResponse settingsResponse = client().admin()
.indices()
Expand All @@ -240,7 +231,7 @@ public void testSnapshotOnDocRep_RestoreOnDocRep() throws Exception {
RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(restoreIndexDocRepSettings());

// Assertions
assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED));
assertEquals(restoreSnapshotResponse.status(), RestStatus.ACCEPTED);
ensureGreen(RESTORED_INDEX_NAME);
GetSettingsResponse settingsResponse = client().admin()
.indices()
Expand All @@ -266,7 +257,7 @@ public void testRestoreOnReplicaNode() throws Exception {
RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(null);

// Assertions
assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED));
assertEquals(restoreSnapshotResponse.status(), RestStatus.ACCEPTED);
internalCluster().startNode();
ensureGreen(RESTORED_INDEX_NAME);
GetSettingsResponse settingsResponse = client().admin()
Expand Down

0 comments on commit 522e316

Please sign in to comment.