Skip to content

Commit

Permalink
Add UTs for remote store directory changes
Browse files Browse the repository at this point in the history
Signed-off-by: Harish Bhakuni <[email protected]>
  • Loading branch information
Harish Bhakuni committed Feb 20, 2024
1 parent 053fabb commit 1bf807d
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement
*/
protected final AtomicBoolean canDeleteStaleCommits = new AtomicBoolean(true);

protected final AtomicBoolean isDirectoryCleanupOngoing = new AtomicBoolean(false);

private final AtomicLong metadataUploadCounter = new AtomicLong(0);

public static final int METADATA_FILES_TO_FETCH = 10;
Expand Down Expand Up @@ -854,33 +856,39 @@ public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep, ActionListene
Return true if shard is enqueued successfully for async cleanup.
*/
public boolean deleteIfEmpty() {
Set<String> allLockedFiles;
try {
allLockedFiles = ((RemoteStoreMetadataLockManager) mdLockManager).fetchLockedMetadataFiles(
MetadataFilenameUtils.METADATA_PREFIX
);
} catch (Exception e) {
logger.error("Exception while fetching segment metadata lock files, skipping deleteStaleSegments", e);
return false;
}
if (allLockedFiles.size() != 0) {
logger.info("Remote directory still has locked files, not deleting the path");
return false;
}

try {
threadPool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> {
try {
remoteDataDirectory.delete();
remoteMetadataDirectory.delete();
mdLockManager.delete();
} catch (Exception e) {
logger.error("Exception occurred while deleting directory, it will get retried during next call", e);
}
});
} catch (OpenSearchRejectedExecutionException e) {
logger.error("Exception occurred while enqueueing directory for cleanup", e);
return false;
if (isDirectoryCleanupOngoing.compareAndSet(false, true)) {
Set<String> allLockedFiles;
try {
allLockedFiles = ((RemoteStoreMetadataLockManager) mdLockManager).fetchLockedMetadataFiles(
MetadataFilenameUtils.METADATA_PREFIX
);
} catch (Exception e) {
logger.error("Exception while fetching segment metadata lock files, skipping deleteStaleSegments", e);
isDirectoryCleanupOngoing.set(false);
return false;
}
if (allLockedFiles.size() != 0) {
logger.info("Remote directory still has locked files, not deleting the path");
isDirectoryCleanupOngoing.set(false);
return false;
}
try {
threadPool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> {
try {
remoteDataDirectory.delete();
remoteMetadataDirectory.delete();
mdLockManager.delete();
} catch (Exception e) {
logger.error("Exception occurred while deleting directory, it will get retried during next call", e);
} finally {
isDirectoryCleanupOngoing.set(false);
}
});
} catch (OpenSearchRejectedExecutionException e) {
logger.error("Exception occurred while enqueueing directory for cleanup", e);
isDirectoryCleanupOngoing.set(false);
return false;
}
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.opensearch.common.annotation.PublicApi;

import java.io.IOException;
import java.util.List;

/**
* An Interface that defines Remote Store Lock Manager.
Expand Down Expand Up @@ -44,7 +43,12 @@ public interface RemoteStoreLockManager {
*/
Boolean isAcquired(LockInfo lockInfo) throws IOException;

List<LockInfo> listLocks() throws IOException;
/**
 * Checks whether the lock directory contains any lock files.
 *
 * @return true if lock directory is empty.
 * @throws IOException if an I/O error occurs while inspecting the lock directory.
 */
Boolean isEmpty() throws IOException;

/**
* Acquires lock on the file mentioned in originalLockInfo for acquirer mentioned in clonedLockInfo.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,16 +109,9 @@ public Boolean isAcquired(LockInfo lockInfo) throws IOException {
}

@Override
public List<LockInfo> listLocks() throws IOException {
public Boolean isEmpty() throws IOException {
Collection<String> lockFiles = lockDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX);
return lockFiles.stream()
.map(
lock -> FileLockInfo.getLockInfoBuilder()
.withFileToLock(FileLockInfo.LockFileUtils.getFileToLockNameFromLock(lock))
.withAcquirerId(FileLockInfo.LockFileUtils.getAcquirerIdFromLock(lock))
.build()
)
.collect(Collectors.toList());
return lockFiles.isEmpty();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@
import org.opensearch.common.blobstore.fs.FsBlobContainer;
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream;
import org.opensearch.common.blobstore.transfer.stream.RateLimitingOffsetRangeInputStream;
import org.opensearch.common.cache.Cache;
import org.opensearch.common.cache.CacheBuilder;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.compress.DeflateCompressor;
import org.opensearch.common.io.Streams;
Expand Down Expand Up @@ -153,7 +155,6 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -220,7 +221,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
private static final String UPLOADED_DATA_BLOB_PREFIX = "__";

/**
* Cache for remote store lock and cleanup which tracks shard level snapshot lock release and remote store cleanup.
* Local in-memory cache which tracks shard level snapshot lock release and remote store cleanup.
*/
protected volatile RemoteStoreShardCleanupTracker remoteStoreShardCleanupTracker;

Expand Down Expand Up @@ -1106,7 +1107,7 @@ private void asyncCleanupUnlinkedShardLevelBlobs(
}
}

