diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 01c0a12d463ea..f1cd02282d7e0 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -163,6 +163,8 @@ import org.opensearch.index.store.Store.MetadataSnapshot; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.store.StoreStats; +import org.opensearch.index.store.lockmanager.FileLockInfo; +import org.opensearch.index.store.lockmanager.LockInfo; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory; import org.opensearch.index.translog.RemoteFsTranslog; @@ -1497,7 +1499,9 @@ public GatedCloseable acquireLastIndexCommitAndRefresh(boolean flus */ public void acquireLockOnCommitData(String snapshotId, long primaryTerm, long generation) throws IOException { RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = getRemoteDirectory(); - remoteSegmentStoreDirectory.acquireLock(primaryTerm, generation, snapshotId); + String lockIdentifier = remoteSegmentStoreDirectory.getLockIdentifier(primaryTerm, generation); + LockInfo lockInfo = new FileLockInfo(lockIdentifier, snapshotId); + remoteSegmentStoreDirectory.acquireLock(lockInfo); } /** @@ -1509,7 +1513,9 @@ public void acquireLockOnCommitData(String snapshotId, long primaryTerm, long ge */ public void releaseLockOnCommitData(String snapshotId, long primaryTerm, long generation) throws IOException { RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = getRemoteDirectory(); - remoteSegmentStoreDirectory.releaseLock(primaryTerm, generation, snapshotId); + String lockIdentifier = remoteSegmentStoreDirectory.getLockIdentifier(primaryTerm, generation); + LockInfo lockInfo = new FileLockInfo(lockIdentifier, snapshotId); + remoteSegmentStoreDirectory.releaseLock(lockInfo); } public Optional getReplicationEngine() { diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index ac129aca8baf7..df107ccd268a5 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -23,7 +23,7 @@ import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.index.store.lockmanager.FileLockInfo; -import org.opensearch.index.store.lockmanager.RemoteStoreCommitLevelLockManager; +import org.opensearch.index.store.lockmanager.LockInfo; import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler; @@ -55,7 +55,7 @@ * another instance of {@code RemoteDirectory}. * @opensearch.internal */ -public final class RemoteSegmentStoreDirectory extends FilterDirectory implements RemoteStoreCommitLevelLockManager { +public final class RemoteSegmentStoreDirectory extends FilterDirectory implements RemoteStoreLockManager { /** * Each segment file is uploaded with unique suffix. * For example, _0.cfe in local filesystem will be uploaded to remote segment store as _0.cfe__gX7bNIIBrs0AUNsR2yEG @@ -74,7 +74,7 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement */ private final RemoteDirectory remoteMetadataDirectory; - private final RemoteStoreLockManager mdLockManager; + private final RemoteDirectory remoteLockDirectory; private final ThreadPool threadPool; @@ -108,13 +108,13 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement public RemoteSegmentStoreDirectory( RemoteDirectory remoteDataDirectory, RemoteDirectory remoteMetadataDirectory, - RemoteStoreLockManager mdLockManager, + RemoteDirectory remoteLockDirectory, ThreadPool threadPool ) throws IOException { super(remoteDataDirectory); this.remoteDataDirectory = remoteDataDirectory; this.remoteMetadataDirectory = remoteMetadataDirectory; - this.mdLockManager = mdLockManager; + this.remoteLockDirectory = remoteLockDirectory; this.threadPool = threadPool; init(); } @@ -370,45 +370,76 @@ public IndexInput openInput(String name, IOContext context) throws IOException { /** * This acquires a lock on a given commit by creating a lock file in lock directory using {@code FileLockInfo} - * @param primaryTerm Primary Term of index at the time of commit. - * @param generation Commit Generation - * @param acquirerId Lock Acquirer ID which wants to acquire lock on the commit. + * @param lockInfo lock identifier and acquirer ID info * @throws IOException will be thrown in case i) listing file failed or ii) Writing the lock file failed. * @throws NoSuchFileException when metadata file is not present for given commit point. */ @Override - public void acquireLock(long primaryTerm, long generation, String acquirerId) throws IOException { - String metadataFile = getMetadataFileForCommit(primaryTerm, generation); - - mdLockManager.acquire(FileLockInfo.getLockInfoBuilder().withFileToLock(metadataFile).withAcquirerId(acquirerId).build()); + public void acquireLock(LockInfo lockInfo) throws IOException { + assert lockInfo instanceof FileLockInfo : "lockInfo should be instance of FileLockInfo"; + String filename = ((FileLockInfo) lockInfo).getFileToLock(); + Collection metadataFiles = remoteMetadataDirectory.listFilesByPrefix(filename); + if (metadataFiles.isEmpty()) { + throw new NoSuchFileException("Metadata file " + filename + " does not exist to acquire lock"); + } + IndexOutput indexOutput = remoteLockDirectory.createOutput(lockInfo.generateLockName(), IOContext.DEFAULT); + indexOutput.close(); } /** * Releases a lock which was acquired on given segment commit. - * @param primaryTerm Primary Term of index at the time of commit. - * @param generation Commit Generation - * @param acquirerId Acquirer ID for which lock needs to be released. + * @param lockInfo lock identifier and acquirer ID info * @throws IOException will be thrown in case i) listing lock files failed or ii) deleting the lock file failed. - * @throws NoSuchFileException when metadata file is not present for given commit point. */ @Override - public void releaseLock(long primaryTerm, long generation, String acquirerId) throws IOException { - String metadataFile = getMetadataFileForCommit(primaryTerm, generation); - mdLockManager.release(FileLockInfo.getLockInfoBuilder().withFileToLock(metadataFile).withAcquirerId(acquirerId).build()); + public void releaseLock(LockInfo lockInfo) throws IOException { + assert lockInfo instanceof FileLockInfo : "lockInfo should be instance of FileLockInfo"; + Collection lockFiles = remoteLockDirectory.listFilesByPrefix(((FileLockInfo) lockInfo).getFileToLock()); + + if (lockFiles.isEmpty()) { + logger.warn("Lock file {} does not exist", ((FileLockInfo) lockInfo).getFileToLock()); + return; + } + // ideally there should be only one lock per acquirer, but just to handle any stale locks, + // we try to release all the locks for the acquirer. + String acquirerId = ((FileLockInfo) lockInfo).getAcquirerId(); + List locksToRelease = lockFiles.stream() + .filter(lockFile -> FileLockInfo.getAcquirerIdFromLock(lockFile).equals(acquirerId)) + .collect(Collectors.toList()); + if (locksToRelease.size() > 1) { + logger.warn(locksToRelease.size() + " locks found for acquirer " + ((FileLockInfo) lockInfo).getAcquirerId()); + } + for (String lock : locksToRelease) { + remoteLockDirectory.deleteFile(lock); + } } /** * Checks if a specific commit have any corresponding lock file. - * @param primaryTerm Primary Term of index at the time of commit. - * @param generation Commit Generation + * @param lockInfo lock identifier and acquirer ID info * @return True if there is at least one lock for given primary term and generation. * @throws IOException will be thrown in case listing lock files failed. * @throws NoSuchFileException when metadata file is not present for given commit point. */ @Override - public Boolean isLockAcquired(long primaryTerm, long generation) throws IOException { - String metadataFile = getMetadataFileForCommit(primaryTerm, generation); - return mdLockManager.isAcquired(FileLockInfo.getLockInfoBuilder().withFileToLock(metadataFile).build()); + public Boolean isLockAcquired(LockInfo lockInfo) throws IOException { + assert lockInfo instanceof FileLockInfo : "lockInfo should be instance of FileLockInfo"; + Collection lockFiles = remoteLockDirectory.listFilesByPrefix(((FileLockInfo) lockInfo).getFileToLock()); + String acquirerId = ((FileLockInfo) lockInfo).getAcquirerId(); + List locksByAcquirer = lockFiles.stream() + .filter(lockFile -> FileLockInfo.getAcquirerIdFromLock(lockFile).equals(acquirerId)) + .collect(Collectors.toList()); + return !locksByAcquirer.isEmpty(); + } + + @Override + public Boolean isLockAcquired(String lockIdentifier) throws IOException { + Collection lockFiles = remoteLockDirectory.listFilesByPrefix(lockIdentifier); + return lockFiles.isEmpty() == false; + } + + public String getLockIdentifier(long primaryTerm, long generation) throws IOException { + return getMetadataFileForCommit(primaryTerm, generation); } // Visible for testing @@ -610,8 +641,10 @@ private void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOExceptio return true; } return !isLockAcquired( - MetadataFilenameUtils.getPrimaryTerm(metadataFile.split(MetadataFilenameUtils.SEPARATOR)), - MetadataFilenameUtils.getGeneration(metadataFile.split(MetadataFilenameUtils.SEPARATOR)) + getLockIdentifier( + MetadataFilenameUtils.getPrimaryTerm(metadataFile.split(MetadataFilenameUtils.SEPARATOR)), + MetadataFilenameUtils.getGeneration(metadataFile.split(MetadataFilenameUtils.SEPARATOR)) + ) ); } catch (IOException e) { logger.error( @@ -708,7 +741,7 @@ private boolean deleteIfEmpty() throws IOException { try { remoteDataDirectory.delete(); remoteMetadataDirectory.delete(); - mdLockManager.delete(); + remoteLockDirectory.delete(); } catch (Exception e) { logger.error("Exception occurred while deleting directory", e); return false; diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java index 3bec84f287ce4..51793b5a5dfa4 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java @@ -13,8 +13,6 @@ import org.opensearch.common.blobstore.BlobPath; import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.ShardPath; -import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory; -import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager; import org.opensearch.plugins.IndexStorePlugin; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; @@ -59,14 +57,13 @@ public Directory newDirectory(String repositoryName, String indexUUID, String sh RemoteDirectory dataDirectory = createRemoteDirectory(repository, commonBlobPath, "data"); RemoteDirectory metadataDirectory = createRemoteDirectory(repository, commonBlobPath, "metadata"); - RemoteStoreMetadataLockManager mdLockManager = RemoteStoreLockManagerFactory.newLockManager( - repositoriesService.get(), - repositoryName, - indexUUID, - shardId + BlobPath shardLevelBlobPath = ((BlobStoreRepository) repository).basePath().add(indexUUID).add(shardId).add(SEGMENTS); + RemoteBufferedOutputDirectory remoteLockDirectory = createRemoteBufferedOutputDirectory( + repository, + shardLevelBlobPath, + "lock_files" ); - - return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, mdLockManager, threadPool); + return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, remoteLockDirectory, threadPool); } catch (RepositoryMissingException e) { throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", e); } @@ -77,4 +74,14 @@ private RemoteDirectory createRemoteDirectory(Repository repository, BlobPath co BlobContainer dataBlobContainer = ((BlobStoreRepository) repository).blobStore().blobContainer(extendedPath); return new RemoteDirectory(dataBlobContainer); } + + private static RemoteBufferedOutputDirectory createRemoteBufferedOutputDirectory( + Repository repository, + BlobPath commonBlobPath, + String extention + ) { + BlobPath extendedPath = commonBlobPath.add(extention); + BlobContainer dataBlobContainer = ((BlobStoreRepository) repository).blobStore().blobContainer(extendedPath); + return new RemoteBufferedOutputDirectory(dataBlobContainer); + } } diff --git a/server/src/main/java/org/opensearch/index/store/lockmanager/FileLockInfo.java b/server/src/main/java/org/opensearch/index/store/lockmanager/FileLockInfo.java index a8fb7bf20c393..e566090f5b7b5 100644 --- a/server/src/main/java/org/opensearch/index/store/lockmanager/FileLockInfo.java +++ b/server/src/main/java/org/opensearch/index/store/lockmanager/FileLockInfo.java @@ -8,9 +8,7 @@ package org.opensearch.index.store.lockmanager; -import java.util.Arrays; -import java.util.List; -import java.util.stream.Collectors; +import org.opensearch.core.common.Strings; /** * A Class that defines Info about Remote Store File Lock. @@ -18,8 +16,16 @@ * @opensearch.internal */ public class FileLockInfo implements LockInfo { - private String fileToLock; - private String acquirerId; + private final String fileToLock; + private final String acquirerId; + + public FileLockInfo(String fileToLock, String acquirerId) { + if (Strings.isNullOrEmpty(fileToLock) || Strings.isNullOrEmpty(acquirerId)) { + throw new IllegalArgumentException("Both the arguments should be non-empty"); + } + this.fileToLock = fileToLock; + this.acquirerId = acquirerId; + } public String getAcquirerId() { return acquirerId; @@ -29,101 +35,17 @@ public String getFileToLock() { return fileToLock; } - private void setFileToLock(String fileName) { - this.fileToLock = fileName; - } - - private void setAcquirerId(String acquirerId) { - this.acquirerId = acquirerId; - } - @Override public String generateLockName() { - validateRequiredParameters(this); - return LockFileUtils.generateLockName(fileToLock, acquirerId); - } - - String getLockPrefix() { - if (fileToLock == null || fileToLock.isBlank()) { - throw new IllegalArgumentException("File to Lock should be provided"); - } - return fileToLock + RemoteStoreLockManagerUtils.SEPARATOR; - } - - List getLocksForAcquirer(String[] lockFiles) { - if (acquirerId == null || acquirerId.isBlank()) { - throw new IllegalArgumentException("Acquirer ID should be provided"); - } - return Arrays.stream(lockFiles) - .filter(lockFile -> acquirerId.equals(LockFileUtils.getAcquirerIdFromLock(lockFile))) - .collect(Collectors.toList()); - } - - public static LockInfoBuilder getLockInfoBuilder() { - return new LockInfoBuilder(); - } - - private static void validateRequiredParameters(FileLockInfo fileLockInfo) { - if (fileLockInfo.getAcquirerId() == null || fileLockInfo.getAcquirerId().isBlank()) { - throw new IllegalArgumentException("Acquirer ID should be provided"); - } - if (fileLockInfo.getFileToLock() == null || fileLockInfo.getFileToLock().isBlank()) { - throw new IllegalArgumentException("File to Lock should be provided"); - } - } - - static class LockFileUtils { - static String generateLockName(String fileToLock, String acquirerId) { - return String.join(RemoteStoreLockManagerUtils.SEPARATOR, fileToLock, acquirerId) - + RemoteStoreLockManagerUtils.LOCK_FILE_EXTENSION; - } - - public static String getFileToLockNameFromLock(String lockName) { - String[] lockNameTokens = lockName.split(RemoteStoreLockManagerUtils.SEPARATOR); - - if (lockNameTokens.length != 2) { - throw new IllegalArgumentException("Provided Lock Name " + lockName + " is not Valid."); - } - return lockNameTokens[0]; - } - - public static String getAcquirerIdFromLock(String lockName) { - String[] lockNameTokens = lockName.split(RemoteStoreLockManagerUtils.SEPARATOR); - - if (lockNameTokens.length != 2) { - throw new IllegalArgumentException("Provided Lock Name " + lockName + " is not Valid."); - } - return lockNameTokens[1].replace(RemoteStoreLockManagerUtils.LOCK_FILE_EXTENSION, ""); - } + return String.join(RemoteStoreLockManagerUtils.SEPARATOR, fileToLock, acquirerId) + RemoteStoreLockManagerUtils.LOCK_FILE_EXTENSION; } - /** - * A Builder Class to build an Instance of {@code FileLockInfo} - * @opensearch.internal - */ - public static class LockInfoBuilder implements LockInfo.LockInfoBuilder { - private final FileLockInfo lockFileInfo; - - LockInfoBuilder() { - this.lockFileInfo = new FileLockInfo(); - } - - public LockInfoBuilder withFileToLock(String fileToLock) { - lockFileInfo.setFileToLock(fileToLock); - return this; - } - - public LockInfoBuilder withAcquirerId(String acquirerId) { - lockFileInfo.setAcquirerId(acquirerId); - return this; - } + public static String getAcquirerIdFromLock(String lockName) { + String[] lockNameTokens = lockName.split(RemoteStoreLockManagerUtils.SEPARATOR); - @Override - public FileLockInfo build() { - if (lockFileInfo.fileToLock == null && lockFileInfo.acquirerId == null) { - throw new IllegalStateException("Either File to Lock or AcquirerId should be provided to instantiate FileLockInfo"); - } - return lockFileInfo; + if (lockNameTokens.length != 2) { + throw new IllegalArgumentException("Provided Lock Name " + lockName + " is not Valid."); } + return lockNameTokens[1].replace(RemoteStoreLockManagerUtils.LOCK_FILE_EXTENSION, ""); } } diff --git a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreCommitLevelLockManager.java b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreCommitLevelLockManager.java deleted file mode 100644 index 20b6ded9f2401..0000000000000 --- a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreCommitLevelLockManager.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.index.store.lockmanager; - -import java.io.IOException; - -/** - * An Interface that defines Commit Level Lock in Remote Store. We can lock the segment files corresponding to a given - * primaryTerm and Commit Generation. - * - * @opensearch.internal - */ -public interface RemoteStoreCommitLevelLockManager { - /** - * - * This method will be used to acquire lock on segment files of a specific commit. - * @param primaryTerm Primary Term of index at the time of commit. - * @param generation Commit Generation - * @param acquirerId Resource ID which wants to acquire lock on the commit. - * @throws IOException in case there is a problem in acquiring lock on a commit. - */ - void acquireLock(long primaryTerm, long generation, String acquirerId) throws IOException; - - /** - * This method will be used to release lock on segment files of a specific commit, which got acquired by given - * resource. - * @param primaryTerm Primary Term of index at the time of commit. - * @param generation Commit Generation - * @param acquirerId Resource ID for which lock needs to be released. - * @throws IOException in case there is a problem in releasing lock on a commit. - */ - void releaseLock(long primaryTerm, long generation, String acquirerId) throws IOException; - - /** - * This method will be used to check if a specific commit have any lock acquired on it or not. - * @param primaryTerm Primary Term of index at the time of commit. - * @param generation Commit Generation - * @return true if given commit is locked, else false. - * @throws IOException in case there is a problem in checking if a commit is locked or not. - */ - Boolean isLockAcquired(long primaryTerm, long generation) throws IOException; -} diff --git a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManager.java b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManager.java index c30be082b4795..4f8f10464a092 100644 --- a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManager.java +++ b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManager.java @@ -18,29 +18,32 @@ */ public interface RemoteStoreLockManager { /** - * + * Acquire lock on given identifier and acquirer ID * @param lockInfo lock info instance for which we need to acquire lock. * @throws IOException throws exception in case there is a problem with acquiring lock. */ - void acquire(LockInfo lockInfo) throws IOException; + void acquireLock(LockInfo lockInfo) throws IOException; /** - * + * Release lock on given identifier and acquirer ID * @param lockInfo lock info instance for which lock need to be removed. * @throws IOException throws exception in case there is a problem in releasing lock. */ - void release(LockInfo lockInfo) throws IOException; + void releaseLock(LockInfo lockInfo) throws IOException; /** - * + * Checks if a lock is acquired on given identifier and acquirer ID * @param lockInfo lock info instance for which we need to check if lock is acquired. * @return whether a lock is acquired on the given lock info. * @throws IOException throws exception in case there is a problem in checking if a given file is locked or not. */ - Boolean isAcquired(LockInfo lockInfo) throws IOException; + Boolean isLockAcquired(LockInfo lockInfo) throws IOException; - /* - Deletes all lock related files and directories + /** + * Checks if any lock is acquired on given identifier + * @param lockIdentifier lock identifier + * @return whether a lock is acquired on the given lock info. + * @throws IOException throws exception in case there is a problem in checking if a given file is locked or not. */ - void delete() throws IOException; + Boolean isLockAcquired(String lockIdentifier) throws IOException; } diff --git a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManagerFactory.java b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManagerFactory.java deleted file mode 100644 index e866551eae143..0000000000000 --- a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManagerFactory.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.index.store.lockmanager; - -import org.opensearch.common.blobstore.BlobContainer; -import org.opensearch.common.blobstore.BlobPath; -import org.opensearch.index.store.RemoteBufferedOutputDirectory; -import org.opensearch.repositories.RepositoriesService; -import org.opensearch.repositories.Repository; -import org.opensearch.repositories.RepositoryMissingException; -import org.opensearch.repositories.blobstore.BlobStoreRepository; - -import java.io.IOException; -import java.util.function.Supplier; - -/** - * Factory for remote store lock manager - * - * @opensearch.internal - */ -public class RemoteStoreLockManagerFactory { - private static final String SEGMENTS = "segments"; - private static final String LOCK_FILES = "lock_files"; - private final Supplier repositoriesService; - - public RemoteStoreLockManagerFactory(Supplier repositoriesService) { - this.repositoriesService = repositoriesService; - } - - public RemoteStoreMetadataLockManager newLockManager(String repositoryName, String indexUUID, String shardId) throws IOException { - return newLockManager(repositoriesService.get(), repositoryName, indexUUID, shardId); - } - - public static RemoteStoreMetadataLockManager newLockManager( - RepositoriesService repositoriesService, - String repositoryName, - String indexUUID, - String shardId - ) throws IOException { - try (Repository repository = repositoriesService.repository(repositoryName)) { - assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; - BlobPath shardLevelBlobPath = ((BlobStoreRepository) repository).basePath().add(indexUUID).add(shardId).add(SEGMENTS); - RemoteBufferedOutputDirectory shardMDLockDirectory = createRemoteBufferedOutputDirectory( - repository, - shardLevelBlobPath, - LOCK_FILES - ); - - return new RemoteStoreMetadataLockManager(shardMDLockDirectory); - } catch (RepositoryMissingException e) { - throw new IllegalArgumentException("Repository should be present to acquire/release lock", e); - } - } - - private static RemoteBufferedOutputDirectory createRemoteBufferedOutputDirectory( - Repository repository, - BlobPath commonBlobPath, - String extention - ) { - BlobPath extendedPath = commonBlobPath.add(extention); - BlobContainer dataBlobContainer = ((BlobStoreRepository) repository).blobStore().blobContainer(extendedPath); - return new RemoteBufferedOutputDirectory(dataBlobContainer); - } -} diff --git a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManagerUtils.java b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManagerUtils.java index 452dfc329d88b..2941fe0b41393 100644 --- a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManagerUtils.java +++ b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManagerUtils.java @@ -14,10 +14,6 @@ * @opensearch.internal */ public class RemoteStoreLockManagerUtils { - static final String FILE_TO_LOCK_NAME = "file_to_lock"; static final String SEPARATOR = "___"; static final String LOCK_FILE_EXTENSION = ".lock"; - static final String ACQUIRER_ID = "acquirer_id"; - public static final String NO_TTL = "-1"; - static final String LOCK_EXPIRY_TIME = "lock_expiry_time"; } diff --git a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java deleted file mode 100644 index 7df20cae10664..0000000000000 --- a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.index.store.lockmanager; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.lucene.store.IOContext; -import org.apache.lucene.store.IndexOutput; -import org.opensearch.index.store.RemoteBufferedOutputDirectory; - -import java.io.IOException; -import java.util.Collection; -import java.util.List; - -/** - * A Class that implements Remote Store Lock Manager by creating lock files for the remote store files that needs to - * be locked. - * It uses {@code LockFileInfo} instance to get the information about the lock file on which operations need to - * be executed. - * - * @opensearch.internal - */ -public class RemoteStoreMetadataLockManager implements RemoteStoreLockManager { - private static final Logger logger = LogManager.getLogger(RemoteStoreMetadataLockManager.class); - private final RemoteBufferedOutputDirectory lockDirectory; - - public RemoteStoreMetadataLockManager(RemoteBufferedOutputDirectory lockDirectory) { - this.lockDirectory = lockDirectory; - } - - /** - * Acquires lock on the file mentioned in LockInfo Instance. - * @param lockInfo File Lock Info instance for which we need to acquire lock. - * @throws IOException in case there is some failure while acquiring lock. - */ - @Override - public void acquire(LockInfo lockInfo) throws IOException { - assert lockInfo instanceof FileLockInfo : "lockInfo should be instance of FileLockInfo"; - IndexOutput indexOutput = lockDirectory.createOutput(lockInfo.generateLockName(), IOContext.DEFAULT); - indexOutput.close(); - } - - /** - * Releases Locks acquired by a given acquirer which is passed in LockInfo Instance. - * Right now this method is only used to release locks for a given acquirer, - * This can be extended in future to handle other cases as well, like: - * - release lock for given fileToLock and AcquirerId - * - release all locks for given fileToLock - * @param lockInfo File Lock Info instance for which lock need to be removed. - * @throws IOException in case there is some failure in releasing locks. - */ - @Override - public void release(LockInfo lockInfo) throws IOException { - assert lockInfo instanceof FileLockInfo : "lockInfo should be instance of FileLockInfo"; - String[] lockFiles = lockDirectory.listAll(); - - // ideally there should be only one lock per acquirer, but just to handle any stale locks, - // we try to release all the locks for the acquirer. - List locksToRelease = ((FileLockInfo) lockInfo).getLocksForAcquirer(lockFiles); - if (locksToRelease.size() > 1) { - logger.warn(locksToRelease.size() + " locks found for acquirer " + ((FileLockInfo) lockInfo).getAcquirerId()); - } - for (String lock : locksToRelease) { - lockDirectory.deleteFile(lock); - } - } - - /** - * Checks whether a given file have any lock on it or not. - * @param lockInfo File Lock Info instance for which we need to check if lock is acquired. - * @return true if lock is acquired on a file, else false. - * @throws IOException in case there is some failure in checking locks for a file. - */ - @Override - public Boolean isAcquired(LockInfo lockInfo) throws IOException { - assert lockInfo instanceof FileLockInfo : "lockInfo should be instance of FileLockInfo"; - Collection lockFiles = lockDirectory.listFilesByPrefix(((FileLockInfo) lockInfo).getLockPrefix()); - return !lockFiles.isEmpty(); - } - - public void delete() throws IOException { - lockDirectory.delete(); - } -} diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java index 324315505987b..cbb122c285db3 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java @@ -30,6 +30,7 @@ import java.nio.file.Path; import java.util.Collections; import java.util.List; +import java.util.concurrent.ExecutorService; import java.util.function.Supplier; import static org.mockito.ArgumentMatchers.any; @@ -49,8 +50,10 @@ public class RemoteSegmentStoreDirectoryFactoryTests extends OpenSearchTestCase public void setup() { repositoriesServiceSupplier = mock(Supplier.class); repositoriesService = mock(RepositoriesService.class); - threadPool = mock(ThreadPool.class); when(repositoriesServiceSupplier.get()).thenReturn(repositoriesService); + threadPool = mock(ThreadPool.class); + ExecutorService executorService = mock(ExecutorService.class); + when(threadPool.executor(ThreadPool.Names.REMOTE_PURGE)).thenReturn(executorService); remoteSegmentStoreDirectoryFactory = new RemoteSegmentStoreDirectoryFactory(repositoriesServiceSupplier, threadPool); } @@ -82,7 +85,7 @@ public void testNewDirectory() throws IOException { assertEquals("base_path/uuid_1/0/segments/lock_files/", blobPaths.get(2).buildAsString()); verify(blobContainer).listBlobsByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX); - verify(repositoriesService, times(2)).repository("remote_store_repository"); + verify(repositoriesService, times(1)).repository("remote_store_repository"); } } diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index 66e4b9a357b85..660c3624e480d 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -34,7 +34,8 @@ import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; -import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager; +import org.opensearch.index.store.lockmanager.FileLockInfo; +import org.opensearch.index.store.lockmanager.LockInfo; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler; import org.opensearch.threadpool.ThreadPool; @@ -64,7 +65,7 @@ public class RemoteSegmentStoreDirectoryTests extends IndexShardTestCase { private RemoteDirectory remoteDataDirectory; private RemoteDirectory remoteMetadataDirectory; - private RemoteStoreMetadataLockManager mdLockManager; + private RemoteDirectory remoteLockDirectory; private RemoteSegmentStoreDirectory remoteSegmentStoreDirectory; private IndexShard indexShard; @@ -75,13 +76,13 @@ public class RemoteSegmentStoreDirectoryTests extends IndexShardTestCase { public void setup() throws IOException { remoteDataDirectory = mock(RemoteDirectory.class); remoteMetadataDirectory = mock(RemoteDirectory.class); - mdLockManager = mock(RemoteStoreMetadataLockManager.class); + remoteLockDirectory = mock(RemoteDirectory.class); threadPool = mock(ThreadPool.class); remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory( remoteDataDirectory, remoteMetadataDirectory, - mdLockManager, + remoteLockDirectory, threadPool ); @@ -399,7 +400,6 @@ public void testOpenInputException() throws IOException { public void testAcquireLock() throws IOException { populateMetadata(); remoteSegmentStoreDirectory.init(); - String mdFile = "xyz"; String acquirerId = "test-acquirer"; long testPrimaryTerm = 1; long testGeneration = 5; @@ -410,22 +410,36 @@ public void testAcquireLock() throws IOException { RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilePrefixForCommit(testPrimaryTerm, testGeneration) ) ).thenReturn(metadataFiles); + when(remoteMetadataDirectory.listFilesByPrefix(remoteSegmentStoreDirectory.getLockIdentifier(testPrimaryTerm, testGeneration))) + .thenReturn(metadataFiles); - remoteSegmentStoreDirectory.acquireLock(testPrimaryTerm, testGeneration, acquirerId); - verify(mdLockManager).acquire(any()); + IndexOutput indexOutput = mock(IndexOutput.class); + when(remoteLockDirectory.createOutput(any(), eq(IOContext.DEFAULT))).thenReturn(indexOutput); + + LockInfo lockInfo = new FileLockInfo(remoteSegmentStoreDirectory.getLockIdentifier(testPrimaryTerm, testGeneration), acquirerId); + remoteSegmentStoreDirectory.acquireLock(lockInfo); + verify(remoteLockDirectory).createOutput(lockInfo.generateLockName(), IOContext.DEFAULT); + verify(indexOutput).close(); } public void testAcquireLockNoSuchFile() throws IOException { populateMetadata(); remoteSegmentStoreDirectory.init(); - String testAcquirerId = "test-acquirer"; - long testPrimaryTerm = 2; - long testGeneration = 3; + String acquirerId = "test-acquirer"; + long testPrimaryTerm = 1; + long testGeneration = 5; - assertThrows( - NoSuchFileException.class, - () -> remoteSegmentStoreDirectory.acquireLock(testPrimaryTerm, testGeneration, testAcquirerId) - ); + List metadataFiles = List.of("metadata__1__5__abc"); + when( + remoteMetadataDirectory.listFilesByPrefix( + RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilePrefixForCommit(testPrimaryTerm, testGeneration) + ) + ).thenReturn(metadataFiles); + when(remoteMetadataDirectory.listFilesByPrefix(remoteSegmentStoreDirectory.getLockIdentifier(testPrimaryTerm, testGeneration))) + .thenReturn(List.of()); + + LockInfo lockInfo = new FileLockInfo(remoteSegmentStoreDirectory.getLockIdentifier(testPrimaryTerm, testGeneration), acquirerId); + assertThrows(NoSuchFileException.class, () -> remoteSegmentStoreDirectory.acquireLock(lockInfo)); } public void testReleaseLock() throws IOException { @@ -435,48 +449,49 @@ public void testReleaseLock() throws IOException { long testPrimaryTerm = 1; long testGeneration = 5; - List metadataFiles = List.of("metadata__1__5__abc"); - when( - remoteMetadataDirectory.listFilesByPrefix( - RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilePrefixForCommit(testPrimaryTerm, testGeneration) - ) - ).thenReturn(metadataFiles); + when(remoteMetadataDirectory.listFilesByPrefix("metadata__1__5")).thenReturn(List.of("metadata__1__5__abc")); + FileLockInfo lockInfo = new FileLockInfo( + remoteSegmentStoreDirectory.getLockIdentifier(testPrimaryTerm, testGeneration), + testAcquirerId + ); + + List lockFiles = List.of("metadata__1__5__abc___test-acquirer"); + when(remoteLockDirectory.listFilesByPrefix(lockInfo.getFileToLock())).thenReturn(lockFiles); - remoteSegmentStoreDirectory.releaseLock(testPrimaryTerm, testGeneration, testAcquirerId); - verify(mdLockManager).release(any()); + remoteSegmentStoreDirectory.releaseLock(lockInfo); + verify(remoteLockDirectory).deleteFile("metadata__1__5__abc___test-acquirer"); } - public void testIsAcquired() throws IOException { + public void testIsLockAcquiredTrue() throws IOException { populateMetadata(); remoteSegmentStoreDirectory.init(); long testPrimaryTerm = 1; long testGeneration = 5; - List metadataFiles = List.of("metadata__1__5__abc"); + List lockFiles = List.of("metadata__1__5__abc___snapshot1"); when( - remoteMetadataDirectory.listFilesByPrefix( + remoteLockDirectory.listFilesByPrefix( RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilePrefixForCommit(testPrimaryTerm, testGeneration) ) - ).thenReturn(metadataFiles); + ).thenReturn(lockFiles); - remoteSegmentStoreDirectory.isLockAcquired(testPrimaryTerm, testGeneration); - verify(mdLockManager).isAcquired(any()); + assertTrue(remoteSegmentStoreDirectory.isLockAcquired("metadata__1__5")); } - public void testIsAcquiredException() throws IOException { + public void testIsAcquiredFalse() throws IOException { populateMetadata(); remoteSegmentStoreDirectory.init(); long testPrimaryTerm = 1; long testGeneration = 5; - List metadataFiles = new ArrayList<>(); + List lockFiles = List.of(); when( - remoteMetadataDirectory.listFilesByPrefix( + remoteLockDirectory.listFilesByPrefix( RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilePrefixForCommit(testPrimaryTerm, testGeneration) ) - ).thenReturn(metadataFiles); + ).thenReturn(lockFiles); - assertThrows(NoSuchFileException.class, () -> remoteSegmentStoreDirectory.isLockAcquired(testPrimaryTerm, testGeneration)); + assertFalse(remoteSegmentStoreDirectory.isLockAcquired("metadata__1__5__abc")); } public void testGetMetadataFileForCommit() throws IOException { diff --git a/server/src/test/java/org/opensearch/index/store/lockmanager/FileLockInfoTests.java b/server/src/test/java/org/opensearch/index/store/lockmanager/FileLockInfoTests.java index 95af53cb6e5ec..3b3239452a475 100644 --- a/server/src/test/java/org/opensearch/index/store/lockmanager/FileLockInfoTests.java +++ b/server/src/test/java/org/opensearch/index/store/lockmanager/FileLockInfoTests.java @@ -10,47 +10,34 @@ import org.opensearch.test.OpenSearchTestCase; -import java.util.List; - public class FileLockInfoTests extends OpenSearchTestCase { String testMetadata = "testMetadata"; String testAcquirerId = "testAcquirerId"; - public void testGenerateLockName() { - FileLockInfo fileLockInfo = FileLockInfo.getLockInfoBuilder().withFileToLock(testMetadata).withAcquirerId(testAcquirerId).build(); - assertEquals(fileLockInfo.generateLockName(), FileLockInfo.LockFileUtils.generateLockName(testMetadata, testAcquirerId)); - } - - public void testGenerateLockNameFailureCase1() { - FileLockInfo fileLockInfo = FileLockInfo.getLockInfoBuilder().withFileToLock(testMetadata).build(); - assertThrows(IllegalArgumentException.class, fileLockInfo::generateLockName); - } - - public void testGenerateLockNameFailureCase2() { - FileLockInfo fileLockInfo = FileLockInfo.getLockInfoBuilder().withAcquirerId(testAcquirerId).build(); - assertThrows(IllegalArgumentException.class, fileLockInfo::generateLockName); + public void testFileInfoCreationFailureNoFile() { + assertThrows(IllegalArgumentException.class, () -> new FileLockInfo(null, testAcquirerId)); + assertThrows(IllegalArgumentException.class, () -> new FileLockInfo("", testAcquirerId)); } - public void testGetLockPrefix() { - FileLockInfo fileLockInfo = FileLockInfo.getLockInfoBuilder().withFileToLock(testMetadata).build(); - assertEquals(fileLockInfo.getLockPrefix(), testMetadata + RemoteStoreLockManagerUtils.SEPARATOR); + public void testFileInfoCreationFailureNoAcquirer() { + assertThrows(IllegalArgumentException.class, () -> new FileLockInfo(testMetadata, null)); + assertThrows(IllegalArgumentException.class, () -> new FileLockInfo(testMetadata, "")); } - public void testGetLockPrefixFailureCase() { - FileLockInfo fileLockInfo = FileLockInfo.getLockInfoBuilder().withAcquirerId(testAcquirerId).build(); - assertThrows(IllegalArgumentException.class, fileLockInfo::getLockPrefix); + public void testGenerateLockName() { + FileLockInfo fileLockInfo = new FileLockInfo(testMetadata, testAcquirerId); + assertEquals( + fileLockInfo.generateLockName(), + String.join(RemoteStoreLockManagerUtils.SEPARATOR, testMetadata, testAcquirerId) + + RemoteStoreLockManagerUtils.LOCK_FILE_EXTENSION + ); } public void testGetLocksForAcquirer() { - String[] locks = new String[] { - FileLockInfo.LockFileUtils.generateLockName(testMetadata, testAcquirerId), - FileLockInfo.LockFileUtils.generateLockName(testMetadata, "acquirerId2") }; - FileLockInfo fileLockInfo = FileLockInfo.getLockInfoBuilder().withAcquirerId(testAcquirerId).build(); + String lock1 = new FileLockInfo(testMetadata, testAcquirerId).generateLockName(); + String lock2 = new FileLockInfo(testMetadata, "acquirerId2").generateLockName(); - assertEquals( - fileLockInfo.getLocksForAcquirer(locks), - List.of(FileLockInfo.LockFileUtils.generateLockName(testMetadata, testAcquirerId)) - ); + assertEquals(testAcquirerId, FileLockInfo.getAcquirerIdFromLock(lock1)); + assertEquals("acquirerId2", FileLockInfo.getAcquirerIdFromLock(lock2)); } - } diff --git a/server/src/test/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManagerFactoryTests.java b/server/src/test/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManagerFactoryTests.java deleted file mode 100644 index 61b4cc2176134..0000000000000 --- a/server/src/test/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManagerFactoryTests.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.index.store.lockmanager; - -import org.junit.Before; -import org.mockito.ArgumentCaptor; -import org.opensearch.common.blobstore.BlobContainer; -import org.opensearch.common.blobstore.BlobPath; -import org.opensearch.common.blobstore.BlobStore; -import org.opensearch.repositories.RepositoriesService; -import org.opensearch.repositories.blobstore.BlobStoreRepository; -import org.opensearch.test.OpenSearchTestCase; - -import java.io.IOException; -import java.util.Collections; -import java.util.List; -import java.util.function.Supplier; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -public class RemoteStoreLockManagerFactoryTests extends OpenSearchTestCase { - - private Supplier repositoriesServiceSupplier; - private RepositoriesService repositoriesService; - private RemoteStoreLockManagerFactory remoteStoreLockManagerFactory; - - @Before - public void setup() throws IOException { - repositoriesServiceSupplier = mock(Supplier.class); - repositoriesService = mock(RepositoriesService.class); - when(repositoriesServiceSupplier.get()).thenReturn(repositoriesService); - remoteStoreLockManagerFactory = new RemoteStoreLockManagerFactory(repositoriesServiceSupplier); - } - - public void testNewLockManager() throws IOException { - - String testRepository = "testRepository"; - String testIndexUUID = "testIndexUUID"; - String testShardId = "testShardId"; - - BlobStoreRepository repository = mock(BlobStoreRepository.class); - BlobStore blobStore = mock(BlobStore.class); - BlobContainer blobContainer = mock(BlobContainer.class); - when(repository.blobStore()).thenReturn(blobStore); - when(repository.basePath()).thenReturn(new BlobPath().add("base_path")); - when(blobStore.blobContainer(any())).thenReturn(blobContainer); - when(blobContainer.listBlobs()).thenReturn(Collections.emptyMap()); - - when(repositoriesService.repository(testRepository)).thenReturn(repository); - - RemoteStoreLockManager lockManager = remoteStoreLockManagerFactory.newLockManager(testRepository, testIndexUUID, testShardId); - - assertTrue(lockManager != null); - ArgumentCaptor blobPathCaptor = ArgumentCaptor.forClass(BlobPath.class); - verify(blobStore, times(1)).blobContainer(blobPathCaptor.capture()); - List blobPaths = blobPathCaptor.getAllValues(); - assertEquals("base_path/" + testIndexUUID + "/" + testShardId + "/segments/lock_files/", blobPaths.get(0).buildAsString()); - - verify(repositoriesService).repository(testRepository); - } - -} diff --git a/server/src/test/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManagerTests.java b/server/src/test/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManagerTests.java deleted file mode 100644 index 2a3851514db3c..0000000000000 --- a/server/src/test/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManagerTests.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.index.store.lockmanager; - -import junit.framework.TestCase; -import org.apache.lucene.store.IOContext; -import org.apache.lucene.store.IndexOutput; -import org.junit.Before; -import org.opensearch.index.store.RemoteBufferedOutputDirectory; -import org.opensearch.test.OpenSearchTestCase; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; - -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import static org.mockito.Mockito.verify; - -public class RemoteStoreMetadataLockManagerTests extends OpenSearchTestCase { - private RemoteBufferedOutputDirectory lockDirectory; - private RemoteStoreMetadataLockManager remoteStoreMetadataLockManager; - String testLockName = "testLock"; - String testMetadata = "testMetadata"; - String testAcquirerId = "testAcquirerId"; - - @Before - public void setup() throws IOException { - lockDirectory = mock(RemoteBufferedOutputDirectory.class); - - remoteStoreMetadataLockManager = new RemoteStoreMetadataLockManager(lockDirectory); - } - - private Collection getListOfLocksMock() { - return Arrays.asList( - String.join(RemoteStoreLockManagerUtils.SEPARATOR, testMetadata, testAcquirerId) - + RemoteStoreLockManagerUtils.LOCK_FILE_EXTENSION, - String.join(RemoteStoreLockManagerUtils.SEPARATOR, testMetadata, "acquirerId2") - + RemoteStoreLockManagerUtils.LOCK_FILE_EXTENSION - ); - } - - public void testAcquire() throws IOException { - IndexOutput indexOutput = mock(IndexOutput.class); - FileLockInfo testLockInfo = FileLockInfo.getLockInfoBuilder().withFileToLock(testMetadata).withAcquirerId(testAcquirerId).build(); - when(lockDirectory.createOutput(eq(testLockInfo.generateLockName()), eq(IOContext.DEFAULT))).thenReturn(indexOutput); - remoteStoreMetadataLockManager.acquire(testLockInfo); - verify(indexOutput).close(); - } - - public void testAcquireOnlyFileToLockPassed() { // only fileToLock was passed to acquire call. - IndexOutput indexOutput = mock(IndexOutput.class); - when(lockDirectory.createOutput(eq(testLockName), eq(IOContext.DEFAULT))).thenReturn(indexOutput); - FileLockInfo testLockInfo = FileLockInfo.getLockInfoBuilder().withFileToLock(testMetadata).build(); - assertThrows(IllegalArgumentException.class, () -> remoteStoreMetadataLockManager.acquire(testLockInfo)); - } - - public void testAcquireOnlyAcquirerIdPassed() { // only AcquirerId was passed to acquire call. - IndexOutput indexOutput = mock(IndexOutput.class); - when(lockDirectory.createOutput(eq(testLockName), eq(IOContext.DEFAULT))).thenReturn(indexOutput); - LockInfo testLockInfo = FileLockInfo.getLockInfoBuilder().withAcquirerId(testAcquirerId).build(); - assertThrows(IllegalArgumentException.class, () -> remoteStoreMetadataLockManager.acquire(testLockInfo)); - } - - public void testRelease() throws IOException { - when(lockDirectory.listAll()).thenReturn(getListOfLocksMock().toArray(new String[0])); - FileLockInfo testLockInfo = FileLockInfo.getLockInfoBuilder().withAcquirerId(testAcquirerId).build(); - - remoteStoreMetadataLockManager.release(testLockInfo); - verify(lockDirectory).deleteFile( - String.join(RemoteStoreLockManagerUtils.SEPARATOR, testMetadata, testAcquirerId) - + RemoteStoreLockManagerUtils.LOCK_FILE_EXTENSION - ); - } - - public void testReleaseExceptionCase() { // acquirerId is Not passed during release lock call. - FileLockInfo testLockInfo = FileLockInfo.getLockInfoBuilder().withFileToLock(testMetadata).build(); - assertThrows(IllegalArgumentException.class, () -> remoteStoreMetadataLockManager.release(testLockInfo)); - } - - public void testIsAcquired() throws IOException { - FileLockInfo testLockInfo = FileLockInfo.getLockInfoBuilder().withFileToLock(testMetadata).build(); - when(lockDirectory.listFilesByPrefix(testLockInfo.getLockPrefix())).thenReturn(getListOfLocksMock()); - TestCase.assertTrue(remoteStoreMetadataLockManager.isAcquired(testLockInfo)); - } - - public void testIsAcquiredExceptionCase() { // metadata file is not passed during isAcquired call. - FileLockInfo testLockInfo = FileLockInfo.getLockInfoBuilder().withAcquirerId(testAcquirerId).build(); - assertThrows(IllegalArgumentException.class, () -> remoteStoreMetadataLockManager.isAcquired(testLockInfo)); - } -} diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 7f3819563dcbd..641dc4f8bb7f1 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -102,8 +102,6 @@ import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; -import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; -import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager; import org.opensearch.index.store.RemoteBufferedOutputDirectory; import org.opensearch.index.translog.InternalTranslogFactory; import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory; @@ -663,10 +661,8 @@ protected RemoteSegmentStoreDirectory createRemoteSegmentStoreDirectory(ShardId ShardPath remoteShardPath = new ShardPath(false, remoteNodePath.resolve(shardId), remoteNodePath.resolve(shardId), shardId); RemoteDirectory dataDirectory = newRemoteDirectory(remoteShardPath.resolveIndex()); RemoteDirectory metadataDirectory = newRemoteDirectory(remoteShardPath.resolveIndex()); - RemoteStoreLockManager remoteStoreLockManager = new RemoteStoreMetadataLockManager( - new RemoteBufferedOutputDirectory(getBlobContainer(remoteShardPath.resolveIndex())) - ); - return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, remoteStoreLockManager, threadPool); + RemoteDirectory lockDirectory = new RemoteBufferedOutputDirectory(getBlobContainer(remoteShardPath.resolveIndex())); + return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, lockDirectory, threadPool); } private RemoteDirectory newRemoteDirectory(Path f) throws IOException {