Skip to content

Commit

Permalink
[Remote Store] Waiting for remote store upload in snapshot/local reco…
Browse files Browse the repository at this point in the history
…very (opensearch-project#11720)

* Give snapshot recovery and local recovery enough time to upload all the data to the remote store

Signed-off-by: Gaurav Bafna <[email protected]>
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
gbbafna authored and shiv0408 committed Apr 25, 2024
1 parent 6e42dd7 commit b1fa232
Show file tree
Hide file tree
Showing 12 changed files with 404 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.test.VersionUtils;

import java.util.concurrent.ExecutionException;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -130,4 +132,61 @@ public void testCreateCloneIndex() {

}

/**
 * Clones the index "source" into "target" while the repository {@code REPOSITORY_NAME} is configured
 * with a fail rate of 100, then checks that "target" still reaches at least yellow health.
 * The fail rate and the rebalance setting are reset in the {@code finally} block.
 *
 * @throws ExecutionException   if a repository update issued through {@code setFailRate} fails
 * @throws InterruptedException if the test thread is interrupted while waiting
 */
public void testCreateCloneIndexFailure() throws ExecutionException, InterruptedException {
    Version version = VersionUtils.randomIndexCompatibleVersion(random());
    int numPrimaryShards = 1;
    prepareCreate("source").setSettings(
        Settings.builder().put(indexSettings()).put("number_of_shards", numPrimaryShards).put("index.version.created", version)
    ).get();
    final int docs = 2;
    for (int i = 0; i < docs; i++) {
        client().prepareIndex("source").setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", MediaTypeRegistry.JSON).get();
    }
    internalCluster().ensureAtLeastNumDataNodes(2);
    // make sure every shard of the source index is allocated before changing its settings
    ensureGreen();
    // block writes on the source index before it is resized
    client().admin().indices().prepareUpdateSettings("source").setSettings(Settings.builder().put("index.blocks.write", true)).get();
    ensureGreen();

    // disable rebalancing so that the target primary is not moved around while the test runs
    client().admin()
        .cluster()
        .prepareUpdateSettings()
        .setTransientSettings(Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none"))
        .get();
    // No catch clause: the method already declares both checked exceptions, so they propagate
    // unchanged instead of being wrapped in a RuntimeException (which also hid thread interruption).
    try {
        setFailRate(REPOSITORY_NAME, 100);

        client().admin()
            .indices()
            .prepareResizeIndex("source", "target")
            .setResizeType(ResizeType.CLONE)
            .setWaitForActiveShards(0)
            .setSettings(Settings.builder().put("index.number_of_replicas", 0).putNull("index.blocks.write").build())
            .get();

        // give the clone some time to run against the failing repository before checking health
        Thread.sleep(2000);
        ensureYellow("target");

    } finally {
        setFailRate(REPOSITORY_NAME, 0);
        ensureGreen();
        // clean up
        client().admin()
            .cluster()
            .prepareUpdateSettings()
            .setTransientSettings(
                Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), (String) null)
            )
            .get();
    }

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
package org.opensearch.remotestore;

import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.opensearch.action.admin.indices.get.GetIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexResponse;
import org.opensearch.action.bulk.BulkItemResponse;
Expand Down Expand Up @@ -37,7 +39,7 @@
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.After;

Expand All @@ -60,6 +62,7 @@
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_FAILRATE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase {
Expand Down Expand Up @@ -146,6 +149,18 @@ protected Settings nodeSettings(int nodeOrdinal) {
}
}

/**
 * Re-registers the repository {@code repoName} with type {@link ReloadableFsRepository#TYPE} and the
 * given fail rate. Only the "location" setting of the currently registered repository is carried
 * over; the fail-rate setting is set to {@code value}.
 *
 * @param repoName name of the repository to update
 * @param value    value stored under the {@code REPOSITORIES_FAILRATE_SETTING} key
 * @throws ExecutionException   if fetching the current repository metadata fails
 * @throws InterruptedException if interrupted while waiting for the repository metadata
 */
protected void setFailRate(String repoName, int value) throws ExecutionException, InterruptedException {
    final GetRepositoriesRequest lookup = new GetRepositoriesRequest(new String[] { repoName });
    // the lookup is for a single name, so the first entry is the repository being updated
    final RepositoryMetadata current = client().admin().cluster().getRepositories(lookup).get().repositories().get(0);

    final Settings.Builder updated = Settings.builder();
    updated.put("location", current.settings().get("location"));
    updated.put(REPOSITORIES_FAILRATE_SETTING.getKey(), value);

    assertAcked(
        client().admin().cluster().preparePutRepository(repoName).setType(ReloadableFsRepository.TYPE).setSettings(updated).get()
    );
}

/**
 * Returns the index settings produced by {@code defaultIndexSettings()}.
 *
 * @return the result of {@code defaultIndexSettings()}
 */
public Settings indexSettings() {
return defaultIndexSettings();
}
Expand Down Expand Up @@ -224,10 +239,10 @@ public static Settings buildRemoteStoreNodeAttributes(
return buildRemoteStoreNodeAttributes(
segmentRepoName,
segmentRepoPath,
FsRepository.TYPE,
ReloadableFsRepository.TYPE,
translogRepoName,
translogRepoPath,
FsRepository.TYPE,
ReloadableFsRepository.TYPE,
withRateLimiterAttributes
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

Expand Down Expand Up @@ -479,7 +480,14 @@ public void testRateLimitedRemoteDownloads() throws Exception {
settingsMap.entrySet().forEach(entry -> settings.put(entry.getKey(), entry.getValue()));
settings.put("location", segmentRepoPath).put("max_remote_download_bytes_per_sec", 4, ByteSizeUnit.KB);

assertAcked(client().admin().cluster().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(settings).get());
assertAcked(
client().admin()
.cluster()
.preparePutRepository(REPOSITORY_NAME)
.setType(ReloadableFsRepository.TYPE)
.setSettings(settings)
.get()
);

for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
Repository segmentRepo = repositoriesService.repository(REPOSITORY_NAME);
Expand Down Expand Up @@ -508,7 +516,14 @@ public void testRateLimitedRemoteDownloads() throws Exception {
// revert repo metadata to pass asserts on repo metadata vs. node attrs during teardown
// https://github.com/opensearch-project/OpenSearch/pull/9569#discussion_r1345668700
settings.remove("max_remote_download_bytes_per_sec");
assertAcked(client().admin().cluster().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(settings).get());
assertAcked(
client().admin()
.cluster()
.preparePutRepository(REPOSITORY_NAME)
.setType(ReloadableFsRepository.TYPE)
.setSettings(settings)
.get()
);
for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
Repository segmentRepo = repositoriesService.repository(REPOSITORY_NAME);
assertNull(segmentRepo.getMetadata().settings().get("max_remote_download_bytes_per_sec"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.repositories.RepositoryException;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;

Expand Down Expand Up @@ -53,7 +53,7 @@ public void testRestrictedSettingsCantBeUpdated() {
assertEquals(
e.getMessage(),
"[system-repo-name] trying to modify an unmodifiable attribute type of system "
+ "repository from current value [fs] to new value [mock]"
+ "repository from current value [reloadable-fs] to new value [mock]"
);
}

Expand All @@ -65,7 +65,12 @@ public void testSystemRepositoryNonRestrictedSettingsCanBeUpdated() {
final Settings.Builder repoSettings = Settings.builder().put("location", absolutePath).put("chunk_size", new ByteSizeValue(20));

assertAcked(
client.admin().cluster().preparePutRepository(systemRepoName).setType(FsRepository.TYPE).setSettings(repoSettings).get()
client.admin()
.cluster()
.preparePutRepository(systemRepoName)
.setType(ReloadableFsRepository.TYPE)
.setSettings(repoSettings)
.get()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_REMOTE_STORE_STREAMS_SETTING,
RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT,
RecoverySettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING,
Expand Down
28 changes: 20 additions & 8 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -2025,23 +2025,35 @@ public RemoteSegmentStoreDirectory getRemoteDirectory() {
}

/**
Returns true iff it is able to verify that remote segment store
is in sync with local
* Returns true iff it is able to verify that remote segment store
* is in sync with local
*/
boolean isRemoteSegmentStoreInSync() {
assert indexSettings.isRemoteStoreEnabled();
try {
RemoteSegmentStoreDirectory directory = getRemoteDirectory();
if (directory.readLatestMetadataFile() != null) {
// verifying that all files except EXCLUDE_FILES are uploaded to the remote
Collection<String> uploadFiles = directory.getSegmentsUploadedToRemoteStore().keySet();
SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
Collection<String> localFiles = segmentInfos.files(true);
if (uploadFiles.containsAll(localFiles)) {
return true;
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = getSegmentInfosSnapshot()) {
Collection<String> localSegmentInfosFiles = segmentInfosGatedCloseable.get().files(true);
Set<String> localFiles = new HashSet<>(localSegmentInfosFiles);
// verifying that all files except EXCLUDE_FILES are uploaded to the remote
localFiles.removeAll(RemoteStoreRefreshListener.EXCLUDE_FILES);
if (uploadFiles.containsAll(localFiles)) {
return true;
}
logger.debug(
() -> new ParameterizedMessage(
"RemoteSegmentStoreSyncStatus localSize={} remoteSize={}",
localFiles.size(),
uploadFiles.size()
)
);
}
}
} catch (IOException e) {
} catch (AlreadyClosedException e) {
throw e;
} catch (Throwable e) {
logger.error("Exception while reading latest metadata", e);
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,13 +172,33 @@ private boolean shouldSync(boolean didRefresh, boolean skipPrimaryTermCheck) {
// When the shouldSync is called the first time, then 1st condition on primary term is true. But after that
// we update the primary term and the same condition would not evaluate to true again in syncSegments.
// Below check ensures that if there is commit, then that gets picked up by both 1st and 2nd shouldSync call.
|| isRefreshAfterCommitSafe();
|| isRefreshAfterCommitSafe()
|| isRemoteSegmentStoreInSync() == false;
if (shouldSync || skipPrimaryTermCheck) {
return shouldSync;
}
return this.primaryTerm != indexShard.getOperationPrimaryTerm();
}

/**
 * Tells whether every file referenced by the current segment infos snapshot can be skipped for
 * upload, i.e. it is either already uploaded to the remote store or one of the excluded files.
 *
 * Unlike IndexShard#isRemoteSegmentStoreInSync, this check relies on the uploaded-files cache of the
 * remote directory and does not make a remote store call. Any failure, including one raised while
 * taking or closing the snapshot, is logged and reported as "not in sync" rather than propagated.
 *
 * @return true iff all the local files are uploaded to remote store.
 */
boolean isRemoteSegmentStoreInSync() {
    try (GatedCloseable<SegmentInfos> snapshot = indexShard.getSegmentInfosSnapshot()) {
        for (String localFile : snapshot.get().files(true)) {
            if (skipUpload(localFile) == false) {
                // first file that still needs an upload decides the answer
                return false;
            }
        }
        return true;
    } catch (Throwable throwable) {
        logger.error("Throwable thrown during isRemoteSegmentStoreInSync", throwable);
        return false;
    }
}

/*
@return false if retry is needed
*/
Expand Down
38 changes: 35 additions & 3 deletions server/src/main/java/org/opensearch/index/shard/StoreRecovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.search.Sort;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.StepListener;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.MappingMetadata;
Expand Down Expand Up @@ -191,7 +193,8 @@ void recoverFromLocalShards(
// just trigger a merge to do housekeeping on the
// copied segments - we will also see them in stats etc.
indexShard.getEngine().forceMerge(false, -1, false, false, false, UUIDs.randomBase64UUID());
if (indexShard.isRemoteTranslogEnabled()) {
if (indexShard.isRemoteTranslogEnabled() && indexShard.shardRouting.primary()) {
waitForRemoteStoreSync(indexShard);
if (indexShard.isRemoteSegmentStoreInSync() == false) {
throw new IndexShardRecoveryException(
indexShard.shardId(),
Expand Down Expand Up @@ -432,7 +435,8 @@ void recoverFromSnapshotAndRemoteStore(
}
indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
indexShard.finalizeRecovery();
if (indexShard.isRemoteTranslogEnabled()) {
if (indexShard.isRemoteTranslogEnabled() && indexShard.shardRouting.primary()) {
waitForRemoteStoreSync(indexShard);
if (indexShard.isRemoteSegmentStoreInSync() == false) {
listener.onFailure(new IndexShardRestoreFailedException(shardId, "Failed to upload to remote segment store"));
return;
Expand Down Expand Up @@ -717,7 +721,8 @@ private void restore(
}
indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
indexShard.finalizeRecovery();
if (indexShard.isRemoteTranslogEnabled()) {
if (indexShard.isRemoteTranslogEnabled() && indexShard.shardRouting.primary()) {
waitForRemoteStoreSync(indexShard);
if (indexShard.isRemoteSegmentStoreInSync() == false) {
listener.onFailure(new IndexShardRestoreFailedException(shardId, "Failed to upload to remote segment store"));
return;
Expand Down Expand Up @@ -791,4 +796,31 @@ private void bootstrap(final IndexShard indexShard, final Store store) throws IO
);
store.associateIndexWithNewTranslog(translogUUID);
}

/**
 * Blocks the calling thread until the remote segment store is in sync with the local store, or
 * until the internal remote upload timeout from the shard's recovery settings has elapsed,
 * whichever comes first. Returns immediately for non-primary shards, and stops waiting if the
 * sync check throws {@link AlreadyClosedException}.
 *
 * This method does not report whether the sync was reached; callers check
 * {@code indexShard.isRemoteSegmentStoreInSync()} again afterwards.
 *
 * @param indexShard the shard whose remote segment store upload is awaited
 * @throws OpenSearchException if the thread is interrupted while sleeping between checks; the
 *                             interrupt flag is restored before throwing
 */
private void waitForRemoteStoreSync(IndexShard indexShard) {
    if (indexShard.shardRouting.primary() == false) {
        return;
    }
    // Thread.sleep takes milliseconds. The previous argument, TimeValue.timeValueMinutes(1).seconds(),
    // evaluates to 60, so the effective poll interval has always been 60 ms; keep it, but say so.
    final long pollIntervalMillis = 60;
    final long startNanos = System.nanoTime();

    while (System.nanoTime() - startNanos < indexShard.getRecoverySettings().internalRemoteUploadTimeout().nanos()) {
        try {
            if (indexShard.isRemoteSegmentStoreInSync()) {
                return;
            }
        } catch (AlreadyClosedException e) {
            // There is no point in waiting as shard is now closed.
            return;
        }
        try {
            Thread.sleep(pollIntervalMillis);
        } catch (InterruptedException ie) {
            // restore the interrupt flag so code further up the stack can still observe it
            Thread.currentThread().interrupt();
            throw new OpenSearchException("Interrupted waiting for completion of [{}]", ie, indexShard.shardId());
        }
    }
}
}
Loading

0 comments on commit b1fa232

Please sign in to comment.