Skip to content

Commit

Permalink
Fix remote migration ITs
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Bafna <[email protected]>
  • Loading branch information
gbbafna committed Jun 14, 2024
1 parent 0ea8c2f commit 385f66c
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -244,4 +244,21 @@ public ClusterHealthStatus waitForRelocation() {
}
return actionGet.getStatus();
}

/**
 * Issues a cluster health request that waits for no relocating shards and for
 * events of {@code Priority.LANGUID}, bounded by the supplied timeout.
 * <p>
 * If the health response reports a timeout, the current cluster state and the
 * pending cluster tasks are logged and the test is failed via an assertion.
 *
 * @param t timeout applied to the cluster health request
 * @return the status carried by the cluster health response
 */
public ClusterHealthStatus waitForRelocation(TimeValue t) {
    final ClusterHealthRequest healthRequest = Requests.clusterHealthRequest()
        .waitForNoRelocatingShards(true)
        .timeout(t)
        .waitForEvents(Priority.LANGUID);
    final ClusterHealthResponse healthResponse = client().admin().cluster().health(healthRequest).actionGet();

    // Happy path: the wait finished within the timeout.
    if (healthResponse.isTimedOut() == false) {
        return healthResponse.getStatus();
    }

    // Timed out: dump diagnostics before failing so the log shows what was still in flight.
    logger.info(
        "waitForRelocation timed out, cluster state:\n{}\n{}",
        client().admin().cluster().prepareState().get().getState(),
        client().admin().cluster().preparePendingClusterTasks().get()
    );
    assertThat("timed out waiting for relocation", healthResponse.isTimedOut(), equalTo(false));
    return healthResponse.getStatus();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,12 @@

package org.opensearch.remotemigration;

import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand Down Expand Up @@ -83,16 +80,8 @@ public void testReplicaRecovery() throws Exception {
.add(new MoveAllocationCommand("test", 0, primaryNode, remoteNode))
.execute()
.actionGet();
ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setTimeout(TimeValue.timeValueSeconds(60))
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.execute()
.actionGet();

assertEquals(0, clusterHealthResponse.getRelocatingShards());
waitForRelocation();
logger.info("--> relocation of primary from docrep to remote complete");

logger.info("--> getting up the new replicas now to doc rep node as well as remote node ");
Expand All @@ -109,17 +98,7 @@ public void testReplicaRecovery() throws Exception {
)
.get();

client().admin()
.cluster()
.prepareHealth()
.setTimeout(TimeValue.timeValueSeconds(60))
.setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus()
.execute()
.actionGet();
logger.info("--> replica is up now on another docrep now as well as remote node");

assertEquals(0, clusterHealthResponse.getRelocatingShards());
waitForRelocation();
asyncIndexingService.stopIndexing();
refresh("test");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,11 @@

package org.opensearch.remotemigration;

import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
Expand All @@ -28,6 +26,7 @@
import java.util.List;
import java.util.Map;

import static org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand All @@ -48,6 +47,10 @@ protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build();
}

/**
 * Returns the fixed shard ceiling of 5 used by this test class.
 * <p>
 * Within this file the value is also fed into the node-concurrent-recoveries
 * cluster setting by {@code testEndToEndRemoteMigration}.
 * NOTE(review): presumably this overrides a hook in the base test case — confirm.
 *
 * @return the constant 5
 */
protected int maximumNumberOfShards() {
    final int shardCeiling = 5;
    return shardCeiling;
}

public void testMixedModeAddRemoteNodes() throws Exception {
internalCluster().setBootstrapClusterManagerNodeIndex(0);
List<String> cmNodes = internalCluster().startNodes(1);
Expand Down Expand Up @@ -155,7 +158,11 @@ public void testEndToEndRemoteMigration() throws Exception {
internalCluster().setBootstrapClusterManagerNodeIndex(0);
List<String> docRepNodes = internalCluster().startNodes(2);
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed"));
updateSettingsRequest.persistentSettings(
Settings.builder()
.put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")
.put(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), maximumNumberOfShards())
);
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get();
ensureGreen("test");
Expand Down Expand Up @@ -189,16 +196,7 @@ public void testEndToEndRemoteMigration() throws Exception {
)
.get()
);

ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setTimeout(TimeValue.timeValueSeconds(45))
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.execute()
.actionGet();
assertTrue(clusterHealthResponse.getRelocatingShards() == 0);
waitForRelocation(TimeValue.timeValueSeconds(90));
logger.info("---> Stopping indexing thread");
asyncIndexingService.stopIndexing();
Map<String, Integer> shardCountByNodeId = getShardCountByNodeId();
Expand Down

0 comments on commit 385f66c

Please sign in to comment.