diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java index d6f3fd7ed8f73..19da668c432cf 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java @@ -19,7 +19,6 @@ import java.nio.file.Path; import java.util.concurrent.ExecutionException; -import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings; import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_FAILRATE_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @@ -42,11 +41,10 @@ protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder() .put(super.nodeSettings(nodeOrdinal)) .put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath)) - .put("discovery.initial_state_timeout", "500ms") .build(); } else { logger.info("Adding docrep node"); - return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put("discovery.initial_state_timeout", "500ms").build(); + return Settings.builder().put(super.nodeSettings(nodeOrdinal)).build(); } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java index 263ed0dfba26b..b642f3f3f82fd 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java @@ -29,7 +29,6 @@ import org.opensearch.test.transport.MockTransportService; import java.util.Collection; -import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -37,7 +36,7 @@ import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; -@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false) +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class RemotePrimaryRelocationIT extends MigrationBaseTestCase { protected int maximumNumberOfShards() { return 1; @@ -53,9 +52,8 @@ protected Collection> nodePlugins() { } public void testMixedModeRelocation() throws Exception { - internalCluster().setBootstrapClusterManagerNodeIndex(0); - List cmNodes = internalCluster().startNodes(1); - Client client = internalCluster().client(cmNodes.get(0)); + String docRepNode = internalCluster().startNode(); + Client client = internalCluster().client(docRepNode); ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")); assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); @@ -66,18 +64,7 @@ public void testMixedModeRelocation() throws Exception { AtomicInteger numAutoGenDocs = new AtomicInteger(); final AtomicBoolean finished = new AtomicBoolean(false); - Thread indexingThread = new Thread(() -> { - while (finished.get() == false && numAutoGenDocs.get() < 100) { - IndexResponse indexResponse = client().prepareIndex("test").setId("id").setSource("field", "value").get(); - assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult()); - DeleteResponse deleteResponse = client().prepareDelete("test", "id").get(); - assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult()); - client().prepareIndex("test").setSource("auto", true).get(); - numAutoGenDocs.incrementAndGet(); - logger.info("Indexed {} docs here", numAutoGenDocs.get()); - } - }); - indexingThread.start(); + Thread indexingThread = getIndexingThread(finished, numAutoGenDocs); refresh("test"); @@ -94,14 +81,13 @@ public void testMixedModeRelocation() throws Exception { GetRepositoriesResponse getRepositoriesResponse = client.admin().cluster().getRepositories(gr).actionGet(); assertEquals(1, getRepositoriesResponse.repositories().size()); - Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000)); - logger.info("--> relocating from {} to {} ", cmNodes.get(0), remoteNode); - client().admin() - .cluster() - .prepareReroute() - .add(new MoveAllocationCommand("test", 0, cmNodes.get(0), remoteNode)) - .execute() - .actionGet(); + // Index some more docs + int currentDoc = numAutoGenDocs.get(); + int finalCurrentDoc1 = currentDoc; + waitUntil(() -> numAutoGenDocs.get() > finalCurrentDoc1 + 5); + + logger.info("--> relocating from {} to {} ", docRepNode, remoteNode); + client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test", 0, docRepNode, remoteNode)).execute().actionGet(); ClusterHealthResponse clusterHealthResponse = client().admin() .cluster() .prepareHealth() @@ -112,8 +98,13 @@ public void testMixedModeRelocation() throws Exception { .actionGet(); assertEquals(0, clusterHealthResponse.getRelocatingShards()); + assertEquals(remoteNode, primaryNodeName("test")); logger.info("--> relocation from docrep to remote complete"); - Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000)); + + // Index some more docs + currentDoc = numAutoGenDocs.get(); + int finalCurrentDoc = currentDoc; + waitUntil(() -> numAutoGenDocs.get() > finalCurrentDoc + 5); client().admin() .cluster() @@ -131,6 +122,8 @@ public void testMixedModeRelocation() throws Exception { .actionGet(); assertEquals(0, clusterHealthResponse.getRelocatingShards()); + assertEquals(remoteNode2, primaryNodeName("test")); + logger.info("--> relocation from remote to remote complete"); finished.set(true); @@ -148,9 +141,8 @@ public void testMixedModeRelocation() throws Exception { } public void testMixedModeRelocation_RemoteSeedingFail() throws Exception { - internalCluster().setBootstrapClusterManagerNodeIndex(0); - List cmNodes = internalCluster().startNodes(1); - Client client = internalCluster().client(cmNodes.get(0)); + String docRepNode = internalCluster().startNode(); + Client client = internalCluster().client(docRepNode); ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")); assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); @@ -161,18 +153,7 @@ public void testMixedModeRelocation_RemoteSeedingFail() throws Exception { AtomicInteger numAutoGenDocs = new AtomicInteger(); final AtomicBoolean finished = new AtomicBoolean(false); - Thread indexingThread = new Thread(() -> { - while (finished.get() == false && numAutoGenDocs.get() < 100) { - IndexResponse indexResponse = client().prepareIndex("test").setId("id").setSource("field", "value").get(); - assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult()); - DeleteResponse deleteResponse = client().prepareDelete("test", "id").get(); - assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult()); - client().prepareIndex("test").setSource("auto", true).get(); - numAutoGenDocs.incrementAndGet(); - logger.info("Indexed {} docs here", numAutoGenDocs.get()); - } - }); - indexingThread.start(); + Thread indexingThread = getIndexingThread(finished, numAutoGenDocs); refresh("test"); @@ -187,14 +168,9 @@ public void testMixedModeRelocation_RemoteSeedingFail() throws Exception { assertEquals(1, getRepositoriesResponse.repositories().size()); setFailRate(REPOSITORY_NAME, 100); - Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000)); - logger.info("--> relocating from {} to {} ", cmNodes.get(0), remoteNode); - client().admin() - .cluster() - .prepareReroute() - .add(new MoveAllocationCommand("test", 0, cmNodes.get(0), remoteNode)) - .execute() - .actionGet(); + + logger.info("--> relocating from {} to {} ", docRepNode, remoteNode); + client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test", 0, docRepNode, remoteNode)).execute().actionGet(); ClusterHealthResponse clusterHealthResponse = client().admin() .cluster() .prepareHealth() @@ -206,6 +182,7 @@ public void testMixedModeRelocation_RemoteSeedingFail() throws Exception { assertTrue(clusterHealthResponse.getRelocatingShards() == 1); setFailRate(REPOSITORY_NAME, 0); + Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000)); clusterHealthResponse = client().admin() .cluster() .prepareHealth() @@ -228,4 +205,19 @@ public void testMixedModeRelocation_RemoteSeedingFail() throws Exception { numAutoGenDocs.get() ); } + + private static Thread getIndexingThread(AtomicBoolean finished, AtomicInteger numAutoGenDocs) { + Thread indexingThread = new Thread(() -> { + while (finished.get() == false && numAutoGenDocs.get() < 10_000) { + IndexResponse indexResponse = client().prepareIndex("test").setId("id").setSource("field", "value").get(); + assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult()); + DeleteResponse deleteResponse = client().prepareDelete("test", "id").get(); + assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult()); + client().prepareIndex("test").setSource("auto", true).get(); + numAutoGenDocs.incrementAndGet(); + } + }); + indexingThread.start(); + return indexingThread; + } }