Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Bafna <[email protected]>
  • Loading branch information
gbbafna committed Mar 15, 2024
1 parent 22578a9 commit 77c9b3a
Show file tree
Hide file tree
Showing 22 changed files with 413 additions and 146 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.hamcrest.OpenSearchAssertions;
import org.opensearch.test.junit.annotations.TestLogging;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -61,7 +60,6 @@ public Settings indexSettings() {
return Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0).build();
}

@TestLogging(reason = "Getting trace logs from replication package", value = "org.opensearch.indices.recovery:TRACE")
public void testPrimaryRelocationWhileIndexing() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(randomIntBetween(2, 3));
client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,20 @@

package org.opensearch.remotemigration;

import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;
import java.util.concurrent.ExecutionException;

import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;
import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_FAILRATE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

public class MigrationBaseTestCase extends OpenSearchIntegTestCase {
protected static final String REPOSITORY_NAME = "test-remote-store-repo";
Expand Down Expand Up @@ -47,4 +54,16 @@ protected Settings nodeSettings(int nodeOrdinal) {
/**
 * Builds the feature-flag settings for this test: everything returned by the
 * superclass, plus the experimental remote-store migration flag set to "true".
 *
 * @return the combined feature-flag settings
 */
protected Settings featureFlagSettings() {
    Settings.Builder flags = Settings.builder();
    // Start from the inherited flags so nothing the superclass enables is lost.
    flags.put(super.featureFlagSettings());
    flags.put(FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true");
    return flags.build();
}

/**
 * Re-registers the named repository as a {@link ReloadableFsRepository}, keeping its
 * current "location" setting and applying the given fail-rate value.
 *
 * Only the "location" setting is carried over from the existing registration; any
 * other settings on the repository are not copied.
 *
 * @param repoName name of the repository to update
 * @param value    value stored under {@code REPOSITORIES_FAILRATE_SETTING}
 *                 (presumably a failure percentage; callers in this suite pass 0 and 100 - confirm)
 * @throws ExecutionException   if fetching the current repository metadata fails
 * @throws InterruptedException if interrupted while waiting for the metadata
 */
protected void setFailRate(String repoName, int value) throws ExecutionException, InterruptedException {
    // Fetch the current registration so the repository keeps pointing at the same location.
    GetRepositoriesRequest lookup = new GetRepositoriesRequest(new String[] { repoName });
    RepositoryMetadata current = client().admin().cluster().getRepositories(lookup).get().repositories().get(0);
    String location = current.settings().get("location");

    Settings.Builder updated = Settings.builder();
    updated.put("location", location);
    updated.put(REPOSITORIES_FAILRATE_SETTING.getKey(), value);

    // Putting the repository again under the same name applies the new settings.
    assertAcked(
        client().admin().cluster().preparePutRepository(repoName).setType(ReloadableFsRepository.TYPE).setSettings(updated).get()
    );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,32 +23,35 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.hamcrest.OpenSearchAssertions;
import org.opensearch.test.junit.annotations.TestLogging;
import org.opensearch.test.transport.MockTransportService;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Arrays.asList;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false)
public class RemotePrimaryRelocationIT extends MigrationBaseTestCase {
private static final int RELOCATION_COUNT = 1;

/**
 * Upper bound on the number of shards used by this test class.
 * (Presumably consumed by the base integration-test case when it picks index settings - confirm.)
 *
 * @return always 1
 */
protected int maximumNumberOfShards() {
    final int singleShard = 1;
    return singleShard;
}

// TODO: revisit this limit once migration of replicas is supported.
/**
 * Upper bound on the number of replicas used by this test class.
 *
 * @return always 0
 */
protected int maximumNumberOfReplicas() {
    final int noReplicas = 0;
    return noReplicas;
}

@TestLogging(reason = "Getting trace logs from replication package", value = "org.opensearch.index.translog.RemoteFsTranslog:TRACE,org.opensearch.index.shard.RemoteStoreRefreshListener:TRACE,"
+ "org.opensearch.indices.recovery:TRACE,"
+ "org.opensearch.index.shard.IndexShard:TRACE")
/**
 * Plugins to install on the test nodes: only the mock transport service test plugin.
 *
 * @return a fixed-size list containing {@code MockTransportService.TestPlugin}
 */
protected Collection<Class<? extends Plugin>> nodePlugins() {
    Collection<Class<? extends Plugin>> plugins = asList(MockTransportService.TestPlugin.class);
    return plugins;
}

public void testMixedModeRelocation() throws Exception {
internalCluster().setBootstrapClusterManagerNodeIndex(0);
List<String> cmNodes = internalCluster().startNodes(1);
Expand Down Expand Up @@ -91,41 +94,45 @@ public void testMixedModeRelocation() throws Exception {
GetRepositoriesResponse getRepositoriesResponse = client.admin().cluster().getRepositories(gr).actionGet();
assertEquals(1, getRepositoriesResponse.repositories().size());

for (int i = 0; i < RELOCATION_COUNT; i++) {
Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000));
logger.info("--> [iteration {}] relocating from {} to {} ", i, cmNodes.get(0), remoteNode);
client().admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand("test", 0, cmNodes.get(0), remoteNode))
.execute()
.actionGet();
ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setTimeout(TimeValue.timeValueSeconds(60))
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.execute()
.actionGet();
logger.info("--> [iteration {}] relocation complete", i);
Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000));

client().admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand("test", 0, remoteNode, remoteNode2))
.execute()
.actionGet();
clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setTimeout(TimeValue.timeValueSeconds(60))
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.execute()
.actionGet();
}
Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000));
logger.info("--> relocating from {} to {} ", cmNodes.get(0), remoteNode);
client().admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand("test", 0, cmNodes.get(0), remoteNode))
.execute()
.actionGet();
ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setTimeout(TimeValue.timeValueSeconds(60))
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.execute()
.actionGet();

