Skip to content

Commit

Permalink
Nit fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Shreyansh Ray <[email protected]>
  • Loading branch information
rayshrey committed May 13, 2024
1 parent 5013be9 commit 7491c62
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.indices.IndicesService;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.junit.annotations.TestLogging;

import java.util.Map;

Expand All @@ -36,7 +35,7 @@
@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 1, numClientNodes = 0, supportsDedicatedMasters = false)
// Uncomment the below line to enable trace level logs for this test for better debugging
@TestLogging(reason = "Getting trace logs from composite directory package", value = "org.opensearch.index.store:TRACE")
// @TestLogging(reason = "Getting trace logs from composite directory package", value = "org.opensearch.index.store:TRACE")
public class CompositeDirectoryIT extends RemoteStoreBaseIntegTestCase {

/*
Expand Down
2 changes: 1 addition & 1 deletion server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,7 @@ public static DataLocalityType getValueOf(final String localityType) {
if (type != null) {
return type;
}
throw new IllegalArgumentException("Unknown Locality Type constant [" + localityType + "].");
throw new IllegalArgumentException("Unknown locality type constant [" + localityType + "].");
}
}

Expand Down
21 changes: 15 additions & 6 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -618,14 +618,23 @@ public synchronized IndexShard createShard(
// TODO : Need to remove this check after support for hot indices is added in Composite Directory
this.indexSettings.isStoreLocalityPartial()) {
/*
* Currently Composite Directory only supports local directory to be of type FSDirectory
* The reason is that FileCache currently has it key type as Path
* Composite Directory currently uses FSDirectory's getDirectory() method to fetch and use the Path for operating on FileCache
* TODO : Refactor FileCache to have key in form of String instead of Path. Once that is done we can remove this assertion
Currently Composite Directory only supports local directory to be of type FSDirectory
The reason is that FileCache currently has it key type as Path
Composite Directory currently uses FSDirectory's getDirectory() method to fetch and use the Path for operating on FileCache
TODO : Refactor FileCache to have key in form of String instead of Path. Once that is done we can remove this assertion
*/
Directory localDirectory = directoryFactory.newDirectory(this.indexSettings, path);
assert localDirectory instanceof FSDirectory : "For Composite Directory, local directory must be of type FSDirectory";
assert fileCache != null : "File Cache not initialized on this Node, cannot create Composite Directory without FileCache";

if (localDirectory instanceof FSDirectory == false) throw new IllegalStateException(
"For Composite Directory, local directory must be of type FSDirectory"
);
else if (fileCache == null) throw new IllegalStateException(
"File Cache not initialized on this Node, cannot create Composite Directory without FileCache"
);
else if (remoteDirectory == null) throw new IllegalStateException(
"Remote Directory must not be null for Composite Directory"
);

directory = new CompositeDirectory((FSDirectory) localDirectory, (RemoteSegmentStoreDirectory) remoteDirectory, fileCache);
} else {
directory = directoryFactory.newDirectory(this.indexSettings, path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public String[] listAll() throws IOException {

/**
* Removes an existing file in the directory.
* Currently deleting only from local directory as files from remote should not be deleted due to availability reasons
* Currently deleting only from local directory as files from remote should not be deleted as that is taken care by garbage collection logic of remote directory
* @param name the name of an existing file.
* @throws IOException in case of I/O error
*/
Expand Down Expand Up @@ -181,6 +181,7 @@ public void rename(String source, String dest) throws IOException {
localDirectory.rename(source, dest);
fileCache.remove(localDirectory.getDirectory().resolve(source));
cacheFile(dest);
fileCache.decRef(localDirectory.getDirectory().resolve(dest));
}

/**
Expand Down Expand Up @@ -243,10 +244,6 @@ public void close() throws IOException {
*/
public void afterSyncToRemote(Collection<String> files) throws IOException {
logger.trace("afterSyncToRemote called for {}", files);
if (remoteDirectory == null) {
logger.trace("afterSyncToRemote called even though remote directory is not set");
return;
}
for (String fileName : files) {
/*
Decrementing the refCount here for the path so that it becomes eligible for eviction
Expand All @@ -273,10 +270,9 @@ private String[] getRemoteFiles() throws IOException {
remoteFiles = remoteDirectory.listAll();
} catch (NullPointerException e) {
/*
There are two scenarios where the listAll() call on remote directory returns NullPointerException:
- When remote directory is not set
- When init() of remote directory has not yet been called
Returning an empty list in the above scenarios
We can encounter NPE when no data has been uploaded to remote store yet and as a result the metadata is empty
Empty metadata means that there are no files currently in remote, hence returning an empty list in this scenario
TODO : Catch the NPE in listAll of RemoteSegmentStoreDirectory itself instead of catching here
*/
remoteFiles = new String[0];
}
Expand All @@ -285,7 +281,7 @@ There are two scenarios where the listAll() call on remote directory returns Nul

private void cacheFile(String name) throws IOException {
Path filePath = localDirectory.getDirectory().resolve(name);
// put will increase the refCount for the path, making sure it is not evicted, wil decrease the ref after it is uploaded to Remote
// put will increase the refCount for the path, making sure it is not evicted, will decrease the ref after it is uploaded to Remote
// so that it can be evicted after that
// this is just a temporary solution, will pin the file once support for that is added in FileCache
// TODO : Pin the above filePath in the file cache once pinning support is added so that it cannot be evicted unless it has been
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
*/
@ExperimentalApi
public class FullFileCachedIndexInput implements CachedIndexInput {

private final IndexInput indexInput;
private final FileCache fileCache;
private final Path path;
private final FileCachedIndexInput fileCachedIndexInput;
Expand All @@ -35,7 +33,6 @@ public class FullFileCachedIndexInput implements CachedIndexInput {
public FullFileCachedIndexInput(FileCache fileCache, Path path, IndexInput indexInput) {
this.fileCache = fileCache;
this.path = path;
this.indexInput = indexInput;
fileCachedIndexInput = new FileCachedIndexInput(fileCache, path, indexInput);
isClosed = new AtomicBoolean(false);
}
Expand All @@ -54,7 +51,7 @@ public IndexInput getIndexInput() {
*/
@Override
public long length() {
return indexInput.length();
return fileCachedIndexInput.length();
}

/**
Expand All @@ -71,7 +68,7 @@ public boolean isClosed() {
@Override
public void close() throws Exception {
if (!isClosed.getAndSet(true)) {
indexInput.close();
fileCachedIndexInput.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand All @@ -32,6 +33,7 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.startsWith;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand All @@ -50,11 +52,16 @@ public void setup() throws IOException {
}

public void testListAll() throws IOException {
when(localDirectory.listAll()).thenReturn(new String[]{});
String[] actualFileNames = compositeDirectory.listAll();
String[] expectedFileNames = new String[] {};
assertArrayEquals(expectedFileNames, actualFileNames);

populateMetadata();
when(localDirectory.listAll()).thenReturn(new String[] { "_1.cfe", "_2.cfe", "_0.cfe_block_7", "_0.cfs_block_7" });

String[] actualFileNames = compositeDirectory.listAll();
String[] expectedFileNames = new String[] { "_0.cfe", "_0.cfs", "_0.si", "_1.cfe", "_2.cfe", "segments_1" };
actualFileNames = compositeDirectory.listAll();
expectedFileNames = new String[] { "_0.cfe", "_0.cfs", "_0.si", "_1.cfe", "_2.cfe", "segments_1" };
assertArrayEquals(expectedFileNames, actualFileNames);
}

Expand Down Expand Up @@ -169,4 +176,14 @@ public void testClose() throws IOException {
verify(fileCache).remove(resolvedPath1);
verify(fileCache).remove(resolvedPath2);
}

public void testAfterSyncToRemote() throws IOException {
Path basePath = mock(Path.class);
Path resolvedPath = mock(Path.class);
when(basePath.resolve(anyString())).thenReturn(resolvedPath);
when(localDirectory.getDirectory()).thenReturn(basePath);
Collection<String> files = Arrays.asList("_0.si", "_0.cfs");
compositeDirectory.afterSyncToRemote(files);
verify(fileCache, times(files.size())).decRef(resolvedPath);
}
}

0 comments on commit 7491c62

Please sign in to comment.