Skip to content

Commit

Permalink
Remove method level locks
Browse files Browse the repository at this point in the history
Signed-off-by: Shreyansh Ray <[email protected]>
  • Loading branch information
rayshrey committed May 10, 2024
1 parent 4ee744d commit 3292b36
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 216 deletions.
279 changes: 84 additions & 195 deletions server/src/main/java/org/opensearch/index/store/CompositeDirectory.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

/**
Expand All @@ -54,9 +53,6 @@ public class CompositeDirectory extends FilterDirectory {
private final RemoteSegmentStoreDirectory remoteDirectory;
private final FileCache fileCache;
private final TransferManager transferManager;
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();
private final ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();

/**
* Constructor to initialise the composite directory
Expand Down Expand Up @@ -87,25 +83,20 @@ public CompositeDirectory(FSDirectory localDirectory, RemoteSegmentStoreDirector
@Override
public String[] listAll() throws IOException {
logger.trace("listAll() called");
readLock.lock();
try {
String[] localFiles = localDirectory.listAll();
logger.trace("Local Directory files : {}", () -> Arrays.toString(localFiles));
Set<String> allFiles = new HashSet<>(Arrays.asList(localFiles));
String[] remoteFiles = getRemoteFiles();
allFiles.addAll(Arrays.asList(remoteFiles));
logger.trace("Remote Directory files : {}", () -> Arrays.toString(remoteFiles));
Set<String> localLuceneFiles = allFiles.stream()
.filter(file -> !FileType.isBlockFile(file))
.collect(Collectors.toUnmodifiableSet());
String[] files = new String[localLuceneFiles.size()];
localLuceneFiles.toArray(files);
Arrays.sort(files);
logger.trace("listAll() returns : {}", () -> Arrays.toString(files));
return files;
} finally {
readLock.unlock();
}
String[] localFiles = localDirectory.listAll();
logger.trace("Local Directory files : {}", () -> Arrays.toString(localFiles));
Set<String> allFiles = new HashSet<>(Arrays.asList(localFiles));
String[] remoteFiles = getRemoteFiles();
allFiles.addAll(Arrays.asList(remoteFiles));
logger.trace("Remote Directory files : {}", () -> Arrays.toString(remoteFiles));
Set<String> localLuceneFiles = allFiles.stream()
.filter(file -> !FileType.isBlockFile(file))
.collect(Collectors.toUnmodifiableSet());
String[] files = new String[localLuceneFiles.size()];
localLuceneFiles.toArray(files);
Arrays.sort(files);
logger.trace("listAll() returns : {}", () -> Arrays.toString(files));
return files;
}

/**
Expand All @@ -117,16 +108,11 @@ public String[] listAll() throws IOException {
@Override
public void deleteFile(String name) throws IOException {
logger.trace("deleteFile() called {}", name);
writeLock.lock();
try {
/*
Not deleting from localDirectory directly since it causes a race condition when the localDirectory deletes a file, and it ends up in pendingDeletion state.
Meanwhile, fileCache on removal deletes the file directly via the Files class and later when the directory tries to delete the files pending for deletion (which happens before creating a new file), it causes NoSuchFileException and new file creation fails
*/
fileCache.remove(localDirectory.getDirectory().resolve(name));
} finally {
writeLock.unlock();
}
/*
Not deleting from localDirectory directly since it causes a race condition when the localDirectory deletes a file, and it ends up in pendingDeletion state.
Meanwhile, fileCache on removal deletes the file directly via the Files class and later when the directory tries to delete the files pending for deletion (which happens before creating a new file), it causes NoSuchFileException and new file creation fails
*/
fileCache.remove(localDirectory.getDirectory().resolve(name));
}

/**
Expand All @@ -138,25 +124,20 @@ Meanwhile, fileCache on removal deletes the file directly via the Files class an
@Override
public long fileLength(String name) throws IOException {
logger.trace("fileLength() called {}", name);
readLock.lock();
try {
long fileLength;
Path key = localDirectory.getDirectory().resolve(name);
if (isTempFile(name) || fileCache.get(key) != null) {
try {
fileLength = localDirectory.fileLength(name);
logger.trace("fileLength from Local {}", fileLength);
} finally {
fileCache.decRef(key);
}
} else {
fileLength = remoteDirectory.fileLength(name);
logger.trace("fileLength from Remote {}", fileLength);
long fileLength;
Path key = localDirectory.getDirectory().resolve(name);
if (isTempFile(name) || fileCache.get(key) != null) {
try {
fileLength = localDirectory.fileLength(name);
logger.trace("fileLength from Local {}", fileLength);
} finally {
fileCache.decRef(key);
}
return fileLength;
} finally {
readLock.unlock();
} else {
fileLength = remoteDirectory.fileLength(name);
logger.trace("fileLength from Remote {}", fileLength);
}
return fileLength;
}

/**
Expand All @@ -168,33 +149,8 @@ public long fileLength(String name) throws IOException {
@Override
public IndexOutput createOutput(String name, IOContext context) throws IOException {
logger.trace("createOutput() called {}", name);
writeLock.lock();
try {
/*
* The CloseableFilterIndexOutput will ensure that the file is added to FileCache once write is completed on this file
*/
return new CloseableFilterIndexOutput(localDirectory.createOutput(name, context), name, this::cacheFile);
} finally {
writeLock.unlock();
}
}

