Skip to content

Commit

Permalink
PR Test comments
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Bafna <[email protected]>
  • Loading branch information
gbbafna committed Mar 21, 2024
1 parent 72cefe9 commit 8b20f1a
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.nio.file.Path;
import java.util.concurrent.ExecutionException;

import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;
import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_FAILRATE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

Expand All @@ -42,11 +41,10 @@ protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath))
.put("discovery.initial_state_timeout", "500ms")
.build();
} else {
logger.info("Adding docrep node");
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put("discovery.initial_state_timeout", "500ms").build();
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).build();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,14 @@
import org.opensearch.test.transport.MockTransportService;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Arrays.asList;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemotePrimaryRelocationIT extends MigrationBaseTestCase {
protected int maximumNumberOfShards() {
return 1;
Expand All @@ -53,9 +52,8 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
}

public void testMixedModeRelocation() throws Exception {
internalCluster().setBootstrapClusterManagerNodeIndex(0);
List<String> cmNodes = internalCluster().startNodes(1);
Client client = internalCluster().client(cmNodes.get(0));
String docRepNode = internalCluster().startNode();
Client client = internalCluster().client(docRepNode);
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed"));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
Expand All @@ -66,18 +64,7 @@ public void testMixedModeRelocation() throws Exception {

AtomicInteger numAutoGenDocs = new AtomicInteger();
final AtomicBoolean finished = new AtomicBoolean(false);
Thread indexingThread = new Thread(() -> {
while (finished.get() == false && numAutoGenDocs.get() < 100) {
IndexResponse indexResponse = client().prepareIndex("test").setId("id").setSource("field", "value").get();
assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult());
DeleteResponse deleteResponse = client().prepareDelete("test", "id").get();
assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult());
client().prepareIndex("test").setSource("auto", true).get();
numAutoGenDocs.incrementAndGet();
logger.info("Indexed {} docs here", numAutoGenDocs.get());
}
});
indexingThread.start();
Thread indexingThread = getIndexingThread(finished, numAutoGenDocs);

refresh("test");

Expand All @@ -94,14 +81,13 @@ public void testMixedModeRelocation() throws Exception {
GetRepositoriesResponse getRepositoriesResponse = client.admin().cluster().getRepositories(gr).actionGet();
assertEquals(1, getRepositoriesResponse.repositories().size());

Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000));
logger.info("--> relocating from {} to {} ", cmNodes.get(0), remoteNode);
client().admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand("test", 0, cmNodes.get(0), remoteNode))
.execute()
.actionGet();
// Index some more docs
int currentDoc = numAutoGenDocs.get();
int finalCurrentDoc1 = currentDoc;
waitUntil(() -> numAutoGenDocs.get() > finalCurrentDoc1 + 5);

logger.info("--> relocating from {} to {} ", docRepNode, remoteNode);
client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test", 0, docRepNode, remoteNode)).execute().actionGet();
ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
Expand All @@ -112,8 +98,13 @@ public void testMixedModeRelocation() throws Exception {
.actionGet();

assertEquals(0, clusterHealthResponse.getRelocatingShards());
assertEquals(remoteNode, primaryNodeName("test"));
logger.info("--> relocation from docrep to remote complete");
Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000));

// Index some more docs
currentDoc = numAutoGenDocs.get();
int finalCurrentDoc = currentDoc;
waitUntil(() -> numAutoGenDocs.get() > finalCurrentDoc + 5);

client().admin()
.cluster()
Expand All @@ -131,6 +122,8 @@ public void testMixedModeRelocation() throws Exception {
.actionGet();

assertEquals(0, clusterHealthResponse.getRelocatingShards());
assertEquals(remoteNode2, primaryNodeName("test"));

logger.info("--> relocation from remote to remote complete");

finished.set(true);
Expand All @@ -148,9 +141,8 @@ public void testMixedModeRelocation() throws Exception {
}

public void testMixedModeRelocation_RemoteSeedingFail() throws Exception {
internalCluster().setBootstrapClusterManagerNodeIndex(0);
List<String> cmNodes = internalCluster().startNodes(1);
Client client = internalCluster().client(cmNodes.get(0));
String docRepNode = internalCluster().startNode();
Client client = internalCluster().client(docRepNode);
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed"));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
Expand All @@ -161,18 +153,7 @@ public void testMixedModeRelocation_RemoteSeedingFail() throws Exception {

AtomicInteger numAutoGenDocs = new AtomicInteger();
final AtomicBoolean finished = new AtomicBoolean(false);
Thread indexingThread = new Thread(() -> {
while (finished.get() == false && numAutoGenDocs.get() < 100) {
IndexResponse indexResponse = client().prepareIndex("test").setId("id").setSource("field", "value").get();
assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult());
DeleteResponse deleteResponse = client().prepareDelete("test", "id").get();
assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult());
client().prepareIndex("test").setSource("auto", true).get();
numAutoGenDocs.incrementAndGet();
logger.info("Indexed {} docs here", numAutoGenDocs.get());
}
});
indexingThread.start();
Thread indexingThread = getIndexingThread(finished, numAutoGenDocs);

refresh("test");

Expand All @@ -187,14 +168,9 @@ public void testMixedModeRelocation_RemoteSeedingFail() throws Exception {
assertEquals(1, getRepositoriesResponse.repositories().size());

setFailRate(REPOSITORY_NAME, 100);
Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000));
logger.info("--> relocating from {} to {} ", cmNodes.get(0), remoteNode);
client().admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand("test", 0, cmNodes.get(0), remoteNode))
.execute()
.actionGet();

logger.info("--> relocating from {} to {} ", docRepNode, remoteNode);
client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test", 0, docRepNode, remoteNode)).execute().actionGet();
ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
Expand All @@ -206,6 +182,7 @@ public void testMixedModeRelocation_RemoteSeedingFail() throws Exception {

assertTrue(clusterHealthResponse.getRelocatingShards() == 1);
setFailRate(REPOSITORY_NAME, 0);
Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000));
clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
Expand All @@ -228,4 +205,19 @@ public void testMixedModeRelocation_RemoteSeedingFail() throws Exception {
numAutoGenDocs.get()
);
}

private static Thread getIndexingThread(AtomicBoolean finished, AtomicInteger numAutoGenDocs) {
Thread indexingThread = new Thread(() -> {
while (finished.get() == false && numAutoGenDocs.get() < 10_000) {
IndexResponse indexResponse = client().prepareIndex("test").setId("id").setSource("field", "value").get();
assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult());
DeleteResponse deleteResponse = client().prepareDelete("test", "id").get();
assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult());
client().prepareIndex("test").setSource("auto", true).get();
numAutoGenDocs.incrementAndGet();
}
});
indexingThread.start();
return indexingThread;
}
}

0 comments on commit 8b20f1a

Please sign in to comment.