Skip to content

Commit

Permalink
IT for deletion in case of remote migration relocation failure
Browse files Browse the repository at this point in the history
  • Loading branch information
gbbafna committed Jun 18, 2024
1 parent 5c89fc0 commit 0ec6267
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.remotemigration;

import org.junit.Before;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
Expand All @@ -27,25 +28,35 @@
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.plugins.Plugin;
import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin;
import org.opensearch.snapshots.mockstore.MockRepository;
import org.opensearch.test.InternalSettingsPlugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;
import org.opensearch.test.transport.MockTransportService;

import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.hamcrest.Matchers.equalTo;
import static org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_FAILRATE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

public class MigrationBaseTestCase extends OpenSearchIntegTestCase {
protected static final String REPOSITORY_NAME = "test-remote-store-repo";
Expand Down Expand Up @@ -96,18 +107,61 @@ protected Settings nodeSettings(int nodeOrdinal) {
}
}

/**
 * Registers the mock plugins needed by remote-migration integration tests:
 * <ul>
 *   <li>{@code InternalSettingsPlugin} — overrides default intervals of retention lease and global ckp sync</li>
 *   <li>{@code MockFsRepositoryPlugin} and {@code MockTransportService.TestPlugin} — ensure remote interactions are not no-op and retention leases are properly propagated</li>
 *   <li>{@code MockRepository.Plugin} — enables fault injection on the repository (see {@code setFailRate} / {@code setRegExToFail})</li>
 * </ul>
 */
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
    return Stream.concat(
        super.nodePlugins().stream(),
        Stream.of(InternalSettingsPlugin.class, MockFsRepositoryPlugin.class, MockTransportService.TestPlugin.class, MockRepository.Plugin.class)
    ).collect(Collectors.toList());
}

protected void setFailRate(String repoName, int value) throws ExecutionException, InterruptedException {
GetRepositoriesRequest gr = new GetRepositoriesRequest(new String[] { repoName });
GetRepositoriesResponse res = client().admin().cluster().getRepositories(gr).get();
RepositoryMetadata rmd = res.repositories().get(0);
Settings.Builder settings = Settings.builder()
.put("location", rmd.settings().get("location"))
.put(REPOSITORIES_FAILRATE_SETTING.getKey(), value);
.put("random_data_file_io_exception_rate", value);
assertAcked(
client().admin().cluster().preparePutRepository(repoName).setType(ReloadableFsRepository.TYPE).setSettings(settings).get()
client().admin().cluster().preparePutRepository(repoName).setType("mock").setSettings(settings).get()
);
}

/**
 * Reconfigures {@code repoName} as a "mock" repository whose IO fails for blob names
 * matching the given regex, keeping the repository's current location setting.
 *
 * @param repoName name of an already-registered repository to reconfigure
 * @param value    regex for the mock repo's {@code regexes_to_fail_io} setting
 * @throws ExecutionException   if fetching the current repository metadata fails
 * @throws InterruptedException if the get-repositories call is interrupted
 */
protected void setRegExToFail(String repoName, String value) throws ExecutionException, InterruptedException {
    GetRepositoriesRequest request = new GetRepositoriesRequest(new String[] { repoName });
    GetRepositoriesResponse response = client().admin().cluster().getRepositories(request).get();
    RepositoryMetadata repoMetadata = response.repositories().get(0);
    Settings.Builder updatedSettings = Settings.builder();
    updatedSettings.put("location", repoMetadata.settings().get("location"));
    updatedSettings.put("regexes_to_fail_io", value);
    assertAcked(client().admin().cluster().preparePutRepository(repoName).setType("mock").setSettings(updatedSettings).get());
}

/**
 * Walks {@code path} recursively and returns the smallest last-modified timestamp
 * (epoch millis) among all files under it, or {@link Long#MAX_VALUE} if the tree
 * contains no files.
 *
 * NOTE(review): despite the name, this reads the last-modified time, not the creation
 * time; for freshly written files the two coincide, which is what callers rely on.
 *
 * @param path root directory to scan
 * @return oldest last-modified time in milliseconds since the epoch
 * @throws Exception if the file tree cannot be traversed
 */
public static long getOldestFileCreationTime(Path path) throws Exception {
    final AtomicLong oldestFileCreationTime = new AtomicLong(Long.MAX_VALUE);
    // SimpleFileVisitor's default postVisitDirectory already CONTINUEs, so only
    // visitFile needs overriding.
    Files.walkFileTree(path, new SimpleFileVisitor<>() {
        @Override
        public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
            // Read the timestamp once per file to avoid a redundant filesystem call.
            long lastModified = Files.getLastModifiedTime(file).toMillis();
            oldestFileCreationTime.accumulateAndGet(lastModified, Math::min);
            return FileVisitResult.CONTINUE;
        }
    });

    return oldestFileCreationTime.get();
}