/**
* Creates a new, empty, temporary file in the directory and returns an {@link IndexOutput}
* instance for appending data to this file.
*
* <p>The temporary file name (accessible via {@link IndexOutput#getName()}) will start with
* {@code prefix}, end with {@code suffix} and have a reserved file extension {@code .tmp}.
*/
@Override
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException {
logger.trace("createTempOutput() called {} , {}", prefix, suffix);
writeLock.lock();
try {
return localDirectory.createTempOutput(prefix, suffix, context);
} finally {
writeLock.unlock();
}
// The CloseableFilterIndexOutput will ensure that the file is added to FileCache once write is completed on this file
return new CloseableFilterIndexOutput(localDirectory.createOutput(name, context), name, this::cacheFile);
}

/**
Expand All @@ -204,32 +160,10 @@ public IndexOutput createTempOutput(String prefix, String suffix, IOContext cont
@Override
public void sync(Collection<String> names) throws IOException {
logger.trace("sync() called {}", names);
writeLock.lock();
try {
Collection<String> remoteFiles = Arrays.asList(getRemoteFiles());
Collection<String> filesToSync = names.stream()
.filter(name -> remoteFiles.contains(name) == false)
.collect(Collectors.toList());
logger.trace("Synced files : {}", filesToSync);
localDirectory.sync(filesToSync);
} finally {
writeLock.unlock();
}
}

/**
* Ensures that directory metadata, such as recent file renames, are moved to stable storage.
* @throws IOException in case of I/O error
*/
@Override
public void syncMetaData() throws IOException {
logger.trace("syncMetaData() called ");
writeLock.lock();
try {
localDirectory.syncMetaData();
} finally {
writeLock.unlock();
}
Collection<String> remoteFiles = Arrays.asList(getRemoteFiles());
Collection<String> filesToSync = names.stream().filter(name -> remoteFiles.contains(name) == false).collect(Collectors.toList());
logger.trace("Synced files : {}", filesToSync);
localDirectory.sync(filesToSync);
}

/**
Expand All @@ -240,14 +174,9 @@ public void syncMetaData() throws IOException {
@Override
public void rename(String source, String dest) throws IOException {
logger.trace("rename() called {}, {}", source, dest);
writeLock.lock();
try {
localDirectory.rename(source, dest);
fileCache.remove(localDirectory.getDirectory().resolve(source));
cacheFile(dest);
} finally {
writeLock.unlock();
}
localDirectory.rename(source, dest);
fileCache.remove(localDirectory.getDirectory().resolve(source));
cacheFile(dest);
}

/**
Expand All @@ -259,48 +188,35 @@ public void rename(String source, String dest) throws IOException {
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
logger.trace("openInput() called {}", name);
writeLock.lock();
try {
/*
* We aren't tracking temporary files (created via createTempOutput) currently in FileCache as these are created and then deleted within a very short span of time
* We will be reading them directory from the local directory
*/
if (isTempFile(name)) {
return localDirectory.openInput(name, context);
}
/*
* Return directly from the FileCache (via TransferManager) if complete file is present
*/