private boolean releaseRemoteStoreLockAndCleanup(
protected boolean releaseRemoteStoreLockAndCleanup(
String shardId,
String shallowSnapshotUUID,
BlobContainer shardContainer,
Expand Down Expand Up @@ -1139,7 +1140,7 @@ private boolean releaseRemoteStoreLockAndCleanup(
logger.debug("Successfully released lock for shard {} of index with uuid {}", shardId, indexUUID);
if (!isIndexPresent(clusterService, indexUUID)
&& !remoteStoreShardCleanupTracker.isShardEnqueued(indexUUID, shardId)
&& remoteStoreMetadataLockManager.listLocks().size() == 0) {
&& remoteStoreMetadataLockManager.isEmpty()) {
// Note: this is a temporary solution where snapshot deletion triggers remote store side cleanup if
// index is already deleted. We will add a poller in future to take care of remote store side cleanup.
// related issue: https://github.com/opensearch-project/OpenSearch/issues/8469
Expand Down Expand Up @@ -1179,12 +1180,18 @@ private boolean releaseRemoteStoreLockAndCleanup(
}

private static class RemoteStoreShardCleanupTracker {
private final Set<String> enqueuedRemoteStoreCleanup;
private final Set<String> releasedLocks;
private final Cache<String, Boolean> enqueuedRemoteStoreCleanup;
private final Cache<String, Boolean> releasedLocks;
// lock files will be per shard per snapshot
private static final int DEFAULT_RELEASED_LOCK_CACHE_SIZE = 10000;
// cleanup will be per shard
private static final int DEFAULT_ENQUEUED_CLEANUP_CACHE_SIZE = 1000;

private RemoteStoreShardCleanupTracker() {
    // Bounded caches (instead of unbounded sets) so tracker memory cannot grow without limit;
    // evicted entries simply cause a redundant re-check, which is safe.
    enqueuedRemoteStoreCleanup = CacheBuilder.<String, Boolean>builder()
        .setMaximumWeight(DEFAULT_ENQUEUED_CLEANUP_CACHE_SIZE)
        .build();
    releasedLocks = CacheBuilder.<String, Boolean>builder().setMaximumWeight(DEFAULT_RELEASED_LOCK_CACHE_SIZE).build();
}

private String indexShardIdentifier(String indexUUID, String shardId) {
Expand All @@ -1196,19 +1203,19 @@ private String shardLockIdentifier(String indexUUID, String shardId, String snap
}

public boolean isShardLockReleased(String indexUUID, String shardId, String snapshotUUID) {
    // Entries are stored as Boolean.TRUE (autoboxed), so the identity compare is safe;
    // a null get() means the release is not recorded (or was evicted).
    return Boolean.TRUE == releasedLocks.get(shardLockIdentifier(indexUUID, shardId, snapshotUUID));
}

public void addReleasedShardLock(String indexUUID, String shardId, String snapshotUUID) {
    // Record that the lock for this (shard, snapshot) pair has been released.
    releasedLocks.put(shardLockIdentifier(indexUUID, shardId, snapshotUUID), true);
}

public boolean isShardEnqueued(String indexUUID, String shardId) {
    // A null get() means cleanup is not recorded as enqueued (or the entry was evicted).
    return Boolean.TRUE == enqueuedRemoteStoreCleanup.get(indexShardIdentifier(indexUUID, shardId));
}

public void addEnqueuedShardCleanup(String indexUUID, String shardId) {
    // Record that remote-store cleanup for this shard has been enqueued.
    enqueuedRemoteStoreCleanup.put(indexShardIdentifier(indexUUID, shardId), true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.index.shard.IndexShard;
Expand Down Expand Up @@ -920,6 +921,41 @@ public void testIncorrectChecksumCorruptIndexException() throws IOException {
assertThrows(CorruptIndexException.class, () -> remoteSegmentStoreDirectory.init());
}

public void testDeleteIfEmpty_Rejected() throws Exception {
    populateMetadata();
    // No metadata files are locked, so cleanup would normally be enqueued.
    when(mdLockManager.fetchLockedMetadataFiles(any())).thenReturn(Set.of());
    // Simulate the thread pool rejecting the cleanup task.
    doThrow(new OpenSearchRejectedExecutionException()).when(threadPool).executor(any(String.class));

    // A rejected execution must surface to the caller as a false return value.
    assertFalse(remoteSegmentStoreDirectory.deleteIfEmpty());
}

public void testDeleteIfEmpty_NotEmpty() throws Exception {
    populateMetadata();
    // One metadata file is still locked, so the directory must not be deleted.
    when(mdLockManager.fetchLockedMetadataFiles(any())).thenReturn(Set.of(metadataFilename2));

    boolean deleted = remoteSegmentStoreDirectory.deleteIfEmpty();

    // deleteIfEmpty() must refuse while any md file is locked.
    assertFalse(deleted);
}

public void testDeleteIfEmpty_FetchLockFailed() throws Exception {
    populateMetadata();
    when(mdLockManager.fetchLockedMetadataFiles(any())).thenThrow(new RuntimeException("Rate limit exceeded"));

    // deleteIfEmpty() should return false if fetching the locked metadata files fails.
    assertFalse(remoteSegmentStoreDirectory.deleteIfEmpty());
}

public void testDeleteIfEmpty() throws Exception {
    populateMetadata();
    // No locked metadata files, so the directory is eligible for async cleanup.
    when(mdLockManager.fetchLockedMetadataFiles(any())).thenReturn(Set.of());
    assertTrue(remoteSegmentStoreDirectory.deleteIfEmpty());
    // Wait for the async cleanup task to finish and reset the in-progress flag.
    assertBusy(() -> assertThat(remoteSegmentStoreDirectory.isDirectoryCleanupOngoing.get(), is(false)));
    // The data directory, metadata directory, and lock manager must all be deleted.
    verify(remoteDataDirectory).delete();
    verify(remoteMetadataDirectory).delete();
    verify(mdLockManager).delete();
}

public void testDeleteStaleCommitsException() throws Exception {
populateMetadata();
when(
Expand Down

0 comments on commit 1bf807d

Please sign in to comment.