public void initDocRepToRemoteMigration() {
assertTrue(
internalCluster().client()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,11 @@
import org.opensearch.index.seqno.RetentionLease;
import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.indices.IndexingMemoryController;
import org.opensearch.plugins.Plugin;
import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin;
import org.opensearch.test.InternalSettingsPlugin;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

Expand All @@ -45,18 +38,6 @@ public class RemoteDualReplicationIT extends MigrationBaseTestCase {
private final String FAILOVER_REMOTE_TO_DOCREP = "failover-remote-to-docrep";
private final String FAILOVER_REMOTE_TO_REMOTE = "failover-remote-to-remote";

// NOTE(review): this override is removed by this commit — the plugin set (plus
// MockRepository.Plugin) now comes from the MigrationBaseTestCase superclass.
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
/* Adding the following mock plugins:
- InternalSettingsPlugin : To override default intervals of retention lease and global ckp sync
- MockFsRepositoryPlugin and MockTransportService.TestPlugin: To ensure remote interactions are not no-op and retention leases are properly propagated
*/
return Stream.concat(
super.nodePlugins().stream(),
Stream.of(InternalSettingsPlugin.class, MockFsRepositoryPlugin.class, MockTransportService.TestPlugin.class)
).collect(Collectors.toList());
}

/*
Scenario:
- Starts 2 docrep backed node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,16 @@
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.hamcrest.OpenSearchAssertions;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.test.junit.annotations.TestIssueLogging;

import java.util.Collection;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Arrays.asList;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand All @@ -51,10 +50,6 @@ protected int maximumNumberOfReplicas() {
return 0;
}

protected Collection<Class<? extends Plugin>> nodePlugins() {
return asList(MockTransportService.TestPlugin.class);
}

public void testRemotePrimaryRelocation() throws Exception {
List<String> docRepNodes = internalCluster().startNodes(2);
Client client = internalCluster().client(docRepNodes.get(0));
Expand Down Expand Up @@ -178,7 +173,77 @@ public void testMixedModeRelocation_RemoteSeedingFail() throws Exception {
String remoteNode = internalCluster().startNode();
internalCluster().validateClusterFormed();

setFailRate(REPOSITORY_NAME, 100);
setFailRate(REPOSITORY_NAME, 1);
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.getKey(), "10s"))
.get();

// Change direction to remote store
updateSettingsRequest.persistentSettings(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store"));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

logger.info("--> relocating from {} to {} ", docRepNode, remoteNode);
client().admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand(INDEX_NAME, 0, docRepNode, remoteNode))
.execute()
.actionGet();
ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setTimeout(TimeValue.timeValueSeconds(5))
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.execute()
.actionGet();

assertTrue(clusterHealthResponse.getRelocatingShards() == 1);
// sleep longer than the configured remote upload timeout so the relocation deterministically fails
Thread.sleep(40000);

ClusterHealthRequest healthRequest = Requests.clusterHealthRequest()
.waitForNoRelocatingShards(true)
.waitForNoInitializingShards(true);
ClusterHealthResponse actionGet = client().admin().cluster().health(healthRequest).actionGet();
assertEquals(actionGet.getRelocatingShards(), 0);
assertEquals(docRepNode, primaryNodeName(INDEX_NAME));

asyncIndexingService.stopIndexing();
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.getKey(), (String) null))
.get();
}

/*
Asserts that, in case of a failed relocation, the new remote node cleans up the files uploaded by the previous node.
*/
@TestIssueLogging(value = "_root:DEBUG,org.opensearch.index.store.RemoteSegmentStoreDirectory:TRACE", issueUrl = "https://github.com/elastic/elasticsearch/issues/41068")
public void testMixedModeRelocation_RemoteSeeding_CleanUp() throws Exception {
String docRepNode = internalCluster().startNode();
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed"));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

// create shard with 0 replica and 1 shard
client().admin().indices().prepareCreate(INDEX_NAME).setSettings(indexSettings()).setMapping("field", "type=text").get();
ensureGreen(INDEX_NAME);

AsyncIndexingService asyncIndexingService = new AsyncIndexingService(INDEX_NAME);
asyncIndexingService.startIndexing();

refresh(INDEX_NAME);

// add remote node in mixed mode cluster
setAddRemote(true);
String remoteNode = internalCluster().startNode();
internalCluster().validateClusterFormed();

setRegExToFail(REPOSITORY_NAME, "segments_");
client().admin()
.cluster()
.prepareUpdateSettings()
Expand Down Expand Up @@ -216,6 +281,28 @@ public void testMixedModeRelocation_RemoteSeedingFail() throws Exception {
assertEquals(actionGet.getRelocatingShards(), 0);
assertEquals(docRepNode, primaryNodeName(INDEX_NAME));


internalCluster().stopRandomNode(InternalTestCluster.nameFilter(remoteNode));
setRegExToFail(REPOSITORY_NAME, null);
long before = System.currentTimeMillis();
remoteNode = internalCluster().startNode();
client().admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand(INDEX_NAME, 0, docRepNode, remoteNode))
.execute()
.actionGet();
waitForRelocation();
assertEquals(remoteNode, primaryNodeName(INDEX_NAME));
String indexUUID = client().admin()
.indices()
.prepareGetSettings(INDEX_NAME)
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
// assert files got recreated, as the new node should delete files created by the previous node
Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID);
assertTrue(getOldestFileCreationTime(indexPath) > before);

asyncIndexingService.stopIndexing();
client().admin()
.cluster()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.snapshots.mockstore.MockRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.After;

Expand Down Expand Up @@ -143,7 +144,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return Stream.concat(super.nodePlugins().stream(), Stream.of(MockFsMetadataSupportedRepositoryPlugin.class))
.collect(Collectors.toList());
} else {
return Stream.concat(super.nodePlugins().stream(), Stream.of(MockFsRepositoryPlugin.class)).collect(Collectors.toList());
return Stream.concat(super.nodePlugins().stream(), Stream.of(MockFsRepositoryPlugin.class, MockRepository.Plugin.class)).collect(Collectors.toList());
}
}
return super.nodePlugins();
Expand All @@ -159,7 +160,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).build();
} else {
if (asyncUploadMockFsRepo) {
String repoType = metadataSupportedType ? MockFsMetadataSupportedRepositoryPlugin.TYPE_MD : MockFsRepositoryPlugin.TYPE;
String repoType = metadataSupportedType ? MockFsMetadataSupportedRepositoryPlugin.TYPE_MD : "mock";
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2146,7 +2146,7 @@ public void waitForRemoteStoreSync(Runnable onProgress) throws IOException {
segmentUploadeCount = directory.getSegmentsUploadedToRemoteStore().size();
}
try {
Thread.sleep(TimeValue.timeValueSeconds(30).millis());
Thread.sleep(TimeValue.timeValueSeconds(1).millis());
} catch (InterruptedException ie) {
throw new OpenSearchException("Interrupted waiting for completion of [{}]", ie);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class MockRepository extends FsRepository {
Expand Down Expand Up @@ -125,6 +126,8 @@ public long getFailureCount() {

private final List<String> skipExceptionOnBlobs;

private final List<String> regexesToFailIO;

private final boolean useLuceneCorruptionException;

private final long maximumNumberOfFailures;
Expand Down Expand Up @@ -188,6 +191,7 @@ public MockRepository(
skipExceptionOnVerificationFile = metadata.settings().getAsBoolean("skip_exception_on_verification_file", false);
skipExceptionOnListBlobs = metadata.settings().getAsBoolean("skip_exception_on_list_blobs", false);
skipExceptionOnBlobs = metadata.settings().getAsList("skip_exception_on_blobs");
regexesToFailIO = metadata.settings().getAsList("regexes_to_fail_io");
useLuceneCorruptionException = metadata.settings().getAsBoolean("use_lucene_corruption", false);
maximumNumberOfFailures = metadata.settings().getAsLong("max_failure_number", 100L);
blockOnAnyFiles = metadata.settings().getAsBoolean("block_on_control", false);
Expand Down Expand Up @@ -401,6 +405,14 @@ private void maybeIOExceptionOrBlock(String blobName) throws IOException {
// Condition 4 - This condition allows to skip exception on specific blobName or blobPrefix
return;
}

if (failIOForBlobsMatchingRegex(blobName) && (incrementAndGetFailureCount() < maximumNumberOfFailures)) {
logger.info("throwing random IOException for file [{}] at path [{}]", blobName, path());
throw new IOException("Random IOException");
} else {
logger.info("writing the fule {}", blobName);
}

if (blobName.startsWith("__")) {
if (shouldFail(blobName, randomDataFileIOExceptionRate) && (incrementAndGetFailureCount() < maximumNumberOfFailures)) {
logger.info("throwing random IOException for file [{}] at path [{}]", blobName, path());
Expand Down Expand Up @@ -611,5 +623,9 @@ private boolean skipExceptionOnListBlobs(String blobName) {
private boolean skipExceptionOnBlob(String blobName) {
return skipExceptionOnBlobs.contains(blobName);
}

// Returns true when the blob name matches any regex configured via the
// "regexes_to_fail_io" repository setting; used to force IO failures on matching blobs.
private boolean failIOForBlobsMatchingRegex(String blobName) {
    for (String regex : regexesToFailIO) {
        if (Pattern.compile(regex).matcher(blobName).find()) {
            return true;
        }
    }
    return false;
}
}
}
Loading

0 comments on commit 0ec6267

Please sign in to comment.