Skip to content

Commit

Permalink
Read the same medata file that is locked during restore of shallow sn…
Browse files Browse the repository at this point in the history
…apshot

Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
Sachin Kale committed Oct 30, 2023
1 parent 767bc0e commit 3917dd5
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -608,4 +609,71 @@ public void testRestoreShallowSnapshotRepository() throws ExecutionException, In
assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1 + 2);
}

public void testRestoreShallowSnapshotIndexAfterSnapshot() throws ExecutionException, InterruptedException {
String indexName1 = "testindex1";
String snapshotRepoName = "test-restore-snapshot-repo";
String remoteStoreRepoNameUpdated = "test-rs-repo-updated" + TEST_REMOTE_STORE_REPO_SUFFIX;
String snapshotName1 = "test-restore-snapshot1";
Path absolutePath1 = randomRepoPath().toAbsolutePath();
Path absolutePath2 = randomRepoPath().toAbsolutePath();
String[] pathTokens = absolutePath1.toString().split("/");
String basePath = pathTokens[pathTokens.length - 1];
Arrays.copyOf(pathTokens, pathTokens.length - 1);
Path location = PathUtils.get(String.join("/", pathTokens));
pathTokens = absolutePath2.toString().split("/");
String basePath2 = pathTokens[pathTokens.length - 1];
Arrays.copyOf(pathTokens, pathTokens.length - 1);
Path location2 = PathUtils.get(String.join("/", pathTokens));
logger.info("Path 1 [{}]", absolutePath1);
logger.info("Path 2 [{}]", absolutePath2);
String restoredIndexName1 = indexName1 + "-restored";

createRepository(snapshotRepoName, "fs", getRepositorySettings(location, basePath, true));

Client client = client();
Settings indexSettings = Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build();
createIndex(indexName1, indexSettings);

int numDocsInIndex1 = randomIntBetween(2, 5);
indexDocuments(client, indexName1, numDocsInIndex1);

ensureGreen(indexName1);

logger.info("--> snapshot");
SnapshotInfo snapshotInfo1 = createSnapshot(snapshotRepoName, snapshotName1, new ArrayList<>(List.of(indexName1)));
assertThat(snapshotInfo1.successfulShards(), greaterThan(0));
assertThat(snapshotInfo1.successfulShards(), equalTo(snapshotInfo1.totalShards()));
assertThat(snapshotInfo1.state(), equalTo(SnapshotState.SUCCESS));

int extraNumDocsInIndex1 = randomIntBetween(20, 50);
indexDocuments(client, indexName1, extraNumDocsInIndex1);
refresh(indexName1);

client().admin().indices().close(Requests.closeIndexRequest(indexName1)).get();
createRepository(remoteStoreRepoNameUpdated, "fs", remoteRepoPath);
RestoreSnapshotResponse restoreSnapshotResponse2 = client.admin()
.cluster()
.prepareRestoreSnapshot(snapshotRepoName, snapshotName1)
.setWaitForCompletion(true)
.setIndices(indexName1)
.setRenamePattern(indexName1)
.setRenameReplacement(restoredIndexName1)
.setSourceRemoteStoreRepository(remoteStoreRepoNameUpdated)
.get();

assertTrue(restoreSnapshotResponse2.getRestoreInfo().failedShards() == 0);
ensureGreen(restoredIndexName1);
assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1);

// indexing some new docs and validating
indexDocuments(client, restoredIndexName1, numDocsInIndex1, numDocsInIndex1 + 2);
ensureGreen(restoredIndexName1);
assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1 + 2);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4893,8 +4893,7 @@ public void syncSegmentsFromGivenRemoteSegmentStore(
remoteStore.incRef();
}
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = sourceRemoteDirectory
.initializeToSpecificCommit(primaryTerm, commitGeneration)
.getMetadata();
.getSegmentsUploadedToRemoteStore();
final Directory storeDirectory = store.directory();
store.incRef();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,11 @@ void recoverFromSnapshotAndRemoteStore(
indexUUID,
shardId
);
sourceRemoteDirectory.initializeToSpecificCommit(

Check warning on line 404 in server/src/main/java/org/opensearch/index/shard/StoreRecovery.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/StoreRecovery.java#L404

Added line #L404 was not covered by tests
primaryTerm,
commitGeneration,
recoverySource.snapshot().getSnapshotId().getUUID()

Check warning on line 407 in server/src/main/java/org/opensearch/index/shard/StoreRecovery.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/StoreRecovery.java#L407

Added line #L407 was not covered by tests
);
indexShard.syncSegmentsFromGivenRemoteSegmentStore(true, sourceRemoteDirectory, primaryTerm, commitGeneration);
final Store store = indexShard.store();
if (indexShard.indexSettings.isRemoteTranslogStoreEnabled() == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.opensearch.index.store.lockmanager.FileLockInfo;
import org.opensearch.index.store.lockmanager.RemoteStoreCommitLevelLockManager;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
Expand Down Expand Up @@ -160,8 +161,9 @@ public RemoteSegmentMetadata init() throws IOException {
*
* @throws IOException if there were any failures in reading the metadata file
*/
public RemoteSegmentMetadata initializeToSpecificCommit(long primaryTerm, long commitGeneration) throws IOException {
String metadataFile = getMetadataFileForCommit(primaryTerm, commitGeneration);
public RemoteSegmentMetadata initializeToSpecificCommit(long primaryTerm, long commitGeneration, String acquirerId) throws IOException {
String metadataFilePrefix = MetadataFilenameUtils.getMetadataFilePrefixForCommit(primaryTerm, commitGeneration);
String metadataFile = ((RemoteStoreMetadataLockManager) mdLockManager).fetchLock(metadataFilePrefix, acquirerId);

Check warning on line 166 in server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java#L165-L166

Added lines #L165 - L166 were not covered by tests
RemoteSegmentMetadata remoteSegmentMetadata = readMetadataFile(metadataFile);
if (remoteSegmentMetadata != null) {
this.segmentsUploadedToRemoteStore = new ConcurrentHashMap<>(remoteSegmentMetadata.getMetadata());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@
import org.apache.lucene.store.IndexOutput;
import org.opensearch.index.store.RemoteBufferedOutputDirectory;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/**
* A Class that implements Remote Store Lock Manager by creating lock files for the remote store files that needs to
Expand Down Expand Up @@ -70,6 +73,19 @@ public void release(LockInfo lockInfo) throws IOException {
}
}

public String fetchLock(String filenamePrefix, String acquirerId) throws IOException {
Collection<String> lockFiles = lockDirectory.listFilesByPrefix(filenamePrefix);
List<String> lockFilesForAcquirer = lockFiles.stream()
.filter(lockFile -> acquirerId.equals(FileLockInfo.LockFileUtils.getAcquirerIdFromLock(lockFile)))
.map(FileLockInfo.LockFileUtils::getFileToLockNameFromLock)
.collect(Collectors.toList());

Check warning on line 81 in server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java#L77-L81

Added lines #L77 - L81 were not covered by tests
if (lockFilesForAcquirer.size() == 0) {
throw new FileNotFoundException("No lock file found for prefix: " + filenamePrefix + " and acquirerId: " + acquirerId);

Check warning on line 83 in server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java#L83

Added line #L83 was not covered by tests
}
assert lockFilesForAcquirer.size() == 1;
return lockFilesForAcquirer.get(0);

Check warning on line 86 in server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java#L86

Added line #L86 was not covered by tests
}

/**
* Checks whether a given file have any lock on it or not.
* @param lockInfo File Lock Info instance for which we need to check if lock is acquired.
Expand Down

0 comments on commit 3917dd5

Please sign in to comment.