Skip to content

Commit

Permalink
Add useCache flag to be used by caller
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
Sachin Kale committed Dec 4, 2023
1 parent 1dee957 commit e6f5583
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ private boolean syncSegments() {
// is considered as a first refresh post commit. A cleanup of stale commit files is triggered.
// This is done to avoid delete post each refresh.
if (isRefreshAfterCommit()) {
remoteDirectory.deleteStaleSegmentsAsync(indexShard.getRecoverySettings().getMinRemoteSegmentMetadataFiles());
remoteDirectory.deleteStaleSegmentsAsync(indexShard.getRecoverySettings().getMinRemoteSegmentMetadataFiles(), true);
}

try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement

private final ThreadPool threadPool;

private final Cache<String, Boolean> lockCache;
private final Cache<String, Boolean> acquiredLockCache;

/**
* Keeps track of local segment filename to uploaded filename along with other attributes like checksum.
Expand Down Expand Up @@ -136,7 +136,8 @@ public RemoteSegmentStoreDirectory(
this.remoteMetadataDirectory = remoteMetadataDirectory;
this.mdLockManager = mdLockManager;
this.threadPool = threadPool;
this.lockCache = CacheBuilder.<String, Boolean>builder().setExpireAfterWrite(TimeValue.timeValueHours(1)).build();
// ToDo: make the cache TTL configurable
this.acquiredLockCache = CacheBuilder.<String, Boolean>builder().setExpireAfterWrite(TimeValue.timeValueHours(1)).build();
this.logger = Loggers.getLogger(getClass(), shardId);
init();
}
Expand Down Expand Up @@ -519,20 +520,20 @@ public void releaseLock(long primaryTerm, long generation, String acquirerId) th
@Override
public Boolean isLockAcquired(long primaryTerm, long generation) throws IOException {
String metadataFile = getMetadataFileForCommit(primaryTerm, generation);
return isLockAcquired(metadataFile);
return isLockAcquired(metadataFile, false);
}

// Visible for testing
Boolean isLockAcquired(String metadataFile) throws IOException {
if (lockCache.get(metadataFile) != null) {
Boolean isLockAcquired(String metadataFile, boolean useCache) throws IOException {
if (useCache && acquiredLockCache.get(metadataFile) != null) {
return true;
}
boolean lockAcquired = mdLockManager.isAcquired(FileLockInfo.getLockInfoBuilder().withFileToLock(metadataFile).build());
if (lockAcquired) {
// We do not want to cache info of unlocked metadata files as we do not want to miss a lock getting acquired
// on a metadata file between 2 invocations of this method. Caching unlocked files can lead to data
// consistency issues.
lockCache.put(metadataFile, true);
acquiredLockCache.put(metadataFile, true);
}
return lockAcquired;
}
Expand Down Expand Up @@ -741,7 +742,7 @@ public Map<String, UploadedSegmentMetadata> getSegmentsUploadedToRemoteStore() {
* @param lastNMetadataFilesToKeep number of metadata files to keep
* @throws IOException in case of I/O error while reading from / writing to remote segment store
*/
public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException {
private void deleteStaleSegments(int lastNMetadataFilesToKeep, boolean useCache) throws IOException {
if (lastNMetadataFilesToKeep == -1) {
logger.info(
"Stale segment deletion is disabled if cluster.remote_store.index.segment_metadata.retention.max_count is set to -1"
Expand All @@ -767,7 +768,7 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
);
List<String> metadataFilesToBeDeleted = metadataFilesEligibleToDelete.stream().filter(metadataFile -> {
try {
return !isLockAcquired(metadataFile);
return !isLockAcquired(metadataFile, useCache);
} catch (IOException e) {
logger.error(
"skipping metadata file ("
Expand Down Expand Up @@ -830,22 +831,22 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
}
}

public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep) {
deleteStaleSegmentsAsync(lastNMetadataFilesToKeep, ActionListener.wrap(r -> {}, e -> {}));
public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep, boolean useCache) {
deleteStaleSegmentsAsync(lastNMetadataFilesToKeep, ActionListener.wrap(r -> {}, e -> {}), useCache);
}

/**
* Delete stale segment and metadata files asynchronously.
* This method calls {@link RemoteSegmentStoreDirectory#deleteStaleSegments(int)} in an async manner.
* This method calls {@link RemoteSegmentStoreDirectory#deleteStaleSegments(int, boolean)} in an async manner.
*
* @param lastNMetadataFilesToKeep number of metadata files to keep
*/
public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep, ActionListener<Void> listener) {
private void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep, ActionListener<Void> listener, boolean useCache) {
if (canDeleteStaleCommits.compareAndSet(true, false)) {
try {
threadPool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> {
try {
deleteStaleSegments(lastNMetadataFilesToKeep);
deleteStaleSegments(lastNMetadataFilesToKeep, useCache);
listener.onResponse(null);
} catch (Exception e) {
logger.error(
Expand Down Expand Up @@ -893,6 +894,10 @@ private boolean deleteIfEmpty() throws IOException {

@Override
public void close() throws IOException {
deleteStaleSegmentsAsync(0, ActionListener.wrap(r -> deleteIfEmpty(), e -> logger.error("Failed to cleanup remote directory")));
deleteStaleSegmentsAsync(
0,
ActionListener.wrap(r -> deleteIfEmpty(), e -> logger.error("Failed to cleanup remote directory")),
false
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -507,30 +507,41 @@ public boolean matches(FileLockInfo right) {
public void testIsLockAcquiredCachingCacheHit() throws IOException {
populateMetadata();
remoteSegmentStoreDirectory.init();
long testPrimaryTerm = 1;
long testGeneration = 5;

List<String> metadataFiles = List.of("metadata__1__5__abc");
FileLockInfo fileLockInfoLock = FileLockInfo.getLockInfoBuilder().withFileToLock(metadataFiles.get(0)).build();
when(
remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilePrefixForCommit(testPrimaryTerm, testGeneration),
1
)
).thenReturn(metadataFiles);

// Lock manager isAcquired returns true for the first time and false for subsequent calls
when(mdLockManager.isAcquired(argThat(new FileLockInfoMatcher(fileLockInfoLock)))).thenReturn(true).thenReturn(false);

// Due to cached entries which return with isAcquired = true, subsequent calls return from cache
assertTrue(remoteSegmentStoreDirectory.isLockAcquired(testPrimaryTerm, testGeneration));
assertTrue(remoteSegmentStoreDirectory.isLockAcquired(testPrimaryTerm, testGeneration));
assertTrue(remoteSegmentStoreDirectory.isLockAcquired(testPrimaryTerm, testGeneration));
assertTrue(remoteSegmentStoreDirectory.isLockAcquired("metadata__1__5__abc", true));
assertTrue(remoteSegmentStoreDirectory.isLockAcquired("metadata__1__5__abc", true));
assertTrue(remoteSegmentStoreDirectory.isLockAcquired("metadata__1__5__abc", true));

// As all but first calls are fetching data from cache, we make only one call to mdLockManager
verify(mdLockManager, times(1)).isAcquired(argThat(new FileLockInfoMatcher(fileLockInfoLock)));
}

public void testIsLockAcquiredCachingCacheHitUseCacheFalse() throws IOException {
populateMetadata();
remoteSegmentStoreDirectory.init();

List<String> metadataFiles = List.of("metadata__1__5__abc");
FileLockInfo fileLockInfoLock = FileLockInfo.getLockInfoBuilder().withFileToLock(metadataFiles.get(0)).build();

// Lock manager isAcquired returns true for the first time and false for subsequent calls
when(mdLockManager.isAcquired(argThat(new FileLockInfoMatcher(fileLockInfoLock)))).thenReturn(true).thenReturn(false);

// Due to cached entries which return with isAcquired = true, subsequent calls return from cache
assertTrue(remoteSegmentStoreDirectory.isLockAcquired("metadata__1__5__abc", false));
assertFalse(remoteSegmentStoreDirectory.isLockAcquired("metadata__1__5__abc", false));
assertFalse(remoteSegmentStoreDirectory.isLockAcquired("metadata__1__5__abc", false));

// As all but first calls are fetching data from cache, we make only one call to mdLockManager
verify(mdLockManager, times(3)).isAcquired(argThat(new FileLockInfoMatcher(fileLockInfoLock)));
}

public void testIsLockAcquiredCachingCacheMiss() throws IOException {
populateMetadata();
remoteSegmentStoreDirectory.init();
Expand Down Expand Up @@ -990,7 +1001,7 @@ public void testDeleteStaleCommitsException() throws Exception {
// popluateMetadata() adds stub to return 3 metadata files
// We are passing lastNMetadataFilesToKeep=2 here to validate that in case of exception deleteFile is not
// invoked
remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2);
remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2, true);

assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true)));
verify(remoteMetadataDirectory, times(0)).deleteFile(any(String.class));
Expand All @@ -1003,7 +1014,7 @@ public void testDeleteStaleCommitsExceptionWhileScheduling() throws Exception {
// popluateMetadata() adds stub to return 3 metadata files
// We are passing lastNMetadataFilesToKeep=2 here to validate that in case of exception deleteFile is not
// invoked
remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2);
remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2, true);

assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true)));
verify(remoteMetadataDirectory, times(0)).deleteFile(any(String.class));
Expand All @@ -1016,7 +1027,7 @@ public void testDeleteStaleCommitsWithDeletionAlreadyInProgress() throws Excepti
// popluateMetadata() adds stub to return 3 metadata files
// We are passing lastNMetadataFilesToKeep=2 here to validate that in case of exception deleteFile is not
// invoked
remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2);
remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2, true);

assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(false)));
verify(remoteMetadataDirectory, times(0)).deleteFile(any(String.class));
Expand All @@ -1027,7 +1038,7 @@ public void testDeleteStaleCommitsWithinThreshold() throws Exception {

// popluateMetadata() adds stub to return 3 metadata files
// We are passing lastNMetadataFilesToKeep=5 here so that none of the metadata files will be deleted
remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(5);
remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(5, true);

assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true)));
verify(remoteMetadataDirectory, times(0)).openInput(any(String.class), eq(IOContext.DEFAULT));
Expand All @@ -1039,7 +1050,7 @@ public void testDeleteStaleCommitsActualDelete() throws Exception {

// popluateMetadata() adds stub to return 3 metadata files
// We are passing lastNMetadataFilesToKeep=2 here so that oldest 1 metadata file will be deleted
remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2);
remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2, true);

