Skip to content

Commit

Permalink
Initial changes for remote store cleanup during snapshot optimizations.
Browse files Browse the repository at this point in the history
Signed-off-by: Harish Bhakuni <[email protected]>
  • Loading branch information
Harish Bhakuni committed Feb 16, 2024
1 parent b19e427 commit ca5d929
Show file tree
Hide file tree
Showing 4 changed files with 201 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.index.store.lockmanager.FileLockInfo;
Expand Down Expand Up @@ -849,33 +850,43 @@ public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep, ActionListene
}

/*
Tries to delete shard level directory if it is empty
Return true if it deleted it successfully
Tries to delete shard level directory if it does not have any locks.
Return true if shard is enqueued successfully for async cleanup.
*/
private boolean deleteIfEmpty() throws IOException {
Collection<String> metadataFiles = remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
MetadataFilenameUtils.METADATA_PREFIX,
1
);
if (metadataFiles.size() != 0) {
logger.info("Remote directory still has files, not deleting the path");
public boolean deleteIfEmpty() {
Set<String> allLockedFiles;
try {
allLockedFiles = ((RemoteStoreMetadataLockManager) mdLockManager).fetchLockedMetadataFiles(
MetadataFilenameUtils.METADATA_PREFIX
);
} catch (Exception e) {
logger.error("Exception while fetching segment metadata lock files, skipping deleteStaleSegments", e);
return false;
}
if (allLockedFiles.size() != 0) {
logger.info("Remote directory still has locked files, not deleting the path");
return false;
}

try {
remoteDataDirectory.delete();
remoteMetadataDirectory.delete();
mdLockManager.delete();
} catch (Exception e) {
logger.error("Exception occurred while deleting directory", e);
threadPool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> {
try {
remoteDataDirectory.delete();
remoteMetadataDirectory.delete();
mdLockManager.delete();
} catch (Exception e) {
logger.error("Exception occurred while deleting directory, it will get retried during next call", e);
}
});
} catch (OpenSearchRejectedExecutionException e) {
logger.error("Exception occurred while enqueueing directory for cleanup", e);
return false;
}

return true;
}

@Override
public void close() throws IOException {
deleteStaleSegmentsAsync(0, ActionListener.wrap(r -> deleteIfEmpty(), e -> logger.error("Failed to cleanup remote directory")));
deleteIfEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.common.annotation.PublicApi;

import java.io.IOException;
import java.util.List;

/**
* An Interface that defines Remote Store Lock Manager.
Expand Down Expand Up @@ -43,6 +44,8 @@ public interface RemoteStoreLockManager {
*/
Boolean isAcquired(LockInfo lockInfo) throws IOException;

/**
* Lists the locks currently known to this lock manager.
* NOTE(review): which locks are in scope (e.g. only metadata-file locks) is decided by the implementation — confirm against implementers.
*
* @return a list of {@link LockInfo}, one entry per lock found.
* @throws IOException declared by the signature; presumably raised when the underlying lock store cannot be listed — confirm.
*/
List<LockInfo> listLocks() throws IOException;

/**
* Acquires lock on the file mentioned in originalLockInfo for acquirer mentioned in clonedLockInfo.
* There can occur a race condition where the original file is deleted before we can use it to acquire lock for the new acquirer. Until we have a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.apache.lucene.store.IndexOutput;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.index.store.RemoteBufferedOutputDirectory;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;

import java.io.FileNotFoundException;
import java.io.IOException;
Expand Down Expand Up @@ -107,6 +108,19 @@ public Boolean isAcquired(LockInfo lockInfo) throws IOException {
return !lockFiles.isEmpty();
}

/**
 * Builds a {@link LockInfo} for every file in the lock directory whose name starts with the
 * segment metadata prefix.
 *
 * @return one lock descriptor per matching lock file, populated with the locked file name and the
 *         acquirer id that are parsed out of the lock file's name.
 * @throws IOException declared by the signature; presumably surfaced by the directory listing — confirm.
 */
@Override
public List<LockInfo> listLocks() throws IOException {
    Collection<String> lockFileNames = lockDirectory.listFilesByPrefix(
        RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX
    );
    return lockFileNames.stream().map(lockFileName -> {
        // Both builder inputs are derived from the lock file's own name via LockFileUtils.
        return FileLockInfo.getLockInfoBuilder()
            .withFileToLock(FileLockInfo.LockFileUtils.getFileToLockNameFromLock(lockFileName))
            .withAcquirerId(FileLockInfo.LockFileUtils.getAcquirerIdFromLock(lockFileName))
            .build();
    }).collect(Collectors.toList());
}

/**
* Acquires lock on the file mentioned in originalLockInfo for acquirer mentioned in clonedLockInfo.
* Snapshot layer enforces thread safety by having checks in place to ensure that the source snapshot is not being deleted before proceeding
Expand Down
Loading

0 comments on commit ca5d929

Please sign in to comment.