Optimize delete stale segments and create remote directory instance within cleanup execution thread.

Signed-off-by: Harish Bhakuni <[email protected]>
Harish Bhakuni committed Feb 23, 2024
1 parent 1bf807d commit 81515fe
Showing 5 changed files with 208 additions and 193 deletions.
@@ -107,6 +107,10 @@ public Collection<String> listFilesByPrefix(String filenamePrefix) throws IOException
return blobContainer.listBlobsByPrefix(filenamePrefix).keySet();
}

public void deleteFiles(List<String> files) throws IOException {
blobContainer.deleteBlobsIgnoringIfNotExists(files);
}

public List<String> listFilesByPrefixInLexicographicOrder(String filenamePrefix, int limit) throws IOException {
List<String> sortedBlobList = new ArrayList<>();
AtomicReference<Exception> exception = new AtomicReference<>();
@@ -30,7 +30,6 @@
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.index.store.lockmanager.FileLockInfo;
@@ -116,8 +115,6 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement
*/
protected final AtomicBoolean canDeleteStaleCommits = new AtomicBoolean(true);

protected final AtomicBoolean isDirectoryCleanupOngoing = new AtomicBoolean(false);

private final AtomicLong metadataUploadCounter = new AtomicLong(0);

public static final int METADATA_FILES_TO_FETCH = 10;
@@ -721,6 +718,74 @@ public Map<String, UploadedSegmentMetadata> getSegmentsUploadedToRemoteStore() {
return Collections.unmodifiableMap(this.segmentsUploadedToRemoteStore);
}

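/**
 * Maps each metadata file from index lastMetadataFilesToKeep onward to the closest preceding entry in the
 * sorted list that is still active (locked, or among the retained files), or null when no such entry exists.
 */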
private Map<String, String> populatePreviousMetadata(
List<String> sortedMetadataFiles,
Set<String> lockedMdFiles,
int lastMetadataFilesToKeep
) {
Map<String, String> activePreviousMetadata = new HashMap<>();
if (sortedMetadataFiles.size() == 0) {
return activePreviousMetadata;
}
String currMetadata = sortedMetadataFiles.get(lastMetadataFilesToKeep);
String prevMetadata = null;
if (lastMetadataFilesToKeep > 0) {
prevMetadata = sortedMetadataFiles.get(lastMetadataFilesToKeep - 1);
}
activePreviousMetadata.put(currMetadata, prevMetadata);
prevMetadata = currMetadata;
for (int i = lastMetadataFilesToKeep + 1; i < sortedMetadataFiles.size(); i++) {
currMetadata = sortedMetadataFiles.get(i);
if (lockedMdFiles.contains(prevMetadata)) {
activePreviousMetadata.put(currMetadata, prevMetadata);
} else {
activePreviousMetadata.put(currMetadata, activePreviousMetadata.get(prevMetadata));
}
prevMetadata = currMetadata;
}
return activePreviousMetadata;
}

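/**
 * Maps each metadata file from index lastMetadataFilesToKeep onward to the closest following entry in the
 * sorted list that is locked, or null when no locked entry follows it.
 */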
private Map<String, String> populateNextMetadata(
List<String> sortedMetadataFiles,
Set<String> lockedMdFiles,
int lastMetadataFilesToKeep
) {
Map<String, String> activeNextMetadata = new HashMap<>();
if (sortedMetadataFiles.size() == 0) {
return activeNextMetadata;
}
activeNextMetadata.put(sortedMetadataFiles.get(sortedMetadataFiles.size() - 1), null);
String nextMetadata = sortedMetadataFiles.get(sortedMetadataFiles.size() - 1);
for (int i = sortedMetadataFiles.size() - 2; i >= lastMetadataFilesToKeep; i--) {
String currMetadata = sortedMetadataFiles.get(i);
if (lockedMdFiles.contains(nextMetadata)) {
activeNextMetadata.put(currMetadata, nextMetadata);
} else {
activeNextMetadata.put(currMetadata, activeNextMetadata.get(nextMetadata));
}
nextMetadata = currMetadata;
}
return activeNextMetadata;
}

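/**
 * Reads the given metadata file (if non-null and not processed before) and records its segments as active,
 * so they are not considered for deletion.
 */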
private void processActiveSegments(
String metadataFile,
Map<String, UploadedSegmentMetadata> activeSegmentFilesMetadataMap,
Set<String> activeSegmentRemoteFilenames,
Map<String, Boolean> metadataAlreadyProcessed
) throws IOException {
if (metadataFile == null || metadataAlreadyProcessed.containsKey(metadataFile)) {
return;
}
Map<String, UploadedSegmentMetadata> segmentMetadataMap = readMetadataFile(metadataFile).getMetadata();
activeSegmentFilesMetadataMap.putAll(segmentMetadataMap);
activeSegmentRemoteFilenames.addAll(
segmentMetadataMap.values().stream().map(metadata -> metadata.uploadedFilename).collect(Collectors.toSet())
);
metadataAlreadyProcessed.put(metadataFile, true);
}

/**
* Delete stale segment and metadata files
* One metadata file is kept per commit (refresh updates the same file). To read segments uploaded to remote store,
@@ -763,7 +828,6 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
.filter(metadataFile -> allLockFiles.contains(metadataFile) == false)
.collect(Collectors.toList());

sortedMetadataFileList.removeAll(metadataFilesToBeDeleted);
logger.debug(
"metadataFilesEligibleToDelete={} metadataFilesToBeDeleted={}",
metadataFilesEligibleToDelete,
@@ -772,48 +836,60 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException

Map<String, UploadedSegmentMetadata> activeSegmentFilesMetadataMap = new HashMap<>();
Set<String> activeSegmentRemoteFilenames = new HashSet<>();
for (String metadataFile : sortedMetadataFileList) {
Map<String, UploadedSegmentMetadata> segmentMetadataMap = readMetadataFile(metadataFile).getMetadata();
activeSegmentFilesMetadataMap.putAll(segmentMetadataMap);
activeSegmentRemoteFilenames.addAll(
segmentMetadataMap.values().stream().map(metadata -> metadata.uploadedFilename).collect(Collectors.toSet())
);
}
Map<String, Boolean> metadataAlreadyProcessed = new HashMap<>();

Map<String, String> activePreviousMetadata = populatePreviousMetadata(
sortedMetadataFileList,
allLockFiles,
lastNMetadataFilesToKeep
);
Map<String, String> activeNextMetadata = populateNextMetadata(sortedMetadataFileList, allLockFiles, lastNMetadataFilesToKeep);
Set<String> deletedSegmentFiles = new HashSet<>();
List<String> metadataFilesWithSuccessfulSegmentDeletion = new ArrayList<>();
for (String metadataFile : metadataFilesToBeDeleted) {
Map<String, UploadedSegmentMetadata> staleSegmentFilesMetadataMap = readMetadataFile(metadataFile).getMetadata();
Set<String> staleSegmentRemoteFilenames = staleSegmentFilesMetadataMap.values()
.stream()
.map(metadata -> metadata.uploadedFilename)
.collect(Collectors.toSet());

processActiveSegments(
activePreviousMetadata.get(metadataFile),
activeSegmentFilesMetadataMap,
activeSegmentRemoteFilenames,
metadataAlreadyProcessed
);
processActiveSegments(
activeNextMetadata.get(metadataFile),
activeSegmentFilesMetadataMap,
activeSegmentRemoteFilenames,
metadataAlreadyProcessed
);
AtomicBoolean deletionSuccessful = new AtomicBoolean(true);

List<String> staleSegmentFilesToBeDeleted = new ArrayList<>();
staleSegmentRemoteFilenames.stream()
.filter(file -> activeSegmentRemoteFilenames.contains(file) == false)
.filter(file -> deletedSegmentFiles.contains(file) == false)
.forEach(file -> {
try {
remoteDataDirectory.deleteFile(file);
deletedSegmentFiles.add(file);
if (!activeSegmentFilesMetadataMap.containsKey(getLocalSegmentFilename(file))) {
segmentsUploadedToRemoteStore.remove(getLocalSegmentFilename(file));
}
} catch (NoSuchFileException e) {
logger.info("Segment file {} corresponding to metadata file {} does not exist in remote", file, metadataFile);
} catch (IOException e) {
deletionSuccessful.set(false);
logger.warn(
"Exception while deleting segment file {} corresponding to metadata file {}. Deletion will be re-tried",
file,
metadataFile
);
staleSegmentFilesToBeDeleted.add(file);
deletedSegmentFiles.add(file);
if (!activeSegmentFilesMetadataMap.containsKey(getLocalSegmentFilename(file))) {
segmentsUploadedToRemoteStore.remove(getLocalSegmentFilename(file));
}
});
if (deletionSuccessful.get()) {
logger.debug("Deleting stale metadata file {} from remote segment store", metadataFile);
remoteMetadataDirectory.deleteFile(metadataFile);
try {
if (!staleSegmentFilesToBeDeleted.isEmpty()) {
remoteDataDirectory.deleteFiles(staleSegmentFilesToBeDeleted);
}
metadataFilesWithSuccessfulSegmentDeletion.add(metadataFile);
deletionSuccessful.set(true);
} catch (IOException e) {
logger.debug("failed to delete the stale segments, will skip deletion of corresponding md file");
}
}
logger.debug("deletedSegmentFiles={}", deletedSegmentFiles);
remoteMetadataDirectory.deleteFiles(metadataFilesWithSuccessfulSegmentDeletion);
logger.debug("deletedSegmentFiles={}, deletedMetadataFiles={}", deletedSegmentFiles, metadataFilesWithSuccessfulSegmentDeletion);
}

public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep) {
@@ -851,50 +927,66 @@ public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep, ActionListener
}
}

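/**
 * Schedules shard-level remote store cleanup on the REMOTE_PURGE threadpool. The directory instance is
 * created inside the executor thread; stale segments are deleted and the shard path is removed if empty.
 * Returns true only if the cleanup task was enqueued successfully.
 */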
public static boolean cleanupAsync(
RemoteSegmentStoreDirectoryFactory remoteDirectoryFactory,
ThreadPool threadpool,
String remoteStoreRepoForIndex,
String indexUUID,
ShardId shardId
) {
try {
threadpool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> {
try {
try (
RemoteSegmentStoreDirectory remoteDirectory = (RemoteSegmentStoreDirectory) remoteDirectoryFactory.newDirectory(
remoteStoreRepoForIndex,
indexUUID,
shardId
)
) {
remoteDirectory.deleteStaleSegments(0);
remoteDirectory.deleteIfEmpty();
}
} catch (IOException e) {}
});
} catch (Exception e) {
return false;
}
return true;
}

/*
Tries to delete shard level directory if it does not have any locks.
Return true if shard is enqueued successfully for async cleanup.
Tries to delete shard level directory if it is empty
Return true if it deleted it successfully
*/
public boolean deleteIfEmpty() {
if (isDirectoryCleanupOngoing.compareAndSet(false, true)) {
Set<String> allLockedFiles;
try {
allLockedFiles = ((RemoteStoreMetadataLockManager) mdLockManager).fetchLockedMetadataFiles(
MetadataFilenameUtils.METADATA_PREFIX
);
} catch (Exception e) {
logger.error("Exception while fetching segment metadata lock files, skipping deleteStaleSegments", e);
isDirectoryCleanupOngoing.set(false);
return false;
}
if (allLockedFiles.size() != 0) {
logger.info("Remote directory still has locked files, not deleting the path");
isDirectoryCleanupOngoing.set(false);
return false;
}
try {
threadPool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> {
try {
remoteDataDirectory.delete();
remoteMetadataDirectory.delete();
mdLockManager.delete();
} catch (Exception e) {
logger.error("Exception occurred while deleting directory, it will get retried during next call", e);
} finally {
isDirectoryCleanupOngoing.set(false);
}
});
} catch (OpenSearchRejectedExecutionException e) {
logger.error("Exception occurred while enqueueing directory for cleanup", e);
isDirectoryCleanupOngoing.set(false);
return false;
}
private boolean deleteIfEmpty() throws IOException {
Set<String> allLockedFiles;
try {
allLockedFiles = ((RemoteStoreMetadataLockManager) mdLockManager).fetchLockedMetadataFiles(
MetadataFilenameUtils.METADATA_PREFIX
);
} catch (Exception e) {
logger.error("Exception while fetching segment metadata lock files, skipping path deletion", e);
return false;
}
if (allLockedFiles.size() != 0) {
logger.info("Remote directory still has locked files, not deleting the path");
return false;
}

try {
remoteDataDirectory.delete();
remoteMetadataDirectory.delete();
mdLockManager.delete();
} catch (Exception e) {
logger.error("Exception occurred while deleting directory", e);
return false;
}
return true;
}

@Override
public void close() throws IOException {
deleteIfEmpty();
deleteStaleSegmentsAsync(0, ActionListener.wrap(r -> deleteIfEmpty(), e -> logger.error("Failed to cleanup remote directory")));
}
}
@@ -46,7 +46,7 @@ public interface RemoteStoreLockManager {
/**
*
* @return true if lock directory is empty.
* @throws IOException
* @throws IOException throws exception if list locks call fails.
*/
Boolean isEmpty() throws IOException;

@@ -1138,33 +1138,26 @@ protected boolean releaseRemoteStoreLockAndCleanup(
remoteStoreShardCleanupTracker.addReleasedShardLock(indexUUID, shardId, shallowSnapshotUUID);
}
logger.debug("Successfully released lock for shard {} of index with uuid {}", shardId, indexUUID);
if (!isIndexPresent(clusterService, indexUUID)
&& !remoteStoreShardCleanupTracker.isShardEnqueued(indexUUID, shardId)
&& remoteStoreMetadataLockManager.isEmpty()) {
if (!isIndexPresent(clusterService, indexUUID) && !remoteStoreShardCleanupTracker.isShardEnqueued(indexUUID, shardId)) {
// Note: this is a temporary solution where snapshot deletion triggers remote store side cleanup if
// index is already deleted. We will add a poller in future to take care of remote store side cleanup.
// related issue: https://github.com/opensearch-project/OpenSearch/issues/8469
try (
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory =
(RemoteSegmentStoreDirectory) new RemoteSegmentStoreDirectoryFactory(
remoteStoreLockManagerFactory.getRepositoriesService(),
threadPool
).newDirectory(
remoteStoreRepoForIndex,
indexUUID,
new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.valueOf(shardId))
)
) {
// Note: shard cleanup will still happen asynchronously using REMOTE_PURGE threadpool. if it fails,
// it could leave some stale files in remote directory. this issue could even happen in cases of
// shard level remote store data cleanup which also happens asynchronously. in long term, we have
// plans to implement remote store GC poller mechanism which will take care of such stale data.
// related issue: https://github.com/opensearch-project/OpenSearch/issues/8469
if (remoteSegmentStoreDirectory.deleteIfEmpty()) {
remoteStoreShardCleanupTracker.addEnqueuedShardCleanup(indexUUID, shardId);
return true;
}
return false;
// index is already deleted. shard cleanup will still happen asynchronously using REMOTE_PURGE
// threadpool. if it fails, it could leave some stale files in remote directory. this issue could
// even happen in cases of shard level remote store data cleanup which also happens asynchronously.
// in long term, we have plans to implement remote store GC poller mechanism which will take care of
// such stale data. related issue: https://github.com/opensearch-project/OpenSearch/issues/8469
RemoteSegmentStoreDirectoryFactory remoteDirectoryFactory = new RemoteSegmentStoreDirectoryFactory(
remoteStoreLockManagerFactory.getRepositoriesService(),
threadPool
);
if (RemoteSegmentStoreDirectory.cleanupAsync(
remoteDirectoryFactory,
threadPool,
remoteStoreRepoForIndex,
indexUUID,
new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.parseInt(shardId))
)) {
remoteStoreShardCleanupTracker.addEnqueuedShardCleanup(indexUUID, shardId);
return true;
}
}
} catch (Exception e) {
@@ -1184,13 +1177,9 @@ private static class RemoteStoreShardCleanupTracker {
private final Cache<String, Boolean> releasedLocks;
// lock files will be per shard per snapshot
private static final int DEFAULT_RELEASED_LOCK_CACHE_SIZE = 10000;
// cleanup will be per shard
private static final int DEFAULT_ENQUEUED_CLEANUP_CACHE_SIZE = 1000;

private RemoteStoreShardCleanupTracker() {
enqueuedRemoteStoreCleanup = CacheBuilder.<String, Boolean>builder()
.setMaximumWeight(DEFAULT_ENQUEUED_CLEANUP_CACHE_SIZE)
.build();
enqueuedRemoteStoreCleanup = CacheBuilder.<String, Boolean>builder().setExpireAfterWrite(TimeValue.timeValueHours(1)).build();
releasedLocks = CacheBuilder.<String, Boolean>builder().setMaximumWeight(DEFAULT_RELEASED_LOCK_CACHE_SIZE).build();
}
