Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
Sachin Kale committed Sep 4, 2024
1 parent 275e80f commit c008310
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ private void deleteStaleRemotePrimaryTerms(List<String> metadataFiles) {
* <br>
* This will also delete all stale translog metadata files from remote except the latest basis the metadata file comparator.
*/
private static void deleteStaleRemotePrimaryTerms(
protected static void deleteStaleRemotePrimaryTerms(
List<String> metadataFiles,
TranslogTransferManager translogTransferManager,
Map<String, Tuple<Long, Long>> oldFormatMetadataFilePrimaryTermMap,
Expand Down Expand Up @@ -496,16 +496,15 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
staticLogger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted);

// Delete stale metadata files
translogTransferManager.deleteMetadataFilesAsync(
metadataFilesToBeDeleted,
// Delete stale primary terms
() -> deleteStaleRemotePrimaryTerms(
metadataFilesNotToBeDeleted,
translogTransferManager,
new HashMap<>(),
new AtomicLong(Long.MAX_VALUE),
staticLogger
)
translogTransferManager.deleteMetadataFilesAsync(metadataFilesToBeDeleted, () -> {});

// Delete stale primary terms
deleteStaleRemotePrimaryTerms(
metadataFilesNotToBeDeleted,
translogTransferManager,
new HashMap<>(),
new AtomicLong(Long.MAX_VALUE),
staticLogger
);
} catch (Exception e) {
staticLogger.error("Exception while cleaning up metadata and primary terms", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.index.translog;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.metadata.RepositoryMetadata;
Expand Down Expand Up @@ -53,6 +55,7 @@
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
Expand All @@ -63,7 +66,9 @@
import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.METADATA_SEPARATOR;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -889,4 +894,93 @@ public void testGetMinMaxTranslogGenerationFromMetadataFile() throws IOException
verify(translogTransferManager).readMetadata("metadata__9223372036438563903__9223372036854774799__9223370311919910393__31__1");
verify(translogTransferManager).readMetadata("metadata__9223372036438563903__9223372036854775800__9223370311919910398__31__1");
}

public void testDeleteStaleRemotePrimaryTerms() throws IOException {
TranslogTransferManager translogTransferManager = mock(TranslogTransferManager.class);

List<String> metadataFiles = List.of(
// PT 4 to 9
"metadata__9223372036854775798__9223372036854774799__9223370311919910393__node1__9223372036438563958__4__1",
// PT 2 to 7
"metadata__9223372036854775800__9223372036854774799__9223370311919910393__node1__9223372036438563958__2__1",
// PT 2 to 6
"metadata__9223372036854775801__9223372036854774799__9223370311919910393__node1__9223372036438563958__2__1"
);

Logger staticLogger = LogManager.getLogger(RemoteFsTimestampAwareTranslogTests.class);
when(translogTransferManager.listPrimaryTermsInRemote()).thenReturn(Set.of(1L, 2L, 3L, 4L));
AtomicLong minPrimaryTermInRemote = new AtomicLong(Long.MAX_VALUE);
RemoteFsTimestampAwareTranslog.deleteStaleRemotePrimaryTerms(
metadataFiles,
translogTransferManager,
new HashMap<>(),
minPrimaryTermInRemote,
staticLogger
);
verify(translogTransferManager).deletePrimaryTermsAsync(2L);
assertEquals(2, minPrimaryTermInRemote.get());

RemoteFsTimestampAwareTranslog.deleteStaleRemotePrimaryTerms(
metadataFiles,
translogTransferManager,
new HashMap<>(),
minPrimaryTermInRemote,
staticLogger
);
// This means there are no new invocations of deletePrimaryTermAsync
verify(translogTransferManager, times(1)).deletePrimaryTermsAsync(anyLong());
}

public void testDeleteStaleRemotePrimaryTermsNoPrimaryTermInRemote() throws IOException {
TranslogTransferManager translogTransferManager = mock(TranslogTransferManager.class);

List<String> metadataFiles = List.of(
// PT 4 to 9
"metadata__9223372036854775798__9223372036854774799__9223370311919910393__node1__9223372036438563958__4__1",
// PT 2 to 7
"metadata__9223372036854775800__9223372036854774799__9223370311919910393__node1__9223372036438563958__2__1",
// PT 2 to 6
"metadata__9223372036854775801__9223372036854774799__9223370311919910393__node1__9223372036438563958__2__1"
);

Logger staticLogger = LogManager.getLogger(RemoteFsTimestampAwareTranslogTests.class);
when(translogTransferManager.listPrimaryTermsInRemote()).thenReturn(Set.of());
AtomicLong minPrimaryTermInRemote = new AtomicLong(Long.MAX_VALUE);
RemoteFsTimestampAwareTranslog.deleteStaleRemotePrimaryTerms(
metadataFiles,
translogTransferManager,
new HashMap<>(),
minPrimaryTermInRemote,
staticLogger
);
verify(translogTransferManager, times(0)).deletePrimaryTermsAsync(anyLong());
assertEquals(Long.MAX_VALUE, minPrimaryTermInRemote.get());
}

public void testDeleteStaleRemotePrimaryTermsPrimaryTermInRemoteIsBigger() throws IOException {
TranslogTransferManager translogTransferManager = mock(TranslogTransferManager.class);

List<String> metadataFiles = List.of(
// PT 4 to 9
"metadata__9223372036854775798__9223372036854774799__9223370311919910393__node1__9223372036438563958__4__1",
// PT 2 to 7
"metadata__9223372036854775800__9223372036854774799__9223370311919910393__node1__9223372036438563958__2__1",
// PT 2 to 6
"metadata__9223372036854775801__9223372036854774799__9223370311919910393__node1__9223372036438563958__2__1"
);

Logger staticLogger = LogManager.getLogger(RemoteFsTimestampAwareTranslogTests.class);
when(translogTransferManager.listPrimaryTermsInRemote()).thenReturn(Set.of(2L, 3L, 4L));
AtomicLong minPrimaryTermInRemote = new AtomicLong(Long.MAX_VALUE);
RemoteFsTimestampAwareTranslog.deleteStaleRemotePrimaryTerms(
metadataFiles,
translogTransferManager,
new HashMap<>(),
minPrimaryTermInRemote,
staticLogger
);
verify(translogTransferManager, times(0)).deletePrimaryTermsAsync(anyLong());
assertEquals(2, minPrimaryTermInRemote.get());
}

}

0 comments on commit c008310

Please sign in to comment.