for (String metadata : metadataFilenameContentMapping.get(metadataFilename3).values()) {
String uploadedFilename = metadata.split(RemoteSegmentStoreDirectory.UploadedSegmentMetadata.SEPARATOR)[1];
Expand All @@ -1063,7 +1074,7 @@ public void testDeleteStaleCommitsActualDeleteIOException() throws Exception {
doThrow(new IOException("Error")).when(remoteDataDirectory).deleteFile(segmentFileWithException);
// popluateMetadata() adds stub to return 3 metadata files
// We are passing lastNMetadataFilesToKeep=2 here so that oldest 1 metadata file will be deleted
remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2);
remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2, true);

for (String metadata : metadataFilenameContentMapping.get(metadataFilename3).values()) {
String uploadedFilename = metadata.split(RemoteSegmentStoreDirectory.UploadedSegmentMetadata.SEPARATOR)[1];
Expand All @@ -1086,7 +1097,7 @@ public void testDeleteStaleCommitsActualDeleteNoSuchFileException() throws Excep
doThrow(new NoSuchFileException(segmentFileWithException)).when(remoteDataDirectory).deleteFile(segmentFileWithException);
// popluateMetadata() adds stub to return 3 metadata files
// We are passing lastNMetadataFilesToKeep=2 here so that oldest 1 metadata file will be deleted
remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2);
remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2, true);

for (String metadata : metadataFilenameContentMapping.get(metadataFilename3).values()) {
String uploadedFilename = metadata.split(RemoteSegmentStoreDirectory.UploadedSegmentMetadata.SEPARATOR)[1];
Expand Down

0 comments on commit e6f5583

Please sign in to comment.