Path key = localDirectory.getDirectory().resolve(name);
CachedIndexInput indexInput = fileCache.get(key);
if (indexInput != null) {
logger.trace("Complete file found in FileCache");
try {
return indexInput.getIndexInput().clone();
} finally {
fileCache.decRef(key);
}
}
/*
* If file has been uploaded to the Remote Store, fetch it from the Remote Store in blocks via OnDemandCompositeBlockIndexInput
*/
else {
logger.trace("Complete file not in FileCache, to be fetched in Blocks from Remote");
RemoteSegmentMetadata remoteSegmentMetadata = remoteDirectory.readLatestMetadataFile();
RemoteSegmentStoreDirectory.UploadedSegmentMetadata uploadedSegmentMetadata = remoteSegmentMetadata.getMetadata().get(name);
/*
* TODO : Refactor FileInfo and OnDemandBlockSnapshotIndexInput to more generic names as they are not Remote Snapshot specific
*/
BlobStoreIndexShardSnapshot.FileInfo fileInfo = new BlobStoreIndexShardSnapshot.FileInfo(
name,
new StoreFileMetadata(name, uploadedSegmentMetadata.getLength(), uploadedSegmentMetadata.getChecksum(), Version.LATEST),
null
);
return new OnDemandBlockSnapshotIndexInput(fileInfo, localDirectory, transferManager);
// We aren't tracking temporary files (created via createTempOutput) currently in FileCache as these are created and then deleted
// within a very short span of time
// We will be reading them directory from the local directory
if (isTempFile(name)) {
return localDirectory.openInput(name, context);
}
// Return directly from the FileCache (via TransferManager) if complete file is present
Path key = localDirectory.getDirectory().resolve(name);
CachedIndexInput indexInput = fileCache.get(key);
if (indexInput != null) {
logger.trace("Complete file found in FileCache");
try {
return indexInput.getIndexInput().clone();
} finally {
fileCache.decRef(key);
}
} finally {
writeLock.unlock();
}
// If file has been uploaded to the Remote Store, fetch it from the Remote Store in blocks via OnDemandCompositeBlockIndexInput
else {
logger.trace("Complete file not in FileCache, to be fetched in Blocks from Remote");
RemoteSegmentMetadata remoteSegmentMetadata = remoteDirectory.readLatestMetadataFile();
RemoteSegmentStoreDirectory.UploadedSegmentMetadata uploadedSegmentMetadata = remoteSegmentMetadata.getMetadata().get(name);
// TODO : Refactor FileInfo and OnDemandBlockSnapshotIndexInput to more generic names as they are not Remote Snapshot specific
BlobStoreIndexShardSnapshot.FileInfo fileInfo = new BlobStoreIndexShardSnapshot.FileInfo(
name,
new StoreFileMetadata(name, uploadedSegmentMetadata.getLength(), uploadedSegmentMetadata.getChecksum(), Version.LATEST),
null
);
return new OnDemandBlockSnapshotIndexInput(fileInfo, localDirectory, transferManager);
}
}

Expand All @@ -310,31 +226,9 @@ public IndexInput openInput(String name, IOContext context) throws IOException {
*/
@Override
public void close() throws IOException {
writeLock.lock();
try {
Arrays.stream(localDirectory.listAll()).forEach(f -> {
logger.trace("Removing file from cache {}", f);
fileCache.remove(localDirectory.getDirectory().resolve(f));
});
localDirectory.close();
remoteDirectory.close();
} finally {
writeLock.unlock();
}
}

/**
* Returns a set of files currently pending deletion in this directory.
* @throws IOException in case of I/O error
*/
@Override
public Set<String> getPendingDeletions() throws IOException {
writeLock.lock();
try {
return localDirectory.getPendingDeletions();
} finally {
writeLock.unlock();
}
Arrays.stream(localDirectory.listAll()).forEach(f -> fileCache.remove(localDirectory.getDirectory().resolve(f)));
localDirectory.close();
remoteDirectory.close();
}

/**
Expand All @@ -350,19 +244,15 @@ public void afterSyncToRemote(Collection<String> files) throws IOException {
return;
}
for (String fileName : files) {
writeLock.lock();
try {
/* Decrementing the refCount here for the path so that it becomes eligible for eviction
* This is a temporary solution until pinning support is added
* TODO - Unpin the files here from FileCache so that they become eligible for eviction, once pinning/unpinning support is added in FileCache
* Uncomment the below commented line(to remove the file from cache once uploaded) to test block based functionality
*/
logger.trace("File {} uploaded to Remote Store and now can be eligible for eviction in FileCache", fileName);
fileCache.decRef(localDirectory.getDirectory().resolve(fileName));
// fileCache.remove(localDirectory.getDirectory().resolve(fileName));
} finally {
writeLock.unlock();
}
/*
Decrementing the refCount here for the path so that it becomes eligible for eviction
This is a temporary solution until pinning support is added
TODO - Unpin the files here from FileCache so that they become eligible for eviction, once pinning/unpinning support is added in FileCache
Uncomment the below commented line(to remove the file from cache once uploaded) to test block based functionality
*/
logger.trace("File {} uploaded to Remote Store and now can be eligible for eviction in FileCache", fileName);
fileCache.decRef(localDirectory.getDirectory().resolve(fileName));
// fileCache.remove(localDirectory.getDirectory().resolve(fileName));
}
}

Expand All @@ -379,11 +269,10 @@ private String[] getRemoteFiles() throws IOException {
remoteFiles = remoteDirectory.listAll();
} catch (NullPointerException e) {
/*
* There are two scenarios where the listAll() call on remote directory returns NullPointerException:
* - When remote directory is not set
* - When init() of remote directory has not yet been called
*
* Returning an empty list in these scenarios
There are two scenarios where the listAll() call on remote directory returns NullPointerException:
- When remote directory is not set
- When init() of remote directory has not yet been called
Returning an empty list in the above scenarios
*/
remoteFiles = new String[0];
}
Expand Down
Loading

0 comments on commit 3292b36

Please sign in to comment.