From 3917dd52f5c021a4cdf4e04c99bdb89b3db3deee Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Mon, 30 Oct 2023 12:39:41 +0530 Subject: [PATCH] Read the same medata file that is locked during restore of shallow snapshot Signed-off-by: Sachin Kale --- .../remotestore/RemoteRestoreSnapshotIT.java | 68 +++++++++++++++++++ .../opensearch/index/shard/IndexShard.java | 3 +- .../opensearch/index/shard/StoreRecovery.java | 5 ++ .../store/RemoteSegmentStoreDirectory.java | 6 +- .../RemoteStoreMetadataLockManager.java | 16 +++++ 5 files changed, 94 insertions(+), 4 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java index db2e5b340007d..95bc85dc8adbb 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java @@ -43,6 +43,7 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @@ -608,4 +609,71 @@ public void testRestoreShallowSnapshotRepository() throws ExecutionException, In assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1 + 2); } + public void testRestoreShallowSnapshotIndexAfterSnapshot() throws ExecutionException, InterruptedException { + String indexName1 = "testindex1"; + String snapshotRepoName = "test-restore-snapshot-repo"; + String remoteStoreRepoNameUpdated = "test-rs-repo-updated" + TEST_REMOTE_STORE_REPO_SUFFIX; + String snapshotName1 = "test-restore-snapshot1"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + Path absolutePath2 = randomRepoPath().toAbsolutePath(); + String[] pathTokens = absolutePath1.toString().split("/"); + String basePath = pathTokens[pathTokens.length - 1]; + Arrays.copyOf(pathTokens, pathTokens.length - 1); + Path location = PathUtils.get(String.join("/", pathTokens)); + pathTokens = absolutePath2.toString().split("/"); + String basePath2 = pathTokens[pathTokens.length - 1]; + Arrays.copyOf(pathTokens, pathTokens.length - 1); + Path location2 = PathUtils.get(String.join("/", pathTokens)); + logger.info("Path 1 [{}]", absolutePath1); + logger.info("Path 2 [{}]", absolutePath2); + String restoredIndexName1 = indexName1 + "-restored"; + + createRepository(snapshotRepoName, "fs", getRepositorySettings(location, basePath, true)); + + Client client = client(); + Settings indexSettings = Settings.builder() + .put(super.indexSettings()) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build(); + createIndex(indexName1, indexSettings); + + int numDocsInIndex1 = randomIntBetween(2, 5); + indexDocuments(client, indexName1, numDocsInIndex1); + + ensureGreen(indexName1); + + logger.info("--> snapshot"); + SnapshotInfo snapshotInfo1 = createSnapshot(snapshotRepoName, snapshotName1, new ArrayList<>(List.of(indexName1))); + assertThat(snapshotInfo1.successfulShards(), greaterThan(0)); + assertThat(snapshotInfo1.successfulShards(), equalTo(snapshotInfo1.totalShards())); + assertThat(snapshotInfo1.state(), equalTo(SnapshotState.SUCCESS)); + + int extraNumDocsInIndex1 = randomIntBetween(20, 50); + indexDocuments(client, indexName1, extraNumDocsInIndex1); + refresh(indexName1); + + client().admin().indices().close(Requests.closeIndexRequest(indexName1)).get(); + createRepository(remoteStoreRepoNameUpdated, "fs", remoteRepoPath); + RestoreSnapshotResponse restoreSnapshotResponse2 = client.admin() + .cluster() + .prepareRestoreSnapshot(snapshotRepoName, snapshotName1) + .setWaitForCompletion(true) + .setIndices(indexName1) + .setRenamePattern(indexName1) + .setRenameReplacement(restoredIndexName1) + .setSourceRemoteStoreRepository(remoteStoreRepoNameUpdated) + .get(); + + assertTrue(restoreSnapshotResponse2.getRestoreInfo().failedShards() == 0); + ensureGreen(restoredIndexName1); + assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1); + + // indexing some new docs and validating + indexDocuments(client, restoredIndexName1, numDocsInIndex1, numDocsInIndex1 + 2); + ensureGreen(restoredIndexName1); + assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1 + 2); + } + } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 565ce2e717a14..7326e810eedf2 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -4893,8 +4893,7 @@ public void syncSegmentsFromGivenRemoteSegmentStore( remoteStore.incRef(); } Map uploadedSegments = sourceRemoteDirectory - .initializeToSpecificCommit(primaryTerm, commitGeneration) - .getMetadata(); + .getSegmentsUploadedToRemoteStore(); final Directory storeDirectory = store.directory(); store.incRef(); diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index c0211e1257c8e..add30f8d3916f 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -401,6 +401,11 @@ void recoverFromSnapshotAndRemoteStore( indexUUID, shardId ); + sourceRemoteDirectory.initializeToSpecificCommit( + primaryTerm, + commitGeneration, + recoverySource.snapshot().getSnapshotId().getUUID() + ); indexShard.syncSegmentsFromGivenRemoteSegmentStore(true, sourceRemoteDirectory, primaryTerm, commitGeneration); final Store store = indexShard.store(); if (indexShard.indexSettings.isRemoteTranslogStoreEnabled() == false) { diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index be1f2341236ab..988d52202f975 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -34,6 +34,7 @@ import org.opensearch.index.store.lockmanager.FileLockInfo; import org.opensearch.index.store.lockmanager.RemoteStoreCommitLevelLockManager; import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; +import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; @@ -160,8 +161,9 @@ public RemoteSegmentMetadata init() throws IOException { * * @throws IOException if there were any failures in reading the metadata file */ - public RemoteSegmentMetadata initializeToSpecificCommit(long primaryTerm, long commitGeneration) throws IOException { - String metadataFile = getMetadataFileForCommit(primaryTerm, commitGeneration); + public RemoteSegmentMetadata initializeToSpecificCommit(long primaryTerm, long commitGeneration, String acquirerId) throws IOException { + String metadataFilePrefix = MetadataFilenameUtils.getMetadataFilePrefixForCommit(primaryTerm, commitGeneration); + String metadataFile = ((RemoteStoreMetadataLockManager) mdLockManager).fetchLock(metadataFilePrefix, acquirerId); RemoteSegmentMetadata remoteSegmentMetadata = readMetadataFile(metadataFile); if (remoteSegmentMetadata != null) { this.segmentsUploadedToRemoteStore = new ConcurrentHashMap<>(remoteSegmentMetadata.getMetadata()); diff --git a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java index fd7906729e314..756905d02229a 100644 --- a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java +++ b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java @@ -14,10 +14,13 @@ import org.apache.lucene.store.IndexOutput; import org.opensearch.index.store.RemoteBufferedOutputDirectory; +import java.io.FileNotFoundException; import java.io.IOException; import java.nio.file.NoSuchFileException; import java.util.Collection; +import java.util.List; import java.util.Objects; +import java.util.stream.Collectors; /** * A Class that implements Remote Store Lock Manager by creating lock files for the remote store files that needs to @@ -70,6 +73,19 @@ public void release(LockInfo lockInfo) throws IOException { } } + public String fetchLock(String filenamePrefix, String acquirerId) throws IOException { + Collection lockFiles = lockDirectory.listFilesByPrefix(filenamePrefix); + List lockFilesForAcquirer = lockFiles.stream() + .filter(lockFile -> acquirerId.equals(FileLockInfo.LockFileUtils.getAcquirerIdFromLock(lockFile))) + .map(FileLockInfo.LockFileUtils::getFileToLockNameFromLock) + .collect(Collectors.toList()); + if (lockFilesForAcquirer.size() == 0) { + throw new FileNotFoundException("No lock file found for prefix: " + filenamePrefix + " and acquirerId: " + acquirerId); + } + assert lockFilesForAcquirer.size() == 1; + return lockFilesForAcquirer.get(0); + } + /** * Checks whether a given file have any lock on it or not. * @param lockInfo File Lock Info instance for which we need to check if lock is acquired.