diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java
index 67e02e88af73a..a598802240697 100644
--- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java
+++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java
@@ -30,6 +30,7 @@
 import org.opensearch.common.logging.Loggers;
 import org.opensearch.common.lucene.store.ByteArrayIndexInput;
 import org.opensearch.core.action.ActionListener;
+import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
 import org.opensearch.core.index.shard.ShardId;
 import org.opensearch.index.remote.RemoteStoreUtils;
 import org.opensearch.index.store.lockmanager.FileLockInfo;
@@ -849,13 +850,15 @@ public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep, ActionListene
     }
 
     /*
-    Tries to delete shard level directory if it do not have any locks.
-    Return true if it deleted it successfully
+    Tries to delete shard level directory if it does not have any locks.
+    Return true if shard is enqueued successfully for async cleanup.
      */
     public boolean deleteIfEmpty() {
         Set<String> allLockedFiles;
         try {
-            allLockedFiles = ((RemoteStoreMetadataLockManager) mdLockManager).fetchLockedMetadataFiles(MetadataFilenameUtils.METADATA_PREFIX);
+            allLockedFiles = ((RemoteStoreMetadataLockManager) mdLockManager).fetchLockedMetadataFiles(
+                MetadataFilenameUtils.METADATA_PREFIX
+            );
         } catch (Exception e) {
             logger.error("Exception while fetching segment metadata lock files, skipping deleteStaleSegments", e);
             return false;
@@ -865,7 +868,6 @@ public boolean deleteIfEmpty() {
             return false;
         }
 
-        AtomicBoolean cleanupFailed = new AtomicBoolean(false);
         try {
             threadPool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> {
                 try {
@@ -873,18 +875,14 @@ public boolean deleteIfEmpty() {
                     remoteDataDirectory.delete();
                     remoteMetadataDirectory.delete();
                     mdLockManager.delete();
                 } catch (Exception e) {
-                    logger.error(
-                        "Exception occurred while deleting directory, it will get retried during next call",
-                        e
-                    );
-                    cleanupFailed.set(true);
+                    logger.error("Exception occurred while deleting directory, it will get retried during next call", e);
                 }
             });
-        } catch (Exception e) {
-            logger.error("Exception occurred while deleting directory", e);
-            cleanupFailed.set(true);
+        } catch (OpenSearchRejectedExecutionException e) {
+            logger.error("Exception occurred while enqueueing directory for cleanup", e);
+            return false;
         }
-        return !cleanupFailed.get();
+        return true;
     }
 
     @Override
diff --git a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java
index 2d53dfdd02cf5..75bc343c246b3 100644
--- a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java
+++ b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java
@@ -111,7 +111,14 @@ public Boolean isAcquired(LockInfo lockInfo) throws IOException {
     @Override
     public List<LockInfo> listLocks() throws IOException {
         Collection<String> lockFiles = lockDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX);
-        return lockFiles.stream().map(lock -> FileLockInfo.getLockInfoBuilder().withFileToLock(FileLockInfo.LockFileUtils.getFileToLockNameFromLock(lock)).withAcquirerId(FileLockInfo.LockFileUtils.getAcquirerIdFromLock(lock)).build()).collect(Collectors.toList());
+        return lockFiles.stream()
+            .map(
+                lock -> FileLockInfo.getLockInfoBuilder()
+                    .withFileToLock(FileLockInfo.LockFileUtils.getFileToLockNameFromLock(lock))
+                    .withAcquirerId(FileLockInfo.LockFileUtils.getAcquirerIdFromLock(lock))
+                    .build()
+            )
+            .collect(Collectors.toList());
     }
 
     /**
diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
index be8964e06b2e0..915c5f32f52a9 100644
--- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
@@ -121,7 +121,9 @@
 import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
 import org.opensearch.index.store.Store;
 import org.opensearch.index.store.StoreFileMetadata;
-import org.opensearch.index.store.lockmanager.*;
+import org.opensearch.index.store.lockmanager.FileLockInfo;
+import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
+import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
 import org.opensearch.indices.recovery.RecoverySettings;
 import org.opensearch.indices.recovery.RecoveryState;
 import org.opensearch.repositories.IndexId;
@@ -151,6 +153,7 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -216,6 +219,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
     private static final String UPLOADED_DATA_BLOB_PREFIX = "__";
 
+    /**
+     * Tracks shard level snapshot lock releases and remote store cleanups that have already been triggered, so they are not repeated.
+     */
+    protected volatile RemoteStoreShardCleanupTracker remoteStoreShardCleanupTracker;
+
     /**
      * Prefix used for the identifiers of data blobs that were not actually written to the repository physically because their contents are
      * already stored in the metadata referencing them, i.e. in {@link BlobStoreIndexShardSnapshot} and
@@ -421,6 +429,7 @@ protected BlobStoreRepository(
         this.threadPool = clusterService.getClusterApplierService().threadPool();
         this.clusterService = clusterService;
         this.recoverySettings = recoverySettings;
+        this.remoteStoreShardCleanupTracker = new RemoteStoreShardCleanupTracker();
     }
 
     @Override
@@ -1097,44 +1106,22 @@ private void asyncCleanupUnlinkedShardLevelBlobs(
         }
     }
 
-    /**
-     * The result of removing a snapshot from a shard folder in the repository.
-     */
-    private static final class ShardRemoteStoreReleaseCleanupResult {
-        // snapshot shard blob file
-        private final String fileToDelete;
-
-        // does release/cleanup successful
-        private final Boolean cleanupSucceeded;
-
-        ShardRemoteStoreReleaseCleanupResult(String fileToDelete, boolean cleanupSucceeded) {
-            this.fileToDelete = fileToDelete;
-            this.cleanupSucceeded = cleanupSucceeded;
-        }
-    }
-
-    private boolean releaseRemoteStoreLockAndCleanup(String fileToDelete,
-                                                     RemoteStoreLockManagerFactory remoteStoreLockManagerFactory) {
-        if (remoteStoreLockManagerFactory == null || !fileToDelete.contains(SHALLOW_SNAPSHOT_PREFIX)) {
+    private boolean releaseRemoteStoreLockAndCleanup(
+        String shardId,
+        String shallowSnapshotUUID,
+        BlobContainer shardContainer,
+        RemoteStoreLockManagerFactory remoteStoreLockManagerFactory
+    ) {
+        if (remoteStoreLockManagerFactory == null) {
             return true;
         }
-        String[] fileToDeletePath = fileToDelete.split("/");
-        String indexId = fileToDeletePath[1];
-        String shardId = fileToDeletePath[2];
-        String shallowSnapBlob = fileToDeletePath[3];
-        String snapshotUUID = shallowSnapBlob.substring(
-            SHALLOW_SNAPSHOT_PREFIX.length(),
-            shallowSnapBlob.length() - ".dat".length()
-        );
         try {
-            BlobContainer shardContainer = blobStore().blobContainer(indicesPath().add(indexId).add(shardId));
-            RemoteStoreShardShallowCopySnapshot remoteStoreShardShallowCopySnapshot =
-                REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.read(
-                    shardContainer,
-                    snapshotUUID,
-                    namedXContentRegistry
-                );
+            RemoteStoreShardShallowCopySnapshot remoteStoreShardShallowCopySnapshot = REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.read(
+                shardContainer,
+                shallowSnapshotUUID,
+                namedXContentRegistry
+            );
             String indexUUID = remoteStoreShardShallowCopySnapshot.getIndexUUID();
             String remoteStoreRepoForIndex = remoteStoreShardShallowCopySnapshot.getRemoteStoreRepository();
             // Releasing lock file before deleting the shallow-snap-UUID file because in case of any failure while
@@ -1145,36 +1132,86 @@ private boolean releaseRemoteStoreLockAndCleanup(String fileToDelete,
                 indexUUID,
                 shardId
             );
-            remoteStoreMetadataLockManager.release(
-                FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotUUID).build()
-            );
+            if (!remoteStoreShardCleanupTracker.isShardLockReleased(indexUUID, shardId, shallowSnapshotUUID)) {
+                remoteStoreMetadataLockManager.release(FileLockInfo.getLockInfoBuilder().withAcquirerId(shallowSnapshotUUID).build());
+                remoteStoreShardCleanupTracker.addReleasedShardLock(indexUUID, shardId, shallowSnapshotUUID);
+            }
             logger.debug("Successfully released lock for shard {} of index with uuid {}", shardId, indexUUID);
-            if (!isIndexPresent(clusterService, indexUUID) && remoteStoreMetadataLockManager.listLocks().size() == 0) {
-                // this is a temporary solution where snapshot deletion triggers remote store side
-                // cleanup if index is already deleted. We will add a poller in future to take
-                // care of remote store side cleanup.
-                // see https://github.com/opensearch-project/OpenSearch/issues/8469
-                try (RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = (RemoteSegmentStoreDirectory) new RemoteSegmentStoreDirectoryFactory(
-                    remoteStoreLockManagerFactory.getRepositoriesService(),
-                    threadPool
-                ).newDirectory(
-                    remoteStoreRepoForIndex,
-                    indexUUID,
-                    new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.valueOf(shardId))
-                )) {
-                    // TODO: if cleanup fails we we will still proceed with corresponding shard md cleanup.
-                    // but this is a known scenario which is needed to be taken care separately.
+            if (!isIndexPresent(clusterService, indexUUID)
+                && !remoteStoreShardCleanupTracker.isShardEnqueued(indexUUID, shardId)
+                && remoteStoreMetadataLockManager.listLocks().size() == 0) {
+                // Note: this is a temporary solution where snapshot deletion triggers remote store side cleanup if
+                // index is already deleted. We will add a poller in future to take care of remote store side cleanup.
+                // related issue: https://github.com/opensearch-project/OpenSearch/issues/8469
+                try (
+                    RemoteSegmentStoreDirectory remoteSegmentStoreDirectory =
+                        (RemoteSegmentStoreDirectory) new RemoteSegmentStoreDirectoryFactory(
+                            remoteStoreLockManagerFactory.getRepositoriesService(),
+                            threadPool
+                        ).newDirectory(
+                            remoteStoreRepoForIndex,
+                            indexUUID,
+                            new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.valueOf(shardId))
+                        )
+                ) {
+                    // Note: shard cleanup will still happen asynchronously using REMOTE_PURGE threadpool. if it fails,
+                    // it could leave some stale files in remote directory. this issue could even happen in cases of
+                    // shard level remote store data cleanup which also happens asynchronously. in long term, we have
+                    // plans to implement remote store GC poller mechanism which will take care of such stale data.
                     // related issue: https://github.com/opensearch-project/OpenSearch/issues/8469
-                    return remoteSegmentStoreDirectory.deleteIfEmpty();
+                    if (remoteSegmentStoreDirectory.deleteIfEmpty()) {
+                        remoteStoreShardCleanupTracker.addEnqueuedShardCleanup(indexUUID, shardId);
+                        return true;
+                    }
+                    return false;
                 }
             }
         } catch (Exception e) {
-            logger.warn(() -> new ParameterizedMessage("Failed to release lock or cleanup remote directory, skipping blob deletion for [{}]", shardId), e);
+            logger.warn(
+                () -> new ParameterizedMessage(
+                    "Failed to release lock or trigger async cleanup for remote directory, skipping blob deletion for [{}]",
+                    shardId
+                ),
+                e
+            );
         }
         return false;
     }
 
+    private static class RemoteStoreShardCleanupTracker {
+        private final Set<String> enqueuedRemoteStoreCleanup;
+        private final Set<String> releasedLocks;
+
+        private RemoteStoreShardCleanupTracker() {
+            enqueuedRemoteStoreCleanup = new HashSet<>();
+            releasedLocks = new HashSet<>();
+        }
+
+        private String indexShardIdentifier(String indexUUID, String shardId) {
+            return String.join("/", indexUUID, shardId);
+        }
+
+        private String shardLockIdentifier(String indexUUID, String shardId, String snapshotUUID) {
+            return String.join("/", indexUUID, shardId, snapshotUUID);
+        }
+
+        public boolean isShardLockReleased(String indexUUID, String shardId, String snapshotUUID) {
+            return releasedLocks.contains(shardLockIdentifier(indexUUID, shardId, snapshotUUID));
+        }
+
+        public void addReleasedShardLock(String indexUUID, String shardId, String snapshotUUID) {
+            releasedLocks.add(shardLockIdentifier(indexUUID, shardId, snapshotUUID));
+        }
+
+        public boolean isShardEnqueued(String indexUUID, String shardId) {
+            return enqueuedRemoteStoreCleanup.contains(indexShardIdentifier(indexUUID, shardId));
+        }
+
+        public void addEnqueuedShardCleanup(String indexUUID, String shardId) {
+            enqueuedRemoteStoreCleanup.add(indexShardIdentifier(indexUUID, shardId));
+        }
+    }
+
     // When remoteStoreLockManagerFactory is non-null, while deleting the files, lock files are also released before deletion of respective
     // shallow-snap-UUID files. And if it is null, we just delete the stale shard blobs.
     private void executeStaleShardDelete(
@@ -1188,7 +1225,23 @@ private void executeStaleShardDelete(
                 try {
                     // filtering files for which remote store lock release and cleanup succeeded,
                     // remaining files for which it failed will be retried in next snapshot delete run.
-                    List<String> modifiedFilesToDelete = filesToDelete.stream().filter(fileToDelete -> releaseRemoteStoreLockAndCleanup(fileToDelete, remoteStoreLockManagerFactory)).collect(Collectors.toList());
+                    List<String> modifiedFilesToDelete = new ArrayList<>();
+                    for (String fileToDelete : filesToDelete) {
+                        if (fileToDelete.contains(SHALLOW_SNAPSHOT_PREFIX)) {
+                            String[] fileToDeletePath = fileToDelete.split("/");
+                            String indexId = fileToDeletePath[1];
+                            String shardId = fileToDeletePath[2];
+                            String shallowSnapBlob = fileToDeletePath[3];
+                            String snapshotUUID = shallowSnapBlob.substring(
+                                SHALLOW_SNAPSHOT_PREFIX.length(),
+                                shallowSnapBlob.length() - ".dat".length()
+                            );
+                            BlobContainer shardContainer = blobStore().blobContainer(indicesPath().add(indexId).add(shardId));
+                            if (releaseRemoteStoreLockAndCleanup(shardId, snapshotUUID, shardContainer, remoteStoreLockManagerFactory)) {
+                                modifiedFilesToDelete.add(fileToDelete);
+                            }
+                        } else {
+                            // blobs without the shallow snapshot prefix need no lock release and can be deleted directly
+                            modifiedFilesToDelete.add(fileToDelete);
+                        }
+                    }
                     // Deleting the shard blobs
                     deleteFromContainer(blobContainer(), modifiedFilesToDelete);
                     l.onResponse(null);
@@ -1617,6 +1670,7 @@ private void executeOneStaleIndexDelete(
             DeleteResult deleteResult = DeleteResult.ZERO;
             try {
                 logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId);
+                boolean releaseLockOrTriggerCleanupFailed = false;
                 if (remoteStoreLockManagerFactory != null) {
                     Map<String, BlobContainer> shardBlobs = indexEntry.getValue().children();
                     if (!shardBlobs.isEmpty()) {
@@ -1624,20 +1678,39 @@ private void executeOneStaleIndexDelete(
                             Map<String, BlobMetadata> shardLevelBlobs = shardBlob.getValue().listBlobs();
                             for (Map.Entry<String, BlobMetadata> shardLevelBlob : shardLevelBlobs.entrySet()) {
                                 String blob = shardLevelBlob.getKey();
-                                String snapshotUUID = blob.substring(SHALLOW_SNAPSHOT_PREFIX.length(), blob.length() - ".dat".length());
                                 if (blob.startsWith(SHALLOW_SNAPSHOT_PREFIX) && blob.endsWith(".dat")) {
-                                    if (!releaseRemoteStoreLockAndCleanup(blob, remoteStoreLockManagerFactory)) {
-                                        // throwing error to skip unused index deletion, which gets retried during next snapshot deletion.
-                                        throw new Exception("Failed to complete lock release and cleanup for one of the index.");
+                                    String snapshotUUID = blob.substring(
+                                        SHALLOW_SNAPSHOT_PREFIX.length(),
+                                        blob.length() - ".dat".length()
+                                    );
+                                    if (!releaseRemoteStoreLockAndCleanup(
+                                        shardBlob.getKey(),
+                                        snapshotUUID,
+                                        shardBlob.getValue(),
+                                        remoteStoreLockManagerFactory
+                                    )) {
+                                        // release lock or async cleanup trigger did not succeed.
+                                        releaseLockOrTriggerCleanupFailed = true;
                                     }
                                 }
                             }
                         }
                     }
                 }
-                // Deleting the index folder
-                deleteResult = indexEntry.getValue().delete();
-                logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
+                if (!releaseLockOrTriggerCleanupFailed) {
+                    // Deleting the index folder
+                    deleteResult = indexEntry.getValue().delete();
+                    logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
+                } else {
+                    logger.warn(
+                        "[{}] index {} is no longer part of any snapshots in the repository, "
+                            + "but skipping clean up of its index folder since either lock release or remote store "
+                            + "cleanup failed for at least one of its shards.",
+                        metadata.name(),
+                        indexSnId
+                    );
+                }
+
             } catch (IOException e) {
                 logger.warn(
                     () -> new ParameterizedMessage(