From e6f55833dd40d5db0168de1652de4cdf222ea043 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Mon, 4 Dec 2023 12:40:08 +0530 Subject: [PATCH] Add useCache flag to be used by caller Signed-off-by: Sachin Kale --- .../shard/RemoteStoreRefreshListener.java | 2 +- .../store/RemoteSegmentStoreDirectory.java | 33 +++++++------ .../RemoteSegmentStoreDirectoryTests.java | 47 ++++++++++++------- 3 files changed, 49 insertions(+), 33 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index d96a7e7c95ecf..21a71f388e8a9 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -203,7 +203,7 @@ private boolean syncSegments() { // is considered as a first refresh post commit. A cleanup of stale commit files is triggered. // This is done to avoid delete post each refresh. 
if (isRefreshAfterCommit()) { - remoteDirectory.deleteStaleSegmentsAsync(indexShard.getRecoverySettings().getMinRemoteSegmentMetadataFiles()); + remoteDirectory.deleteStaleSegmentsAsync(indexShard.getRecoverySettings().getMinRemoteSegmentMetadataFiles(), true); } try (GatedCloseable segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index d1ee13695b905..2d5857857f926 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -95,7 +95,7 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement private final ThreadPool threadPool; - private final Cache lockCache; + private final Cache acquiredLockCache; /** * Keeps track of local segment filename to uploaded filename along with other attributes like checksum. 
@@ -136,7 +136,8 @@ public RemoteSegmentStoreDirectory( this.remoteMetadataDirectory = remoteMetadataDirectory; this.mdLockManager = mdLockManager; this.threadPool = threadPool; - this.lockCache = CacheBuilder.builder().setExpireAfterWrite(TimeValue.timeValueHours(1)).build(); + // ToDo: make the cache TTL configurable + this.acquiredLockCache = CacheBuilder.builder().setExpireAfterWrite(TimeValue.timeValueHours(1)).build(); this.logger = Loggers.getLogger(getClass(), shardId); init(); } @@ -519,12 +520,12 @@ public void releaseLock(long primaryTerm, long generation, String acquirerId) th @Override public Boolean isLockAcquired(long primaryTerm, long generation) throws IOException { String metadataFile = getMetadataFileForCommit(primaryTerm, generation); - return isLockAcquired(metadataFile); + return isLockAcquired(metadataFile, false); } // Visible for testing - Boolean isLockAcquired(String metadataFile) throws IOException { - if (lockCache.get(metadataFile) != null) { + Boolean isLockAcquired(String metadataFile, boolean useCache) throws IOException { + if (useCache && acquiredLockCache.get(metadataFile) != null) { return true; } boolean lockAcquired = mdLockManager.isAcquired(FileLockInfo.getLockInfoBuilder().withFileToLock(metadataFile).build()); @@ -532,7 +533,7 @@ Boolean isLockAcquired(String metadataFile) throws IOException { // We do not want to cache info of unlocked metadata files as we do not want to miss a lock getting acquired // on a metadata file between 2 invocations of this method. Caching unlocked files can lead to data // consistency issues. 
- lockCache.put(metadataFile, true); + acquiredLockCache.put(metadataFile, true); } return lockAcquired; } @@ -741,7 +742,7 @@ public Map getSegmentsUploadedToRemoteStore() { * @param lastNMetadataFilesToKeep number of metadata files to keep * @throws IOException in case of I/O error while reading from / writing to remote segment store */ - public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException { + private void deleteStaleSegments(int lastNMetadataFilesToKeep, boolean useCache) throws IOException { if (lastNMetadataFilesToKeep == -1) { logger.info( "Stale segment deletion is disabled if cluster.remote_store.index.segment_metadata.retention.max_count is set to -1" @@ -767,7 +768,7 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException ); List metadataFilesToBeDeleted = metadataFilesEligibleToDelete.stream().filter(metadataFile -> { try { - return !isLockAcquired(metadataFile); + return !isLockAcquired(metadataFile, useCache); } catch (IOException e) { logger.error( "skipping metadata file (" @@ -830,22 +831,22 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException } } - public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep) { - deleteStaleSegmentsAsync(lastNMetadataFilesToKeep, ActionListener.wrap(r -> {}, e -> {})); + public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep, boolean useCache) { + deleteStaleSegmentsAsync(lastNMetadataFilesToKeep, ActionListener.wrap(r -> {}, e -> {}), useCache); } /** * Delete stale segment and metadata files asynchronously. - * This method calls {@link RemoteSegmentStoreDirectory#deleteStaleSegments(int)} in an async manner. + * This method calls {@link RemoteSegmentStoreDirectory#deleteStaleSegments(int, boolean)} in an async manner. 
* * @param lastNMetadataFilesToKeep number of metadata files to keep */ - public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep, ActionListener listener) { + private void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep, ActionListener listener, boolean useCache) { if (canDeleteStaleCommits.compareAndSet(true, false)) { try { threadPool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> { try { - deleteStaleSegments(lastNMetadataFilesToKeep); + deleteStaleSegments(lastNMetadataFilesToKeep, useCache); listener.onResponse(null); } catch (Exception e) { logger.error( @@ -893,6 +894,10 @@ private boolean deleteIfEmpty() throws IOException { @Override public void close() throws IOException { - deleteStaleSegmentsAsync(0, ActionListener.wrap(r -> deleteIfEmpty(), e -> logger.error("Failed to cleanup remote directory"))); + deleteStaleSegmentsAsync( + 0, + ActionListener.wrap(r -> deleteIfEmpty(), e -> logger.error("Failed to cleanup remote directory")), + false + ); } } diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index 58f7b4c1d31ea..c910419d83c79 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -507,30 +507,41 @@ public boolean matches(FileLockInfo right) { public void testIsLockAcquiredCachingCacheHit() throws IOException { populateMetadata(); remoteSegmentStoreDirectory.init(); - long testPrimaryTerm = 1; - long testGeneration = 5; List metadataFiles = List.of("metadata__1__5__abc"); FileLockInfo fileLockInfoLock = FileLockInfo.getLockInfoBuilder().withFileToLock(metadataFiles.get(0)).build(); - when( - remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( - RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilePrefixForCommit(testPrimaryTerm, 
testGeneration), - 1 - ) - ).thenReturn(metadataFiles); // Lock manager isAcquired returns true for the first time and false for subsequent calls when(mdLockManager.isAcquired(argThat(new FileLockInfoMatcher(fileLockInfoLock)))).thenReturn(true).thenReturn(false); // Due to cached entries which return with isAcquired = true, subsequent calls return from cache - assertTrue(remoteSegmentStoreDirectory.isLockAcquired(testPrimaryTerm, testGeneration)); - assertTrue(remoteSegmentStoreDirectory.isLockAcquired(testPrimaryTerm, testGeneration)); - assertTrue(remoteSegmentStoreDirectory.isLockAcquired(testPrimaryTerm, testGeneration)); + assertTrue(remoteSegmentStoreDirectory.isLockAcquired("metadata__1__5__abc", true)); + assertTrue(remoteSegmentStoreDirectory.isLockAcquired("metadata__1__5__abc", true)); + assertTrue(remoteSegmentStoreDirectory.isLockAcquired("metadata__1__5__abc", true)); // As all but first calls are fetching data from cache, we make only one call to mdLockManager verify(mdLockManager, times(1)).isAcquired(argThat(new FileLockInfoMatcher(fileLockInfoLock))); } + public void testIsLockAcquiredCachingCacheHitUseCacheFalse() throws IOException { + populateMetadata(); + remoteSegmentStoreDirectory.init(); + + List metadataFiles = List.of("metadata__1__5__abc"); + FileLockInfo fileLockInfoLock = FileLockInfo.getLockInfoBuilder().withFileToLock(metadataFiles.get(0)).build(); + + // Lock manager isAcquired returns true for the first time and false for subsequent calls + when(mdLockManager.isAcquired(argThat(new FileLockInfoMatcher(fileLockInfoLock)))).thenReturn(true).thenReturn(false); + + // With useCache=false the cache is bypassed, so the second and third calls observe the stubbed false from the lock manager + assertTrue(remoteSegmentStoreDirectory.isLockAcquired("metadata__1__5__abc", false)); + assertFalse(remoteSegmentStoreDirectory.isLockAcquired("metadata__1__5__abc", false)); + assertFalse(remoteSegmentStoreDirectory.isLockAcquired("metadata__1__5__abc", false)); + + // As the cache is not
used, every call consults mdLockManager directly + verify(mdLockManager, times(3)).isAcquired(argThat(new FileLockInfoMatcher(fileLockInfoLock))); + } + public void testIsLockAcquiredCachingCacheMiss() throws IOException { populateMetadata(); remoteSegmentStoreDirectory.init(); @@ -990,7 +1001,7 @@ public void testDeleteStaleCommitsException() throws Exception { // popluateMetadata() adds stub to return 3 metadata files // We are passing lastNMetadataFilesToKeep=2 here to validate that in case of exception deleteFile is not // invoked - remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2); + remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2, true); assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true))); verify(remoteMetadataDirectory, times(0)).deleteFile(any(String.class)); @@ -1003,7 +1014,7 @@ public void testDeleteStaleCommitsExceptionWhileScheduling() throws Exception { // popluateMetadata() adds stub to return 3 metadata files // We are passing lastNMetadataFilesToKeep=2 here to validate that in case of exception deleteFile is not // invoked - remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2); + remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2, true); assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true))); verify(remoteMetadataDirectory, times(0)).deleteFile(any(String.class)); @@ -1016,7 +1027,7 @@ public void testDeleteStaleCommitsWithDeletionAlreadyInProgress() throws Excepti // popluateMetadata() adds stub to return 3 metadata files // We are passing lastNMetadataFilesToKeep=2 here to validate that in case of exception deleteFile is not // invoked - remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2); + remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2, true); assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(false))); verify(remoteMetadataDirectory,
times(0)).deleteFile(any(String.class)); @@ -1027,7 +1038,7 @@ public void testDeleteStaleCommitsWithinThreshold() throws Exception { // popluateMetadata() adds stub to return 3 metadata files // We are passing lastNMetadataFilesToKeep=5 here so that none of the metadata files will be deleted - remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(5); + remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(5, true); assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true))); verify(remoteMetadataDirectory, times(0)).openInput(any(String.class), eq(IOContext.DEFAULT)); @@ -1039,7 +1050,7 @@ public void testDeleteStaleCommitsActualDelete() throws Exception { // popluateMetadata() adds stub to return 3 metadata files // We are passing lastNMetadataFilesToKeep=2 here so that oldest 1 metadata file will be deleted - remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2); + remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2, true); for (String metadata : metadataFilenameContentMapping.get(metadataFilename3).values()) { String uploadedFilename = metadata.split(RemoteSegmentStoreDirectory.UploadedSegmentMetadata.SEPARATOR)[1]; @@ -1063,7 +1074,7 @@ public void testDeleteStaleCommitsActualDeleteIOException() throws Exception { doThrow(new IOException("Error")).when(remoteDataDirectory).deleteFile(segmentFileWithException); // popluateMetadata() adds stub to return 3 metadata files // We are passing lastNMetadataFilesToKeep=2 here so that oldest 1 metadata file will be deleted - remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2); + remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2, true); for (String metadata : metadataFilenameContentMapping.get(metadataFilename3).values()) { String uploadedFilename = metadata.split(RemoteSegmentStoreDirectory.UploadedSegmentMetadata.SEPARATOR)[1]; @@ -1086,7 +1097,7 @@ public void testDeleteStaleCommitsActualDeleteNoSuchFileException() throws Excep doThrow(new 
NoSuchFileException(segmentFileWithException)).when(remoteDataDirectory).deleteFile(segmentFileWithException); // popluateMetadata() adds stub to return 3 metadata files // We are passing lastNMetadataFilesToKeep=2 here so that oldest 1 metadata file will be deleted - remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2); + remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2, true); for (String metadata : metadataFilenameContentMapping.get(metadataFilename3).values()) { String uploadedFilename = metadata.split(RemoteSegmentStoreDirectory.UploadedSegmentMetadata.SEPARATOR)[1];