diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java index 901b36f872622..5be9b25512704 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java @@ -9,6 +9,8 @@ package org.opensearch.remotemigration; import org.opensearch.action.DocWriteResponse; +import org.opensearch.action.admin.cluster.health.ClusterHealthRequest; +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest; import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse; import org.opensearch.action.bulk.BulkRequest; @@ -16,11 +18,15 @@ import org.opensearch.action.delete.DeleteResponse; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.index.IndexResponse; +import org.opensearch.client.Requests; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.health.ClusterHealthStatus; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.common.Priority; import org.opensearch.common.UUIDs; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.repositories.fs.ReloadableFsRepository; import org.opensearch.test.OpenSearchIntegTestCase; import org.junit.Before; @@ -39,6 +45,7 @@ import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_FAILRATE_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; public class MigrationBaseTestCase extends OpenSearchIntegTestCase { protected static final String REPOSITORY_NAME = "test-remote-store-repo"; @@ -114,6 +121,10 @@ public void initDocRepToRemoteMigration() { ); } + public ClusterHealthStatus ensureGreen(String... indices) { + return ensureGreen(TimeValue.timeValueSeconds(60), indices); + } + public BulkResponse indexBulk(String indexName, int numDocs) { BulkRequest bulkRequest = new BulkRequest(); for (int i = 0; i < numDocs; i++) { @@ -181,14 +192,12 @@ private Thread getIndexingThread() { long currentDocCount = indexedDocs.incrementAndGet(); if (currentDocCount > 0 && currentDocCount % refreshFrequency == 0) { if (rarely()) { - logger.info("--> [iteration {}] flushing index", currentDocCount); client().admin().indices().prepareFlush(indexName).get(); + logger.info("Completed ingestion of {} docs. Flushing now", currentDocCount); } else { - logger.info("--> [iteration {}] refreshing index", currentDocCount); client().admin().indices().prepareRefresh(indexName).get(); } } - logger.info("Completed ingestion of {} docs", currentDocCount); } }); } @@ -218,4 +227,38 @@ public void stopShardRebalancing() { .get() ); } + + public ClusterHealthStatus waitForRelocation() { + ClusterHealthRequest request = Requests.clusterHealthRequest() + .waitForNoRelocatingShards(true) + .timeout(TimeValue.timeValueSeconds(60)) + .waitForEvents(Priority.LANGUID); + ClusterHealthResponse actionGet = client().admin().cluster().health(request).actionGet(); + if (actionGet.isTimedOut()) { + logger.info( + "waitForRelocation timed out, cluster state:\n{}\n{}", + client().admin().cluster().prepareState().get().getState(), + client().admin().cluster().preparePendingClusterTasks().get() + ); + assertThat("timed out waiting for relocation", actionGet.isTimedOut(), equalTo(false)); + } + return actionGet.getStatus(); + } + + public ClusterHealthStatus waitForRelocation(TimeValue t) { + ClusterHealthRequest request = Requests.clusterHealthRequest() + .waitForNoRelocatingShards(true) + .timeout(t) + .waitForEvents(Priority.LANGUID); + ClusterHealthResponse actionGet = client().admin().cluster().health(request).actionGet(); + if (actionGet.isTimedOut()) { + logger.info( + "waitForRelocation timed out, cluster state:\n{}\n{}", + client().admin().cluster().prepareState().get().getState(), + client().admin().cluster().preparePendingClusterTasks().get() + ); + assertThat("timed out waiting for relocation", actionGet.isTimedOut(), equalTo(false)); + } + return actionGet.getStatus(); + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java index cea653c0ead4b..fa3b9368ded47 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java @@ -99,16 +99,7 @@ public void testRemotePrimaryRelocation() throws Exception { .add(new MoveAllocationCommand("test", 0, primaryNodeName("test"), remoteNode)) .execute() .actionGet(); - ClusterHealthResponse clusterHealthResponse = client().admin() - .cluster() - .prepareHealth() - .setTimeout(TimeValue.timeValueSeconds(60)) - .setWaitForEvents(Priority.LANGUID) - .setWaitForNoRelocatingShards(true) - .execute() - .actionGet(); - - assertEquals(0, clusterHealthResponse.getRelocatingShards()); + waitForRelocation(); assertEquals(remoteNode, primaryNodeName("test")); logger.info("--> relocation from docrep to remote complete"); @@ -123,16 +114,7 @@ public void testRemotePrimaryRelocation() throws Exception { .add(new MoveAllocationCommand("test", 0, remoteNode, remoteNode2)) .execute() .actionGet(); - clusterHealthResponse = client().admin() - .cluster() - .prepareHealth() - .setTimeout(TimeValue.timeValueSeconds(60)) - .setWaitForEvents(Priority.LANGUID) - .setWaitForNoRelocatingShards(true) - .execute() - .actionGet(); - - assertEquals(0, clusterHealthResponse.getRelocatingShards()); + waitForRelocation(); assertEquals(remoteNode2, primaryNodeName("test")); logger.info("--> relocation from remote to remote complete"); @@ -155,7 +137,6 @@ public void testRemotePrimaryRelocation() throws Exception { public void testMixedModeRelocation_RemoteSeedingFail() throws Exception { String docRepNode = internalCluster().startNode(); - Client client = internalCluster().client(docRepNode); ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")); assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteReplicaRecoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteReplicaRecoveryIT.java index aae726fe2a6bc..d6e25c0cab3ac 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteReplicaRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteReplicaRecoveryIT.java @@ -8,15 +8,12 @@ package org.opensearch.remotemigration; -import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse; import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; -import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; -import org.opensearch.common.unit.TimeValue; import org.opensearch.index.SegmentReplicationPerGroupStats; import org.opensearch.index.query.QueryBuilders; import org.opensearch.test.OpenSearchIntegTestCase; @@ -83,16 +80,8 @@ public void testReplicaRecovery() throws Exception { .add(new MoveAllocationCommand("test", 0, primaryNode, remoteNode)) .execute() .actionGet(); - ClusterHealthResponse clusterHealthResponse = client().admin() - .cluster() - .prepareHealth() - .setTimeout(TimeValue.timeValueSeconds(60)) - .setWaitForEvents(Priority.LANGUID) - .setWaitForNoRelocatingShards(true) - .execute() - .actionGet(); - assertEquals(0, clusterHealthResponse.getRelocatingShards()); + waitForRelocation(); logger.info("--> relocation of primary from docrep to remote complete"); logger.info("--> getting up the new replicas now to doc rep node as well as remote node "); @@ -109,17 +98,7 @@ public void testReplicaRecovery() throws Exception { ) .get(); - client().admin() - .cluster() - .prepareHealth() - .setTimeout(TimeValue.timeValueSeconds(60)) - .setWaitForEvents(Priority.LANGUID) - .setWaitForGreenStatus() - .execute() - .actionGet(); - logger.info("--> replica is up now on another docrep now as well as remote node"); - - assertEquals(0, clusterHealthResponse.getRelocatingShards()); + waitForRelocation(); asyncIndexingService.stopIndexing(); refresh("test"); diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationTestCase.java index 4e4f6da56d622..e0e25db4ca722 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationTestCase.java @@ -8,13 +8,11 @@ package org.opensearch.remotemigration; -import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest; import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.opensearch.client.Client; import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.FeatureFlags; @@ -28,6 +26,7 @@ import java.util.List; import java.util.Map; +import static org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING; import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING; import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @@ -48,6 +47,10 @@ protected Settings featureFlagSettings() { return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build(); } + protected int maximumNumberOfShards() { + return 5; + } + public void testMixedModeAddRemoteNodes() throws Exception { internalCluster().setBootstrapClusterManagerNodeIndex(0); List cmNodes = internalCluster().startNodes(1); @@ -155,7 +158,11 @@ public void testEndToEndRemoteMigration() throws Exception { internalCluster().setBootstrapClusterManagerNodeIndex(0); List docRepNodes = internalCluster().startNodes(2); ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); - updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")); + updateSettingsRequest.persistentSettings( + Settings.builder() + .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed") + .put(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), maximumNumberOfShards()) + ); assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get(); ensureGreen("test"); @@ -189,16 +196,7 @@ public void testEndToEndRemoteMigration() throws Exception { ) .get() ); - - ClusterHealthResponse clusterHealthResponse = client().admin() - .cluster() - .prepareHealth() - .setTimeout(TimeValue.timeValueSeconds(45)) - .setWaitForEvents(Priority.LANGUID) - .setWaitForNoRelocatingShards(true) - .execute() - .actionGet(); - assertTrue(clusterHealthResponse.getRelocatingShards() == 0); + waitForRelocation(TimeValue.timeValueSeconds(90)); logger.info("---> Stopping indexing thread"); asyncIndexingService.stopIndexing(); Map shardCountByNodeId = getShardCountByNodeId(); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 794535361f626..b530b95208a7e 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -2155,7 +2155,7 @@ public void waitForRemoteStoreSync(Runnable onProgress) throws IOException { segmentUploadeCount = directory.getSegmentsUploadedToRemoteStore().size(); } try { - Thread.sleep(TimeValue.timeValueSeconds(30).seconds()); + Thread.sleep(TimeValue.timeValueSeconds(30).millis()); } catch (InterruptedException ie) { throw new OpenSearchException("Interrupted waiting for completion of [{}]", ie); }