Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Store] Simplify lock manager interface #8399

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@
import org.opensearch.index.store.Store.MetadataSnapshot;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.store.StoreStats;
import org.opensearch.index.store.lockmanager.FileLockInfo;
import org.opensearch.index.store.lockmanager.LockInfo;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory;
import org.opensearch.index.translog.RemoteFsTranslog;
Expand Down Expand Up @@ -1497,7 +1499,9 @@ public GatedCloseable<IndexCommit> acquireLastIndexCommitAndRefresh(boolean flus
*/
public void acquireLockOnCommitData(String snapshotId, long primaryTerm, long generation) throws IOException {
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = getRemoteDirectory();
remoteSegmentStoreDirectory.acquireLock(primaryTerm, generation, snapshotId);
String lockIdentifier = remoteSegmentStoreDirectory.getLockIdentifier(primaryTerm, generation);
LockInfo lockInfo = new FileLockInfo(lockIdentifier, snapshotId);
remoteSegmentStoreDirectory.acquireLock(lockInfo);
}

/**
Expand All @@ -1509,7 +1513,9 @@ public void acquireLockOnCommitData(String snapshotId, long primaryTerm, long ge
*/
public void releaseLockOnCommitData(String snapshotId, long primaryTerm, long generation) throws IOException {
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = getRemoteDirectory();
remoteSegmentStoreDirectory.releaseLock(primaryTerm, generation, snapshotId);
String lockIdentifier = remoteSegmentStoreDirectory.getLockIdentifier(primaryTerm, generation);
LockInfo lockInfo = new FileLockInfo(lockIdentifier, snapshotId);
remoteSegmentStoreDirectory.releaseLock(lockInfo);
}

public Optional<NRTReplicationEngine> getReplicationEngine() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.opensearch.common.io.VersionedCodecStreamWrapper;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.index.store.lockmanager.FileLockInfo;
import org.opensearch.index.store.lockmanager.RemoteStoreCommitLevelLockManager;
import org.opensearch.index.store.lockmanager.LockInfo;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler;
Expand Down Expand Up @@ -55,7 +55,7 @@
* another instance of {@code RemoteDirectory}.
* @opensearch.internal
*/
public final class RemoteSegmentStoreDirectory extends FilterDirectory implements RemoteStoreCommitLevelLockManager {
public final class RemoteSegmentStoreDirectory extends FilterDirectory implements RemoteStoreLockManager {
/**
* Each segment file is uploaded with unique suffix.
* For example, _0.cfe in local filesystem will be uploaded to remote segment store as _0.cfe__gX7bNIIBrs0AUNsR2yEG
Expand All @@ -74,7 +74,7 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement
*/
private final RemoteDirectory remoteMetadataDirectory;

private final RemoteStoreLockManager mdLockManager;
private final RemoteDirectory remoteLockDirectory;

private final ThreadPool threadPool;

Expand Down Expand Up @@ -108,13 +108,13 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement
public RemoteSegmentStoreDirectory(
RemoteDirectory remoteDataDirectory,
RemoteDirectory remoteMetadataDirectory,
RemoteStoreLockManager mdLockManager,
RemoteDirectory remoteLockDirectory,
ThreadPool threadPool
) throws IOException {
super(remoteDataDirectory);
this.remoteDataDirectory = remoteDataDirectory;
this.remoteMetadataDirectory = remoteMetadataDirectory;
this.mdLockManager = mdLockManager;
this.remoteLockDirectory = remoteLockDirectory;
this.threadPool = threadPool;
init();
}
Expand Down Expand Up @@ -370,45 +370,76 @@ public IndexInput openInput(String name, IOContext context) throws IOException {

/**
* This acquires a lock on a given commit by creating a lock file in lock directory using {@code FileLockInfo}
* @param primaryTerm Primary Term of index at the time of commit.
* @param generation Commit Generation
* @param acquirerId Lock Acquirer ID which wants to acquire lock on the commit.
* @param lockInfo lock identifier and acquirer ID info
* @throws IOException will be thrown in case i) listing file failed or ii) Writing the lock file failed.
* @throws NoSuchFileException when metadata file is not present for given commit point.
*/
@Override
public void acquireLock(long primaryTerm, long generation, String acquirerId) throws IOException {
String metadataFile = getMetadataFileForCommit(primaryTerm, generation);

mdLockManager.acquire(FileLockInfo.getLockInfoBuilder().withFileToLock(metadataFile).withAcquirerId(acquirerId).build());
public void acquireLock(LockInfo lockInfo) throws IOException {
assert lockInfo instanceof FileLockInfo : "lockInfo should be instance of FileLockInfo";
String filename = ((FileLockInfo) lockInfo).getFileToLock();
Collection<String> metadataFiles = remoteMetadataDirectory.listFilesByPrefix(filename);
if (metadataFiles.isEmpty()) {
throw new NoSuchFileException("Metadata file " + filename + " does not exist to acquire lock");
}
IndexOutput indexOutput = remoteLockDirectory.createOutput(lockInfo.generateLockName(), IOContext.DEFAULT);
indexOutput.close();
}

/**
* Releases a lock which was acquired on given segment commit.
* @param primaryTerm Primary Term of index at the time of commit.
* @param generation Commit Generation
* @param acquirerId Acquirer ID for which lock needs to be released.
* @param lockInfo lock identifier and acquirer ID info
* @throws IOException will be thrown in case i) listing lock files failed or ii) deleting the lock file failed.
* @throws NoSuchFileException when metadata file is not present for given commit point.
*/
@Override
public void releaseLock(long primaryTerm, long generation, String acquirerId) throws IOException {
String metadataFile = getMetadataFileForCommit(primaryTerm, generation);
mdLockManager.release(FileLockInfo.getLockInfoBuilder().withFileToLock(metadataFile).withAcquirerId(acquirerId).build());
public void releaseLock(LockInfo lockInfo) throws IOException {
assert lockInfo instanceof FileLockInfo : "lockInfo should be instance of FileLockInfo";
Collection<String> lockFiles = remoteLockDirectory.listFilesByPrefix(((FileLockInfo) lockInfo).getFileToLock());

if (lockFiles.isEmpty()) {
logger.warn("Lock file {} does not exist", ((FileLockInfo) lockInfo).getFileToLock());
return;
}
// ideally there should be only one lock per acquirer, but just to handle any stale locks,
// we try to release all the locks for the acquirer.
String acquirerId = ((FileLockInfo) lockInfo).getAcquirerId();
List<String> locksToRelease = lockFiles.stream()
.filter(lockFile -> FileLockInfo.getAcquirerIdFromLock(lockFile).equals(acquirerId))
.collect(Collectors.toList());
if (locksToRelease.size() > 1) {
logger.warn(locksToRelease.size() + " locks found for acquirer " + ((FileLockInfo) lockInfo).getAcquirerId());
}
for (String lock : locksToRelease) {
remoteLockDirectory.deleteFile(lock);
}
}

/**
* Checks if a specific commit have any corresponding lock file.
* @param primaryTerm Primary Term of index at the time of commit.
* @param generation Commit Generation
* @param lockInfo lock identifier and acquirer ID info
* @return True if there is at least one lock for given primary term and generation.
* @throws IOException will be thrown in case listing lock files failed.
* @throws NoSuchFileException when metadata file is not present for given commit point.
*/
@Override
public Boolean isLockAcquired(long primaryTerm, long generation) throws IOException {
String metadataFile = getMetadataFileForCommit(primaryTerm, generation);
return mdLockManager.isAcquired(FileLockInfo.getLockInfoBuilder().withFileToLock(metadataFile).build());
public Boolean isLockAcquired(LockInfo lockInfo) throws IOException {
assert lockInfo instanceof FileLockInfo : "lockInfo should be instance of FileLockInfo";
Collection<String> lockFiles = remoteLockDirectory.listFilesByPrefix(((FileLockInfo) lockInfo).getFileToLock());
String acquirerId = ((FileLockInfo) lockInfo).getAcquirerId();
List<String> locksByAcquirer = lockFiles.stream()
.filter(lockFile -> FileLockInfo.getAcquirerIdFromLock(lockFile).equals(acquirerId))
.collect(Collectors.toList());
return !locksByAcquirer.isEmpty();
}

@Override
public Boolean isLockAcquired(String lockIdentifier) throws IOException {
Collection<String> lockFiles = remoteLockDirectory.listFilesByPrefix(lockIdentifier);
return lockFiles.isEmpty() == false;
}

public String getLockIdentifier(long primaryTerm, long generation) throws IOException {
return getMetadataFileForCommit(primaryTerm, generation);
}

// Visible for testing
Expand Down Expand Up @@ -610,8 +641,10 @@ private void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOExceptio
return true;
}
return !isLockAcquired(
MetadataFilenameUtils.getPrimaryTerm(metadataFile.split(MetadataFilenameUtils.SEPARATOR)),
MetadataFilenameUtils.getGeneration(metadataFile.split(MetadataFilenameUtils.SEPARATOR))
getLockIdentifier(
MetadataFilenameUtils.getPrimaryTerm(metadataFile.split(MetadataFilenameUtils.SEPARATOR)),
MetadataFilenameUtils.getGeneration(metadataFile.split(MetadataFilenameUtils.SEPARATOR))
)
);
} catch (IOException e) {
logger.error(
Expand Down Expand Up @@ -708,7 +741,7 @@ private boolean deleteIfEmpty() throws IOException {
try {
remoteDataDirectory.delete();
remoteMetadataDirectory.delete();
mdLockManager.delete();
remoteLockDirectory.delete();
} catch (Exception e) {
logger.error("Exception occurred while deleting directory", e);
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
Expand Down Expand Up @@ -59,14 +57,13 @@ public Directory newDirectory(String repositoryName, String indexUUID, String sh

RemoteDirectory dataDirectory = createRemoteDirectory(repository, commonBlobPath, "data");
RemoteDirectory metadataDirectory = createRemoteDirectory(repository, commonBlobPath, "metadata");
RemoteStoreMetadataLockManager mdLockManager = RemoteStoreLockManagerFactory.newLockManager(
repositoriesService.get(),
repositoryName,
indexUUID,
shardId
BlobPath shardLevelBlobPath = ((BlobStoreRepository) repository).basePath().add(indexUUID).add(shardId).add(SEGMENTS);
RemoteBufferedOutputDirectory remoteLockDirectory = createRemoteBufferedOutputDirectory(
repository,
shardLevelBlobPath,
"lock_files"
);

return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, mdLockManager, threadPool);
return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, remoteLockDirectory, threadPool);
} catch (RepositoryMissingException e) {
throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", e);
}
Expand All @@ -77,4 +74,14 @@ private RemoteDirectory createRemoteDirectory(Repository repository, BlobPath co
BlobContainer dataBlobContainer = ((BlobStoreRepository) repository).blobStore().blobContainer(extendedPath);
return new RemoteDirectory(dataBlobContainer);
}

private static RemoteBufferedOutputDirectory createRemoteBufferedOutputDirectory(
Repository repository,
BlobPath commonBlobPath,
String extention
) {
BlobPath extendedPath = commonBlobPath.add(extention);
BlobContainer dataBlobContainer = ((BlobStoreRepository) repository).blobStore().blobContainer(extendedPath);
return new RemoteBufferedOutputDirectory(dataBlobContainer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,24 @@

package org.opensearch.index.store.lockmanager;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.opensearch.core.common.Strings;

/**
* A Class that defines Info about Remote Store File Lock.
* This is used to provide Remote Store Lock Information and some utility methods for the Lock file.
* @opensearch.internal
*/
public class FileLockInfo implements LockInfo {
private String fileToLock;
private String acquirerId;
private final String fileToLock;
private final String acquirerId;

public FileLockInfo(String fileToLock, String acquirerId) {
if (Strings.isNullOrEmpty(fileToLock) || Strings.isNullOrEmpty(acquirerId)) {
throw new IllegalArgumentException("Both the arguments should be non-empty");
}
this.fileToLock = fileToLock;
this.acquirerId = acquirerId;
}

public String getAcquirerId() {
return acquirerId;
Expand All @@ -29,101 +35,17 @@ public String getFileToLock() {
return fileToLock;
}

private void setFileToLock(String fileName) {
this.fileToLock = fileName;
}

private void setAcquirerId(String acquirerId) {
this.acquirerId = acquirerId;
}

@Override
public String generateLockName() {
validateRequiredParameters(this);
return LockFileUtils.generateLockName(fileToLock, acquirerId);
}

String getLockPrefix() {
if (fileToLock == null || fileToLock.isBlank()) {
throw new IllegalArgumentException("File to Lock should be provided");
}
return fileToLock + RemoteStoreLockManagerUtils.SEPARATOR;
}

List<String> getLocksForAcquirer(String[] lockFiles) {
if (acquirerId == null || acquirerId.isBlank()) {
throw new IllegalArgumentException("Acquirer ID should be provided");
}
return Arrays.stream(lockFiles)
.filter(lockFile -> acquirerId.equals(LockFileUtils.getAcquirerIdFromLock(lockFile)))
.collect(Collectors.toList());
}

public static LockInfoBuilder getLockInfoBuilder() {
return new LockInfoBuilder();
}

private static void validateRequiredParameters(FileLockInfo fileLockInfo) {
if (fileLockInfo.getAcquirerId() == null || fileLockInfo.getAcquirerId().isBlank()) {
throw new IllegalArgumentException("Acquirer ID should be provided");
}
if (fileLockInfo.getFileToLock() == null || fileLockInfo.getFileToLock().isBlank()) {
throw new IllegalArgumentException("File to Lock should be provided");
}
}

static class LockFileUtils {
static String generateLockName(String fileToLock, String acquirerId) {
return String.join(RemoteStoreLockManagerUtils.SEPARATOR, fileToLock, acquirerId)
+ RemoteStoreLockManagerUtils.LOCK_FILE_EXTENSION;
}

public static String getFileToLockNameFromLock(String lockName) {
String[] lockNameTokens = lockName.split(RemoteStoreLockManagerUtils.SEPARATOR);

if (lockNameTokens.length != 2) {
throw new IllegalArgumentException("Provided Lock Name " + lockName + " is not Valid.");
}
return lockNameTokens[0];
}

public static String getAcquirerIdFromLock(String lockName) {
String[] lockNameTokens = lockName.split(RemoteStoreLockManagerUtils.SEPARATOR);

if (lockNameTokens.length != 2) {
throw new IllegalArgumentException("Provided Lock Name " + lockName + " is not Valid.");
}
return lockNameTokens[1].replace(RemoteStoreLockManagerUtils.LOCK_FILE_EXTENSION, "");
}
return String.join(RemoteStoreLockManagerUtils.SEPARATOR, fileToLock, acquirerId) + RemoteStoreLockManagerUtils.LOCK_FILE_EXTENSION;
}

/**
* A Builder Class to build an Instance of {@code FileLockInfo}
* @opensearch.internal
*/
public static class LockInfoBuilder implements LockInfo.LockInfoBuilder {
private final FileLockInfo lockFileInfo;

LockInfoBuilder() {
this.lockFileInfo = new FileLockInfo();
}

public LockInfoBuilder withFileToLock(String fileToLock) {
lockFileInfo.setFileToLock(fileToLock);
return this;
}

public LockInfoBuilder withAcquirerId(String acquirerId) {
lockFileInfo.setAcquirerId(acquirerId);
return this;
}
public static String getAcquirerIdFromLock(String lockName) {
String[] lockNameTokens = lockName.split(RemoteStoreLockManagerUtils.SEPARATOR);

@Override
public FileLockInfo build() {
if (lockFileInfo.fileToLock == null && lockFileInfo.acquirerId == null) {
throw new IllegalStateException("Either File to Lock or AcquirerId should be provided to instantiate FileLockInfo");
}
return lockFileInfo;
if (lockNameTokens.length != 2) {
throw new IllegalArgumentException("Provided Lock Name " + lockName + " is not Valid.");
}
return lockNameTokens[1].replace(RemoteStoreLockManagerUtils.LOCK_FILE_EXTENSION, "");
}
}
Loading