assertEquals(0, clusterHealthResponse.getRelocatingShards());
logger.info("--> relocation from docrep to remote complete");
Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000));

client().admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand("test", 0, remoteNode, remoteNode2))
.execute()
.actionGet();
clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setTimeout(TimeValue.timeValueSeconds(60))
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.execute()
.actionGet();

assertEquals(0, clusterHealthResponse.getRelocatingShards());
logger.info("--> relocation from remote to remote complete");

finished.set(true);
indexingThread.join();
refresh("test");
Expand All @@ -139,4 +146,86 @@ public void testMixedModeRelocation() throws Exception {
);

}

/**
 * Checks that relocating a primary from a docrep node to a remote-store node does not
 * complete while the remote repository's fail rate is set to 100, and that it completes
 * once the fail rate is reset to 0, with every auto-generated document still searchable.
 *
 * Indexing runs on a background thread for the duration of the relocation.
 *
 * @throws Exception if any cluster operation fails or the test thread is interrupted
 */
public void testMixedModeRelocation_RemoteSeedingFail() throws Exception {
    internalCluster().setBootstrapClusterManagerNodeIndex(0);
    List<String> cmNodes = internalCluster().startNodes(1);
    Client client = internalCluster().client(cmNodes.get(0));
    ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
    updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed"));
    assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

    // create index with 1 shard and 0 replicas
    client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get();
    ensureGreen("test");

    AtomicInteger numAutoGenDocs = new AtomicInteger();
    final AtomicBoolean finished = new AtomicBoolean(false);
    // Background indexing: each iteration creates and deletes a fixed-id doc, then adds one
    // auto-id doc. Stops when asked to, or after 100 auto-id docs.
    Thread indexingThread = new Thread(() -> {
        while (finished.get() == false && numAutoGenDocs.get() < 100) {
            IndexResponse indexResponse = client().prepareIndex("test").setId("id").setSource("field", "value").get();
            assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult());
            DeleteResponse deleteResponse = client().prepareDelete("test", "id").get();
            assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult());
            client().prepareIndex("test").setSource("auto", true).get();
            numAutoGenDocs.incrementAndGet();
            logger.info("Indexed {} docs here", numAutoGenDocs.get());
        }
    });
    indexingThread.start();

    refresh("test");

    // add remote node in mixed mode cluster
    // NOTE(review): addRemote is declared outside this view; presumably it makes nodes started
    // afterwards remote-store enabled - confirm against MigrationBaseTestCase.
    addRemote = true;
    String remoteNode = internalCluster().startNode();
    internalCluster().validateClusterFormed();

    // assert repo gets registered
    GetRepositoriesRequest gr = new GetRepositoriesRequest(new String[] { REPOSITORY_NAME });
    GetRepositoriesResponse getRepositoriesResponse = client.admin().cluster().getRepositories(gr).actionGet();
    assertEquals(1, getRepositoriesResponse.repositories().size());

    // Make the remote repository fail so the relocation cannot finish.
    setFailRate(REPOSITORY_NAME, 100);
    Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000));
    logger.info("--> relocating from {} to {} ", cmNodes.get(0), remoteNode);
    client().admin()
        .cluster()
        .prepareReroute()
        .add(new MoveAllocationCommand("test", 0, cmNodes.get(0), remoteNode))
        .execute()
        .actionGet();
    // Short timeout on purpose: the relocation is expected to still be in flight when it expires.
    ClusterHealthResponse clusterHealthResponse = client().admin()
        .cluster()
        .prepareHealth()
        .setTimeout(TimeValue.timeValueSeconds(5))
        .setWaitForEvents(Priority.LANGUID)
        .setWaitForNoRelocatingShards(true)
        .execute()
        .actionGet();

    assertEquals(1, clusterHealthResponse.getRelocatingShards());

    // Stop injecting failures and give the relocation the same 60s budget the
    // non-failing relocation test uses, so a slow recovery does not fail the test spuriously.
    setFailRate(REPOSITORY_NAME, 0);
    clusterHealthResponse = client().admin()
        .cluster()
        .prepareHealth()
        .setTimeout(TimeValue.timeValueSeconds(60))
        .setWaitForEvents(Priority.LANGUID)
        .setWaitForNoRelocatingShards(true)
        .execute()
        .actionGet();
    assertEquals(0, clusterHealthResponse.getRelocatingShards());
    logger.info("--> relocation from docrep to remote complete");

    finished.set(true);
    indexingThread.join();
    refresh("test");
    OpenSearchAssertions.assertHitCount(client().prepareSearch("test").setTrackTotalHits(true).get(), numAutoGenDocs.get());
    OpenSearchAssertions.assertHitCount(
        client().prepareSearch("test")
            .setTrackTotalHits(true)// extra paranoia ;)
            .setQuery(QueryBuilders.termQuery("auto", true))
            .get(),
        numAutoGenDocs.get()
    );
}
}
Loading

0 comments on commit 77c9b3a

Please sign in to comment.