Skip to content

Commit

Permalink
Add lightweight cache at snapshot repo layer to track remote store lo…
Browse files Browse the repository at this point in the history
…ck release and cleanup
  • Loading branch information
Harish Bhakuni committed Feb 16, 2024
1 parent 99726a7 commit 97c6ead
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.index.store.lockmanager.FileLockInfo;
Expand Down Expand Up @@ -849,13 +850,15 @@ public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep, ActionListene
}

/*
Tries to delete the shard level directory if it does not have any locks.
Returns true if the shard was successfully enqueued for async cleanup; the deletion itself runs later on the
REMOTE_PURGE thread pool, so a true result does not guarantee that the directory has already been removed.
*/
public boolean deleteIfEmpty() {
Set<String> allLockedFiles;
try {
allLockedFiles = ((RemoteStoreMetadataLockManager) mdLockManager).fetchLockedMetadataFiles(MetadataFilenameUtils.METADATA_PREFIX);
allLockedFiles = ((RemoteStoreMetadataLockManager) mdLockManager).fetchLockedMetadataFiles(
MetadataFilenameUtils.METADATA_PREFIX
);
} catch (Exception e) {
logger.error("Exception while fetching segment metadata lock files, skipping deleteStaleSegments", e);
return false;
Expand All @@ -865,26 +868,21 @@ public boolean deleteIfEmpty() {
return false;
}

AtomicBoolean cleanupFailed = new AtomicBoolean(false);
try {
threadPool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> {
try {
remoteDataDirectory.delete();
remoteMetadataDirectory.delete();
mdLockManager.delete();
} catch (Exception e) {
logger.error(
"Exception occurred while deleting directory, it will get retried during next call",
e
);
cleanupFailed.set(true);
logger.error("Exception occurred while deleting directory, it will get retried during next call", e);
}
});
} catch (Exception e) {
logger.error("Exception occurred while deleting directory", e);
cleanupFailed.set(true);
} catch (OpenSearchRejectedExecutionException e) {
logger.error("Exception occurred while enqueueing directory for cleanup", e);
return false;
}
return !cleanupFailed.get();
return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,14 @@ public Boolean isAcquired(LockInfo lockInfo) throws IOException {
/**
 * Lists the locks currently present in the lock directory.
 * <p>
 * Every file in the lock directory whose name starts with the metadata prefix is treated as a lock file; the
 * locked file name and the acquirer id are both parsed out of the lock file name.
 *
 * @return one {@link LockInfo} per lock file found; empty when there are none
 * @throws IOException declared by the directory listing call (presumably raised when the listing fails)
 */
@Override
public List<LockInfo> listLocks() throws IOException {
    Collection<String> lockFiles = lockDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX);
    // A single return statement: keeping both the one-line and the multi-line form would leave the second unreachable.
    return lockFiles.stream()
        .map(
            lock -> FileLockInfo.getLockInfoBuilder()
                .withFileToLock(FileLockInfo.LockFileUtils.getFileToLockNameFromLock(lock))
                .withAcquirerId(FileLockInfo.LockFileUtils.getAcquirerIdFromLock(lock))
                .build()
        )
        .collect(Collectors.toList());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.store.lockmanager.*;
import org.opensearch.index.store.lockmanager.FileLockInfo;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.repositories.IndexId;
Expand Down Expand Up @@ -151,6 +153,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -216,6 +219,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

private static final String UPLOADED_DATA_BLOB_PREFIX = "__";

/**
* Cache for remote store lock and cleanup which tracks shard level snapshot lock release and remote store cleanup.
*/
protected volatile RemoteStoreShardCleanupTracker remoteStoreShardCleanupTracker;

/**
* Prefix used for the identifiers of data blobs that were not actually written to the repository physically because their contents are
* already stored in the metadata referencing them, i.e. in {@link BlobStoreIndexShardSnapshot} and
Expand Down Expand Up @@ -421,6 +429,7 @@ protected BlobStoreRepository(
this.threadPool = clusterService.getClusterApplierService().threadPool();
this.clusterService = clusterService;
this.recoverySettings = recoverySettings;
this.remoteStoreShardCleanupTracker = new RemoteStoreShardCleanupTracker();
}

@Override
Expand Down Expand Up @@ -1097,44 +1106,22 @@ private void asyncCleanupUnlinkedShardLevelBlobs(
}
}

/**
 * Outcome of the remote store lock release / cleanup attempted for a single snapshot shard blob.
 */
private static final class ShardRemoteStoreReleaseCleanupResult {
    // snapshot shard blob file
    private final String fileToDelete;

    // whether the release/cleanup succeeded; primitive because the value is always set from a boolean
    // and is never meant to be null
    private final boolean cleanupSucceeded;

    /**
     * @param fileToDelete     snapshot shard blob file the result refers to
     * @param cleanupSucceeded true when the release/cleanup for that blob succeeded
     */
    ShardRemoteStoreReleaseCleanupResult(String fileToDelete, boolean cleanupSucceeded) {
        this.fileToDelete = fileToDelete;
        this.cleanupSucceeded = cleanupSucceeded;
    }
}

private boolean releaseRemoteStoreLockAndCleanup(String fileToDelete,
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory) {
if (remoteStoreLockManagerFactory == null || !fileToDelete.contains(SHALLOW_SNAPSHOT_PREFIX)) {
private boolean releaseRemoteStoreLockAndCleanup(
String shardId,
String shallowSnapshotUUID,
BlobContainer shardContainer,
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory
) {
if (remoteStoreLockManagerFactory == null) {
return true;
}
String[] fileToDeletePath = fileToDelete.split("/");
String indexId = fileToDeletePath[1];
String shardId = fileToDeletePath[2];
String shallowSnapBlob = fileToDeletePath[3];
String snapshotUUID = shallowSnapBlob.substring(
SHALLOW_SNAPSHOT_PREFIX.length(),
shallowSnapBlob.length() - ".dat".length()
);

try {
BlobContainer shardContainer = blobStore().blobContainer(indicesPath().add(indexId).add(shardId));
RemoteStoreShardShallowCopySnapshot remoteStoreShardShallowCopySnapshot =
REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.read(
shardContainer,
snapshotUUID,
namedXContentRegistry
);
RemoteStoreShardShallowCopySnapshot remoteStoreShardShallowCopySnapshot = REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.read(
shardContainer,
shallowSnapshotUUID,
namedXContentRegistry
);
String indexUUID = remoteStoreShardShallowCopySnapshot.getIndexUUID();
String remoteStoreRepoForIndex = remoteStoreShardShallowCopySnapshot.getRemoteStoreRepository();
// Releasing lock file before deleting the shallow-snap-UUID file because in case of any failure while
Expand All @@ -1145,36 +1132,86 @@ private boolean releaseRemoteStoreLockAndCleanup(String fileToDelete,
indexUUID,
shardId
);
// Release the lock at most once per (index, shard, snapshot): the tracker remembers releases done by earlier
// calls so that a retry does not issue the release again.
if (!remoteStoreShardCleanupTracker.isShardLockReleased(indexUUID, shardId, shallowSnapshotUUID)) {
    remoteStoreMetadataLockManager.release(FileLockInfo.getLockInfoBuilder().withAcquirerId(shallowSnapshotUUID).build());
    remoteStoreShardCleanupTracker.addReleasedShardLock(indexUUID, shardId, shallowSnapshotUUID);
}
logger.debug("Successfully released lock for shard {} of index with uuid {}", shardId, indexUUID);
if (!isIndexPresent(clusterService, indexUUID)
    && !remoteStoreShardCleanupTracker.isShardEnqueued(indexUUID, shardId)
    && remoteStoreMetadataLockManager.listLocks().isEmpty()) {
    // Note: this is a temporary solution where snapshot deletion triggers remote store side cleanup if
    // index is already deleted. We will add a poller in future to take care of remote store side cleanup.
    // related issue: https://github.com/opensearch-project/OpenSearch/issues/8469
    try (
        RemoteSegmentStoreDirectory remoteSegmentStoreDirectory =
            (RemoteSegmentStoreDirectory) new RemoteSegmentStoreDirectoryFactory(
                remoteStoreLockManagerFactory.getRepositoriesService(),
                threadPool
            ).newDirectory(
                remoteStoreRepoForIndex,
                indexUUID,
                new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.valueOf(shardId))
            )
    ) {
        // Note: shard cleanup will still happen asynchronously using REMOTE_PURGE threadpool. if it fails,
        // it could leave some stale files in remote directory. this issue could even happen in cases of
        // shard level remote store data cleanup which also happens asynchronously. in long term, we have
        // plans to implement remote store GC poller mechanism which will take care of such stale data.
        // related issue: https://github.com/opensearch-project/OpenSearch/issues/8469
        if (remoteSegmentStoreDirectory.deleteIfEmpty()) {
            remoteStoreShardCleanupTracker.addEnqueuedShardCleanup(indexUUID, shardId);
            return true;
        }
        // Cleanup could not be enqueued; report failure so the caller keeps the blob and retries later.
        return false;
    }
}
// The lock is released and no remote directory cleanup has to be triggered by this call (the index still
// exists, the shard cleanup is already enqueued, or other locks remain). That is a success: without this
// return the method would fall through to the failure result and the snapshot blob would never be deleted.
return true;
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("Failed to release lock or cleanup remote directory, skipping blob deletion for [{}]", shardId), e);
logger.warn(
() -> new ParameterizedMessage(
"Failed to release lock or trigger async cleanup for remote directory, skipping blob deletion for [{}]",
shardId
),
e
);
}
return false;
}

/**
 * In-memory record of the remote store work already performed while deleting snapshots: which shard level
 * snapshot locks have been released and which shards have been enqueued for remote store cleanup.
 * <p>
 * Entries are identified by strings of the form {@code indexUUID/shardId} and
 * {@code indexUUID/shardId/snapshotUUID}.
 * <p>
 * Both sets are synchronized wrappers because the tracker is shared through a repository level field and may be
 * reached from more than one snapshot deletion worker (NOTE(review): confirm the calling threads). Only the
 * individual set operations are protected; a caller doing "check, then add" is still not atomic as a whole.
 * <p>
 * NOTE(review): entries are only ever added, never removed, so the sets grow for the lifetime of the tracker.
 */
private static class RemoteStoreShardCleanupTracker {
    // shards (indexUUID/shardId) whose remote store cleanup has been enqueued
    private final Set<String> enqueuedRemoteStoreCleanup;
    // shard level snapshot locks (indexUUID/shardId/snapshotUUID) that have been released
    private final Set<String> releasedLocks;

    private RemoteStoreShardCleanupTracker() {
        enqueuedRemoteStoreCleanup = Collections.synchronizedSet(new HashSet<>());
        releasedLocks = Collections.synchronizedSet(new HashSet<>());
    }

    /** Builds the key used for a shard: {@code indexUUID/shardId}. */
    private String indexShardIdentifier(String indexUUID, String shardId) {
        return String.join("/", indexUUID, shardId);
    }

    /** Builds the key used for a shard level snapshot lock: {@code indexUUID/shardId/snapshotUUID}. */
    private String shardLockIdentifier(String indexUUID, String shardId, String snapshotUUID) {
        return String.join("/", indexUUID, shardId, snapshotUUID);
    }

    /** @return true if the lock of the given snapshot on the given shard was recorded as released */
    public boolean isShardLockReleased(String indexUUID, String shardId, String snapshotUUID) {
        return releasedLocks.contains(shardLockIdentifier(indexUUID, shardId, snapshotUUID));
    }

    /** Records that the lock of the given snapshot on the given shard has been released. */
    public void addReleasedShardLock(String indexUUID, String shardId, String snapshotUUID) {
        releasedLocks.add(shardLockIdentifier(indexUUID, shardId, snapshotUUID));
    }

    /** @return true if remote store cleanup of the given shard was recorded as enqueued */
    public boolean isShardEnqueued(String indexUUID, String shardId) {
        return enqueuedRemoteStoreCleanup.contains(indexShardIdentifier(indexUUID, shardId));
    }

    /** Records that remote store cleanup of the given shard has been enqueued. */
    public void addEnqueuedShardCleanup(String indexUUID, String shardId) {
        enqueuedRemoteStoreCleanup.add(indexShardIdentifier(indexUUID, shardId));
    }
}

// When remoteStoreLockManagerFactory is non-null, while deleting the files, lock files are also released before deletion of respective
// shallow-snap-UUID files. And if it is null, we just delete the stale shard blobs.
private void executeStaleShardDelete(
Expand All @@ -1188,7 +1225,23 @@ private void executeStaleShardDelete(
try {
// Keep only the files that are safe to delete now. For shallow snapshot blobs the remote store lock release
// and cleanup must succeed first; those for which it failed are left out and retried in the next snapshot
// delete run. Every other blob has no remote store lock attached and is always eligible.
List<String> modifiedFilesToDelete = new ArrayList<>();
for (String fileToDelete : filesToDelete) {
    if (fileToDelete.contains(SHALLOW_SNAPSHOT_PREFIX)) {
        // presumably the path has the shape <root>/<indexId>/<shardId>/<shallow snapshot blob> -- verify against caller
        String[] fileToDeletePath = fileToDelete.split("/");
        String indexId = fileToDeletePath[1];
        String shardId = fileToDeletePath[2];
        String shallowSnapBlob = fileToDeletePath[3];
        // strip the shallow snapshot prefix and the ".dat" suffix to obtain the snapshot UUID
        String snapshotUUID = shallowSnapBlob.substring(
            SHALLOW_SNAPSHOT_PREFIX.length(),
            shallowSnapBlob.length() - ".dat".length()
        );
        BlobContainer shardContainer = blobStore().blobContainer(indicesPath().add(indexId).add(shardId));
        if (releaseRemoteStoreLockAndCleanup(shardId, snapshotUUID, shardContainer, remoteStoreLockManagerFactory)) {
            modifiedFilesToDelete.add(fileToDelete);
        }
    } else {
        // Not a shallow snapshot blob: nothing to release, so it must stay in the delete list. Without this
        // branch all regular stale shard blobs would be silently dropped and never deleted.
        modifiedFilesToDelete.add(fileToDelete);
    }
}
// Deleting the shard blobs
deleteFromContainer(blobContainer(), modifiedFilesToDelete);
l.onResponse(null);
Expand Down Expand Up @@ -1617,27 +1670,47 @@ private void executeOneStaleIndexDelete(
DeleteResult deleteResult = DeleteResult.ZERO;
try {
logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId);
// Set to true as soon as the lock release / async cleanup trigger fails for any shallow snapshot blob of this
// index; in that case the index folder is kept so that the work is retried on a later snapshot deletion.
boolean releaseLockOrTriggerCleanupFailed = false;
if (remoteStoreLockManagerFactory != null) {
    Map<String, BlobContainer> shardBlobs = indexEntry.getValue().children();
    if (!shardBlobs.isEmpty()) {
        for (Map.Entry<String, BlobContainer> shardBlob : shardBlobs.entrySet()) {
            Map<String, BlobMetadata> shardLevelBlobs = shardBlob.getValue().listBlobs();
            for (Map.Entry<String, BlobMetadata> shardLevelBlob : shardLevelBlobs.entrySet()) {
                String blob = shardLevelBlob.getKey();
                if (blob.startsWith(SHALLOW_SNAPSHOT_PREFIX) && blob.endsWith(".dat")) {
                    // the UUID is extracted only after the name check, so the substring bounds are valid
                    String snapshotUUID = blob.substring(
                        SHALLOW_SNAPSHOT_PREFIX.length(),
                        blob.length() - ".dat".length()
                    );
                    if (!releaseRemoteStoreLockAndCleanup(
                        shardBlob.getKey(),
                        snapshotUUID,
                        shardBlob.getValue(),
                        remoteStoreLockManagerFactory
                    )) {
                        // release lock or async cleanup trigger did not succeed.
                        // The flag must be raised here; assigning false would make the skip branch below
                        // unreachable and the index folder would be deleted despite the failure.
                        releaseLockOrTriggerCleanupFailed = true;
                    }
                }
            }
        }
    }
}
if (!releaseLockOrTriggerCleanupFailed) {
    // Deleting the index folder
    deleteResult = indexEntry.getValue().delete();
    logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
} else {
    // NOTE(review): this branch relies on releaseRemoteStoreLockAndCleanup reporting true whenever the lock
    // was released, even if no remote directory cleanup had to be triggered -- confirm that contract.
    logger.warn(
        "[{}] index {} is no longer part of any snapshots in the repository, "
            + "but skipping clean up of their index folders since either release lock or remote store "
            + "cleanup failed for at least one of the index shard.",
        metadata.name(),
        indexSnId
    );
}

} catch (IOException e) {
logger.warn(
() -> new ParameterizedMessage(
Expand Down

0 comments on commit 97c6ead

Please sign in to comment.