From ca5d929346c8bd5eee88efd603fd05a7f538c83d Mon Sep 17 00:00:00 2001
From: Harish Bhakuni
Date: Wed, 14 Feb 2024 09:22:21 -0800
Subject: [PATCH] Initial changes for remote store cleanup during snapshot
 optimizations.

Signed-off-by: Harish Bhakuni
---
 .../store/RemoteSegmentStoreDirectory.java    |  43 ++--
 .../lockmanager/RemoteStoreLockManager.java   |   3 +
 .../RemoteStoreMetadataLockManager.java       |  14 ++
 .../blobstore/BlobStoreRepository.java        | 235 ++++++++++++------
 4 files changed, 201 insertions(+), 94 deletions(-)

diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java
index bfab9f8c18aa2..a598802240697 100644
--- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java
+++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java
@@ -30,6 +30,8 @@
 import org.opensearch.common.logging.Loggers;
 import org.opensearch.common.lucene.store.ByteArrayIndexInput;
 import org.opensearch.core.action.ActionListener;
+import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
 import org.opensearch.core.index.shard.ShardId;
 import org.opensearch.index.remote.RemoteStoreUtils;
 import org.opensearch.index.store.lockmanager.FileLockInfo;
+import org.opensearch.index.store.lockmanager.LockInfo;
@@ -849,33 +850,41 @@ public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep, ActionListene
     }
 
     /*
-    Tries to delete shard level directory if it is empty
-    Return true if it deleted it successfully
+    Tries to delete shard level directory if it does not have any locks.
+    Return true if shard is enqueued successfully for async cleanup.
     */
-    private boolean deleteIfEmpty() throws IOException {
-        Collection<String> metadataFiles = remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
-            MetadataFilenameUtils.METADATA_PREFIX,
-            1
-        );
-        if (metadataFiles.size() != 0) {
-            logger.info("Remote directory still has files, not deleting the path");
+    public boolean deleteIfEmpty() {
+        List<LockInfo> allLockedFiles;
+        try {
+            allLockedFiles = mdLockManager.listLocks();
+        } catch (Exception e) {
+            logger.error("Exception while fetching segment metadata lock files, skipping deleteIfEmpty", e);
+            return false;
+        }
+        if (allLockedFiles.size() != 0) {
+            logger.info("Remote directory still has locked files, not deleting the path");
             return false;
         }
 
         try {
-            remoteDataDirectory.delete();
-            remoteMetadataDirectory.delete();
-            mdLockManager.delete();
-        } catch (Exception e) {
-            logger.error("Exception occurred while deleting directory", e);
+            threadPool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> {
+                try {
+                    remoteDataDirectory.delete();
+                    remoteMetadataDirectory.delete();
+                    mdLockManager.delete();
+                } catch (Exception e) {
+                    logger.error("Exception occurred while deleting directory, it will get retried during next call", e);
+                }
+            });
+        } catch (OpenSearchRejectedExecutionException e) {
+            logger.error("Exception occurred while enqueueing directory for cleanup", e);
             return false;
         }
 
         return true;
     }
 
     @Override
     public void close() throws IOException {
-        deleteStaleSegmentsAsync(0, ActionListener.wrap(r -> deleteIfEmpty(), e -> logger.error("Failed to cleanup remote directory")));
+        deleteIfEmpty();
     }
 }
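For context on the change above: deleteIfEmpty() now only enqueues the actual deletes on the REMOTE_PURGE threadpool and treats a rejected execution as a retryable failure instead of deleting inline. A minimal standalone sketch of that enqueue-or-fail pattern, using plain java.util.concurrent types rather than OpenSearch's ThreadPool (all names below are illustrative, not part of this patch):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    class CleanupEnqueueSketch {
        // Bounded queue so a burst of cleanups cannot queue unbounded work.
        private final ThreadPoolExecutor purgeExecutor = new ThreadPoolExecutor(
            1, 4, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(16), new ThreadPoolExecutor.AbortPolicy()
        );

        // Mirrors deleteIfEmpty(): returns false on rejection so a later delete run can retry.
        boolean tryEnqueueCleanup(Runnable cleanupTask) {
            try {
                purgeExecutor.execute(cleanupTask);
                return true;
            } catch (RejectedExecutionException e) {
                // Queue is full; report failure instead of blocking the calling thread.
                return false;
            }
        }
    }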
diff --git a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManager.java b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManager.java
index 4fa23dfe9dc3a..205c7a3cfd1c6 100644
--- a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManager.java
+++ b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManager.java
@@ -11,6 +11,7 @@
 import org.opensearch.common.annotation.PublicApi;
 
 import java.io.IOException;
+import java.util.List;
 
 /**
  * An Interface that defines Remote Store Lock Manager.
@@ -43,6 +44,14 @@ public interface RemoteStoreLockManager {
      */
     Boolean isAcquired(LockInfo lockInfo) throws IOException;
 
+    /**
+     * Lists lock info of all the locks that are currently acquired in this lock store.
+     *
+     * @return list of lock info of all acquired locks
+     * @throws IOException in case of I/O error while listing lock files
+     */
+    List<LockInfo> listLocks() throws IOException;
+
     /**
      * Acquires lock on the file mentioned in originalLockInfo for acquirer mentioned in clonedLockInfo.
      * There can occur a race condition where the original file is deleted before we can use it to acquire lock for the new acquirer. Until we have a
diff --git a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java
index 9c29e03c225e4..75bc343c246b3 100644
--- a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java
+++ b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java
@@ -14,6 +14,7 @@
 import org.apache.lucene.store.IndexOutput;
 import org.opensearch.common.annotation.PublicApi;
 import org.opensearch.index.store.RemoteBufferedOutputDirectory;
+import org.opensearch.index.store.RemoteSegmentStoreDirectory;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -107,6 +108,19 @@ public Boolean isAcquired(LockInfo lockInfo) throws IOException {
         return !lockFiles.isEmpty();
     }
 
+    @Override
+    public List<LockInfo> listLocks() throws IOException {
+        Collection<String> lockFiles = lockDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX);
+        return lockFiles.stream()
+            .map(
+                lock -> FileLockInfo.getLockInfoBuilder()
+                    .withFileToLock(FileLockInfo.LockFileUtils.getFileToLockNameFromLock(lock))
+                    .withAcquirerId(FileLockInfo.LockFileUtils.getAcquirerIdFromLock(lock))
+                    .build()
+            )
+            .collect(Collectors.toList());
+    }
+
     /**
      * Acquires lock on the file mentioned in originalLockInfo for acquirer mentioned in clonedLockInfo.
     * Snapshot layer enforces thread safety by having checks in place to ensure that the source snapshot is not being deleted before proceeding
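To make the intended use of the new listLocks() method concrete, here is a hedged sketch of a caller that mirrors what deleteIfEmpty() does with it; the lock-manager wiring and class name are assumptions for illustration, not part of this patch:

    import java.io.IOException;
    import java.util.List;
    import org.opensearch.index.store.lockmanager.LockInfo;
    import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;

    class LockAwareCleanupSketch {
        private final RemoteStoreLockManager lockManager;

        LockAwareCleanupSketch(RemoteStoreLockManager lockManager) {
            this.lockManager = lockManager;
        }

        // A shard directory becomes eligible for deletion only once no snapshot
        // holds a lock on any of its metadata files.
        boolean isEligibleForDeletion() throws IOException {
            List<LockInfo> locks = lockManager.listLocks();
            return locks.isEmpty();
        }
    }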
diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
index 8a2260e1f6d90..915c5f32f52a9 100644
--- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
@@ -117,6 +117,7 @@
 import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot;
 import org.opensearch.index.snapshots.blobstore.SlicedInputStream;
 import org.opensearch.index.snapshots.blobstore.SnapshotFiles;
+import org.opensearch.index.store.RemoteSegmentStoreDirectory;
 import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
 import org.opensearch.index.store.Store;
 import org.opensearch.index.store.StoreFileMetadata;
@@ -152,6 +153,7 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -217,6 +219,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
     private static final String UPLOADED_DATA_BLOB_PREFIX = "__";
 
+    /**
+     * Tracks shard level snapshot lock releases and remote store cleanups that have already been triggered, so repeated delete runs do not redo them.
+     */
+    protected volatile RemoteStoreShardCleanupTracker remoteStoreShardCleanupTracker;
+
     /**
      * Prefix used for the identifiers of data blobs that were not actually written to the repository physically because their contents are
     * already stored in the metadata referencing them, i.e. in {@link BlobStoreIndexShardSnapshot} and
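The tracker field above exists to make lock release and cleanup idempotent when the same shard is visited more than once in a delete run. A small sketch of the underlying guard-with-a-seen-set idea (illustrative names only; note that Set.add() returning false on duplicates collapses the check and the record into a single step):

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    class ReleaseOnceSketch {
        private final Set<String> released = Collections.synchronizedSet(new HashSet<>());

        // The release action runs at most once per (index, shard, snapshot) triple.
        void releaseOnce(String indexUUID, String shardId, String snapshotUUID, Runnable releaseAction) {
            String key = String.join("/", indexUUID, shardId, snapshotUUID);
            if (released.add(key)) {
                releaseAction.run();
            }
        }
    }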
@@ -422,6 +429,7 @@ protected BlobStoreRepository(
         this.threadPool = clusterService.getClusterApplierService().threadPool();
         this.clusterService = clusterService;
         this.recoverySettings = recoverySettings;
+        this.remoteStoreShardCleanupTracker = new RemoteStoreShardCleanupTracker();
     }
 
     @Override
@@ -1098,6 +1106,115 @@ private void asyncCleanupUnlinkedShardLevelBlobs(
         }
     }
 
+    private boolean releaseRemoteStoreLockAndCleanup(
+        String shardId,
+        String shallowSnapshotUUID,
+        BlobContainer shardContainer,
+        RemoteStoreLockManagerFactory remoteStoreLockManagerFactory
+    ) {
+        if (remoteStoreLockManagerFactory == null) {
+            return true;
+        }
+
+        try {
+            RemoteStoreShardShallowCopySnapshot remoteStoreShardShallowCopySnapshot = REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.read(
+                shardContainer,
+                shallowSnapshotUUID,
+                namedXContentRegistry
+            );
+            String indexUUID = remoteStoreShardShallowCopySnapshot.getIndexUUID();
+            String remoteStoreRepoForIndex = remoteStoreShardShallowCopySnapshot.getRemoteStoreRepository();
+            // Releasing lock file before deleting the shallow-snap-UUID file because in case of any failure while
+            // releasing the lock file, we would still have the shallow-snap-UUID file and that would be used during
+            // next delete operation for releasing this lock file
+            RemoteStoreLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory.newLockManager(
+                remoteStoreRepoForIndex,
+                indexUUID,
+                shardId
+            );
+            if (!remoteStoreShardCleanupTracker.isShardLockReleased(indexUUID, shardId, shallowSnapshotUUID)) {
+                remoteStoreMetadataLockManager.release(FileLockInfo.getLockInfoBuilder().withAcquirerId(shallowSnapshotUUID).build());
+                remoteStoreShardCleanupTracker.addReleasedShardLock(indexUUID, shardId, shallowSnapshotUUID);
+            }
+            logger.debug("Successfully released lock for shard {} of index with uuid {}", shardId, indexUUID);
+            if (!isIndexPresent(clusterService, indexUUID)
+                && !remoteStoreShardCleanupTracker.isShardEnqueued(indexUUID, shardId)
+                && remoteStoreMetadataLockManager.listLocks().size() == 0) {
+                // Note: this is a temporary solution where snapshot deletion triggers remote store side cleanup if
+                // index is already deleted. We will add a poller in future to take care of remote store side cleanup.
+                // related issue: https://github.com/opensearch-project/OpenSearch/issues/8469
+                try (
+                    RemoteSegmentStoreDirectory remoteSegmentStoreDirectory =
+                        (RemoteSegmentStoreDirectory) new RemoteSegmentStoreDirectoryFactory(
+                            remoteStoreLockManagerFactory.getRepositoriesService(),
+                            threadPool
+                        ).newDirectory(
+                            remoteStoreRepoForIndex,
+                            indexUUID,
+                            new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.valueOf(shardId))
+                        )
+                ) {
+                    // Note: shard cleanup will still happen asynchronously using REMOTE_PURGE threadpool. if it fails,
+                    // it could leave some stale files in remote directory. this issue could even happen in cases of
+                    // shard level remote store data cleanup which also happens asynchronously. in long term, we have
+                    // plans to implement remote store GC poller mechanism which will take care of such stale data.
+                    // related issue: https://github.com/opensearch-project/OpenSearch/issues/8469
+                    if (remoteSegmentStoreDirectory.deleteIfEmpty()) {
+                        remoteStoreShardCleanupTracker.addEnqueuedShardCleanup(indexUUID, shardId);
+                        return true;
+                    }
+                    return false;
+                }
+            }
+            // Lock release succeeded and no remote store cleanup is needed for this shard,
+            // so the corresponding shallow-snap-UUID file is safe to delete.
+            return true;
+        } catch (Exception e) {
+            logger.warn(
+                () -> new ParameterizedMessage(
+                    "Failed to release lock or trigger async cleanup for remote directory, skipping blob deletion for [{}]",
+                    shardId
+                ),
+                e
+            );
+            return false;
+        }
+    }
+
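The boolean contract above is what makes deletes retry-safe: a shallow-snap blob is only removed once its lock release (and any cleanup trigger) has succeeded, so a failure leaves the blob behind for the next delete run to pick up. A hedged sketch of that filter-then-delete shape, with illustrative helper names that are not part of this patch:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Predicate;

    class RetrySafeDeleteSketch {
        // Returns only the blobs that are safe to delete; the rest stay in the
        // repository and are retried by a later snapshot delete run.
        static List<String> filterEligible(List<String> blobs, Predicate<String> releaseSucceeded) {
            List<String> eligible = new ArrayList<>();
            for (String blob : blobs) {
                if (releaseSucceeded.test(blob)) {
                    eligible.add(blob);
                }
            }
            return eligible;
        }
    }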
+    private static class RemoteStoreShardCleanupTracker {
+        private final Set<String> enqueuedRemoteStoreCleanup;
+        private final Set<String> releasedLocks;
+
+        private RemoteStoreShardCleanupTracker() {
+            // Stale shard deletes can run concurrently on the SNAPSHOT threadpool,
+            // so both sets need to be thread safe.
+            enqueuedRemoteStoreCleanup = Collections.synchronizedSet(new HashSet<>());
+            releasedLocks = Collections.synchronizedSet(new HashSet<>());
+        }
+
+        private String indexShardIdentifier(String indexUUID, String shardId) {
+            return String.join("/", indexUUID, shardId);
+        }
+
+        private String shardLockIdentifier(String indexUUID, String shardId, String snapshotUUID) {
+            return String.join("/", indexUUID, shardId, snapshotUUID);
+        }
+
+        public boolean isShardLockReleased(String indexUUID, String shardId, String snapshotUUID) {
+            return releasedLocks.contains(shardLockIdentifier(indexUUID, shardId, snapshotUUID));
+        }
+
+        public void addReleasedShardLock(String indexUUID, String shardId, String snapshotUUID) {
+            releasedLocks.add(shardLockIdentifier(indexUUID, shardId, snapshotUUID));
+        }
+
+        public boolean isShardEnqueued(String indexUUID, String shardId) {
+            return enqueuedRemoteStoreCleanup.contains(indexShardIdentifier(indexUUID, shardId));
+        }
+
+        public void addEnqueuedShardCleanup(String indexUUID, String shardId) {
+            enqueuedRemoteStoreCleanup.add(indexShardIdentifier(indexUUID, shardId));
+        }
+    }
+
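A short usage sketch of the tracker above, as releaseRemoteStoreLockAndCleanup() exercises it; the method wrapper and the UUID and id values are illustrative only:

    void trackerUsageSketch() {
        RemoteStoreShardCleanupTracker tracker = new RemoteStoreShardCleanupTracker();
        String indexUUID = "3xyqKoeXQZ-2RnoCdnDAzg";  // illustrative index UUID
        String shardId = "0";
        String snapshotUUID = "snap-1";  // illustrative snapshot UUID
        if (!tracker.isShardLockReleased(indexUUID, shardId, snapshotUUID)) {
            // release the remote store lock here, then record it
            tracker.addReleasedShardLock(indexUUID, shardId, snapshotUUID);
        }
        if (!tracker.isShardEnqueued(indexUUID, shardId)) {
            // enqueue the async shard cleanup here, then record it
            tracker.addEnqueuedShardCleanup(indexUUID, shardId);
        }
    }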
     // When remoteStoreLockManagerFactory is non-null, while deleting the files, lock files are also released before deletion of respective
     // shallow-snap-UUID files. And if it is null, we just delete the stale shard blobs.
     private void executeStaleShardDelete(
@@ -1109,56 +1223,31 @@ private void executeStaleShardDelete(
         if (filesToDelete != null) {
             threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
                 try {
-                    if (remoteStoreLockManagerFactory != null) {
-                        for (String fileToDelete : filesToDelete) {
-                            if (fileToDelete.contains(SHALLOW_SNAPSHOT_PREFIX)) {
-                                String[] fileToDeletePath = fileToDelete.split("/");
-                                String indexId = fileToDeletePath[1];
-                                String shardId = fileToDeletePath[2];
-                                String shallowSnapBlob = fileToDeletePath[3];
-                                String snapshotUUID = shallowSnapBlob.substring(
-                                    SHALLOW_SNAPSHOT_PREFIX.length(),
-                                    shallowSnapBlob.length() - ".dat".length()
-                                );
-                                BlobContainer shardContainer = blobStore().blobContainer(indicesPath().add(indexId).add(shardId));
-                                RemoteStoreShardShallowCopySnapshot remoteStoreShardShallowCopySnapshot =
-                                    REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.read(
-                                        shardContainer,
-                                        snapshotUUID,
-                                        namedXContentRegistry
-                                    );
-                                String indexUUID = remoteStoreShardShallowCopySnapshot.getIndexUUID();
-                                String remoteStoreRepoForIndex = remoteStoreShardShallowCopySnapshot.getRemoteStoreRepository();
-                                // Releasing lock file before deleting the shallow-snap-UUID file because in case of any failure while
-                                // releasing the lock file, we would still have the shallow-snap-UUID file and that would be used during
-                                // next delete operation for releasing this lock file
-                                RemoteStoreLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory.newLockManager(
-                                    remoteStoreRepoForIndex,
-                                    indexUUID,
-                                    shardId
-                                );
-                                remoteStoreMetadataLockManager.release(
-                                    FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotUUID).build()
-                                );
-                                if (!isIndexPresent(clusterService, indexUUID)) {
-                                    // this is a temporary solution where snapshot deletion triggers remote store side
-                                    // cleanup if index is already deleted. We will add a poller in future to take
-                                    // care of remote store side cleanup.
-                                    // see https://github.com/opensearch-project/OpenSearch/issues/8469
-                                    new RemoteSegmentStoreDirectoryFactory(
-                                        remoteStoreLockManagerFactory.getRepositoriesService(),
-                                        threadPool
-                                    ).newDirectory(
-                                        remoteStoreRepoForIndex,
-                                        indexUUID,
-                                        new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.valueOf(shardId))
-                                    ).close();
-                                }
+                    // filtering files for which remote store lock release and cleanup succeeded,
+                    // remaining files for which it failed will be retried in next snapshot delete run.
+                    List<String> modifiedFilesToDelete = new ArrayList<>();
+                    for (String fileToDelete : filesToDelete) {
+                        if (fileToDelete.contains(SHALLOW_SNAPSHOT_PREFIX)) {
+                            String[] fileToDeletePath = fileToDelete.split("/");
+                            String indexId = fileToDeletePath[1];
+                            String shardId = fileToDeletePath[2];
+                            String shallowSnapBlob = fileToDeletePath[3];
+                            String snapshotUUID = shallowSnapBlob.substring(
+                                SHALLOW_SNAPSHOT_PREFIX.length(),
+                                shallowSnapBlob.length() - ".dat".length()
+                            );
+                            BlobContainer shardContainer = blobStore().blobContainer(indicesPath().add(indexId).add(shardId));
+                            if (releaseRemoteStoreLockAndCleanup(shardId, snapshotUUID, shardContainer, remoteStoreLockManagerFactory)) {
+                                modifiedFilesToDelete.add(fileToDelete);
+                            }
+                        } else {
+                            // blobs other than shallow-snap-UUID files need no lock release and are always eligible for deletion
+                            modifiedFilesToDelete.add(fileToDelete);
+                        }
+                    }
                     // Deleting the shard blobs
-                    deleteFromContainer(blobContainer(), filesToDelete);
+                    deleteFromContainer(blobContainer(), modifiedFilesToDelete);
                     l.onResponse(null);
                 } catch (Exception e) {
                     logger.warn(
@@ -1585,6 +1670,7 @@ private void executeOneStaleIndexDelete(
                 DeleteResult deleteResult = DeleteResult.ZERO;
                 try {
                     logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId);
+                    boolean releaseLockOrTriggerCleanupFailed = false;
                     if (remoteStoreLockManagerFactory != null) {
                         Map<String, BlobContainer> shardBlobs = indexEntry.getValue().children();
                         if (!shardBlobs.isEmpty()) {
@@ -1592,46 +1678,39 @@ private void executeOneStaleIndexDelete(
                                 Map<String, BlobMetadata> shardLevelBlobs = shardBlob.getValue().listBlobs();
                                 for (Map.Entry<String, BlobMetadata> shardLevelBlob : shardLevelBlobs.entrySet()) {
                                     String blob = shardLevelBlob.getKey();
-                                    String snapshotUUID = blob.substring(SHALLOW_SNAPSHOT_PREFIX.length(), blob.length() - ".dat".length());
                                     if (blob.startsWith(SHALLOW_SNAPSHOT_PREFIX) && blob.endsWith(".dat")) {
-                                        RemoteStoreShardShallowCopySnapshot remoteStoreShardShallowCopySnapshot =
-                                            REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.read(
-                                                shardBlob.getValue(),
-                                                snapshotUUID,
-                                                namedXContentRegistry
-                                            );
-                                        String indexUUID = remoteStoreShardShallowCopySnapshot.getIndexUUID();
-                                        String remoteStoreRepoForIndex = remoteStoreShardShallowCopySnapshot.getRemoteStoreRepository();
-                                        // Releasing lock files before deleting the shallow-snap-UUID file because in case of any failure
-                                        // while releasing the lock file, we would still have the corresponding shallow-snap-UUID file
-                                        // and that would be used during next delete operation for releasing this stale lock file
-                                        RemoteStoreLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory
-                                            .newLockManager(remoteStoreRepoForIndex, indexUUID, shardBlob.getKey());
-                                        remoteStoreMetadataLockManager.release(
-                                            FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotUUID).build()
-                                        );
-                                        if (!isIndexPresent(clusterService, indexUUID)) {
-                                            // this is a temporary solution where snapshot deletion triggers remote store side
-                                            // cleanup if index is already deleted. We will add a poller in future to take
-                                            // care of remote store side cleanup.
-                                            // see https://github.com/opensearch-project/OpenSearch/issues/8469
-                                            new RemoteSegmentStoreDirectoryFactory(
-                                                remoteStoreLockManagerFactory.getRepositoriesService(),
-                                                threadPool
-                                            ).newDirectory(
-                                                remoteStoreRepoForIndex,
-                                                indexUUID,
-                                                new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.valueOf(shardBlob.getKey()))
-                                            ).close();
-                                        }
+                                        String snapshotUUID = blob.substring(
+                                            SHALLOW_SNAPSHOT_PREFIX.length(),
+                                            blob.length() - ".dat".length()
+                                        );
+                                        if (!releaseRemoteStoreLockAndCleanup(
+                                            shardBlob.getKey(),
+                                            snapshotUUID,
+                                            shardBlob.getValue(),
+                                            remoteStoreLockManagerFactory
+                                        )) {
+                                            // release lock or async cleanup trigger did not succeed.
+                                            releaseLockOrTriggerCleanupFailed = true;
+                                        }
                                     }
                                 }
                             }
                         }
                     }
-                    // Deleting the index folder
-                    deleteResult = indexEntry.getValue().delete();
-                    logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
+                    if (!releaseLockOrTriggerCleanupFailed) {
+                        // Deleting the index folder
+                        deleteResult = indexEntry.getValue().delete();
+                        logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
+                    } else {
+                        logger.warn(
+                            "[{}] index {} is no longer part of any snapshot in the repository, "
+                                + "but skipping cleanup of its index folder since either lock release or remote store "
+                                + "cleanup failed for at least one of its shards.",
+                            metadata.name(),
+                            indexSnId
+                        );
+                    }
                 } catch (IOException e) {
                     logger.warn(
                         () -> new ParameterizedMessage(