Add more tests
Signed-off-by: Sachin Kale <[email protected]>
sachinpkale committed Oct 11, 2024
1 parent 942765e commit 6171811
Showing 3 changed files with 233 additions and 23 deletions.
@@ -215,21 +215,42 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
logger.debug(() -> "generationsToBeDeleted = " + generationsToBeDeleted);
if (generationsToBeDeleted.isEmpty() == false) {
// Delete stale generations
translogTransferManager.deleteGenerationAsync(
primaryTermSupplier.getAsLong(),
generationsToBeDeleted,
remoteGenerationDeletionPermits::release
);
try {
translogTransferManager.deleteGenerationAsync(
primaryTermSupplier.getAsLong(),
generationsToBeDeleted,
remoteGenerationDeletionPermits::release
);
} catch (Exception e) {
logger.error("Exception in delete generations flow", e);
// Release permit that is meant for metadata files and return
remoteGenerationDeletionPermits.release();
assert remoteGenerationDeletionPermits.availablePermits() == REMOTE_DELETION_PERMITS : "Available permits "
+ remoteGenerationDeletionPermits.availablePermits()
+ " is not equal to "
+ REMOTE_DELETION_PERMITS;
return;
}
} else {
remoteGenerationDeletionPermits.release();
}

if (metadataFilesToBeDeleted.isEmpty() == false) {
// Delete stale metadata files
translogTransferManager.deleteMetadataFilesAsync(
metadataFilesToBeDeleted,
remoteGenerationDeletionPermits::release
);
try {
translogTransferManager.deleteMetadataFilesAsync(
metadataFilesToBeDeleted,
remoteGenerationDeletionPermits::release
);
} catch (Exception e) {
logger.error("Exception in delete metadata files flow", e);
// Permit is already released by deleteMetadataFilesAsync
assert remoteGenerationDeletionPermits.availablePermits() == REMOTE_DELETION_PERMITS : "Available permits "
+ remoteGenerationDeletionPermits.availablePermits()
+ " is not equal to "
+ REMOTE_DELETION_PERMITS;
return;
}

// Update cache to keep only those metadata files that are not getting deleted
oldFormatMetadataFileGenerationMap.keySet().retainAll(metadataFilesNotToBeDeleted);
@@ -240,7 +261,12 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
remoteGenerationDeletionPermits.release();
}
} catch (Exception e) {
logger.error("Exception in trimUnreferencedReaders", e);
remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);
assert remoteGenerationDeletionPermits.availablePermits() == REMOTE_DELETION_PERMITS : "Available permits "
+ remoteGenerationDeletionPermits.availablePermits()
+ " is not equal to "
+ REMOTE_DELETION_PERMITS;
}
}
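
The two hunks above wrap the asynchronous deletion calls in try/catch so that a synchronous failure to schedule a deletion can no longer leak a permit and block future trims. The snippet below is a minimal, self-contained sketch of that permit accounting and is not part of the commit: scheduleAsyncDelete stands in for deleteGenerationAsync and deleteMetadataFilesAsync, and the number of permits released on each failure path is a simplifying assumption chosen to keep the sketch consistent (the patch itself asserts availablePermits() against REMOTE_DELETION_PERMITS on every early return).

import java.util.List;
import java.util.concurrent.Semaphore;

// Illustrative sketch only, not part of the commit. Models the permit accounting the
// hunks above enforce: REMOTE_DELETION_PERMITS permits are taken up front, every async
// deletion path hands its permit back exactly once, and a synchronous scheduling
// failure restores the count before returning. scheduleAsyncDelete is a hypothetical
// stand-in for deleteGenerationAsync / deleteMetadataFilesAsync.
class PermitAccountingSketch {
    static final int REMOTE_DELETION_PERMITS = 2;
    private final Semaphore remoteGenerationDeletionPermits = new Semaphore(REMOTE_DELETION_PERMITS);

    void trim(List<Long> generationsToBeDeleted, List<String> metadataFilesToBeDeleted) throws InterruptedException {
        // One permit per deletion path (stale generations, stale metadata files).
        remoteGenerationDeletionPermits.acquire(REMOTE_DELETION_PERMITS);

        try {
            scheduleAsyncDelete(generationsToBeDeleted, remoteGenerationDeletionPermits::release);
        } catch (Exception e) {
            // Nothing was scheduled, so no callback will ever run: hand back both permits
            // so availablePermits() returns to REMOTE_DELETION_PERMITS and later trims proceed.
            remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);
            return;
        }

        try {
            scheduleAsyncDelete(metadataFilesToBeDeleted, remoteGenerationDeletionPermits::release);
        } catch (Exception e) {
            // The generation deletion is already in flight and releases its own permit when it
            // completes; only the metadata permit is still held here, so release that one.
            remoteGenerationDeletionPermits.release();
        }
    }

    // Hypothetical async scheduler: guarantees onDone runs exactly once after the work finishes.
    private void scheduleAsyncDelete(List<?> items, Runnable onDone) {
        new Thread(() -> {
            try {
                // ... delete items from the remote store ...
            } finally {
                onDone.run();
            }
        }).start();
    }
}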

@@ -441,7 +467,8 @@ protected static void deleteStaleRemotePrimaryTerms(
}
Optional<Long> minPrimaryTermFromMetadataFiles = metadataFilesNotToBeDeleted.stream().map(file -> {
try {
return getMinMaxPrimaryTermFromMetadataFile(file, translogTransferManager, oldFormatMetadataFilePrimaryTermMap).v1();
return getMinMaxPrimaryTermFromMetadataFile(file, translogTransferManager, oldFormatMetadataFilePrimaryTermMap, logger)
.v1();
} catch (IOException e) {
return Long.MIN_VALUE;
}
@@ -482,7 +509,8 @@ protected static Long getMinPrimaryTermInRemote(
protected static Tuple<Long, Long> getMinMaxPrimaryTermFromMetadataFile(
String metadataFile,
TranslogTransferManager translogTransferManager,
Map<String, Tuple<Long, Long>> oldFormatMetadataFilePrimaryTermMap
Map<String, Tuple<Long, Long>> oldFormatMetadataFilePrimaryTermMap,
Logger logger
) throws IOException {
Tuple<Long, Long> minMaxPrimaryTermFromFileName = TranslogTransferMetadata.getMinMaxPrimaryTermFromFilename(metadataFile);
if (minMaxPrimaryTermFromFileName != null) {
@@ -504,6 +532,8 @@ protected static Tuple<Long, Long> getMinMaxPrimaryTermFromMetadataFile(
if (primaryTerm.isPresent()) {
minPrimaryTem = primaryTerm.get();
}
} else {
logger.warn("No primary term found from GenerationToPrimaryTermMap for file [{}]", metadataFile);
}
Tuple<Long, Long> minMaxPrimaryTermTuple = new Tuple<>(minPrimaryTem, maxPrimaryTem);
oldFormatMetadataFilePrimaryTermMap.put(metadataFile, minMaxPrimaryTermTuple);
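
getMinMaxPrimaryTermFromMetadataFile now also receives a Logger so it can warn when an old-format metadata file carries no generation-to-primary-term map. For newer file names the primary terms can be decoded from the name itself; the sketch below shows one plausible decoding that reproduces the tuples asserted in testGetMinMaxPrimaryTermFromMetadataFile further down. It is illustrative only: the inverted-long encoding (Long.MAX_VALUE minus the value) and the token positions are inferred from the test fixtures, while the authoritative parsing lives in TranslogTransferMetadata.getMinMaxPrimaryTermFromFilename.

import java.util.Optional;

// Illustrative sketch only, not part of the commit. Assumes the long fields in the
// metadata file name are stored inverted (Long.MAX_VALUE - value) and that the seventh
// "__"-separated token carries the minimum primary term; both are inferred from the
// test fixtures, not from TranslogTransferMetadata itself.
class MetadataFileNameSketch {
    static Optional<long[]> minMaxPrimaryTerm(String metadataFile) {
        String[] tokens = metadataFile.split("__");
        if (tokens.length < 8) {
            // Older naming scheme: the primary terms are not encoded in the name,
            // so callers fall back to reading the metadata blob (readMetadata).
            return Optional.empty();
        }
        long maxPrimaryTerm = Long.MAX_VALUE - Long.parseLong(tokens[1]); // inverted primary term
        long minPrimaryTerm = Long.parseLong(tokens[6]);                  // stored directly in newer names
        return Optional.of(new long[] { minPrimaryTerm, maxPrimaryTerm });
    }

    public static void main(String[] args) {
        // Matches the first assertion in the tests below: expected Tuple<>(1L, 1008L).
        long[] terms = minMaxPrimaryTerm(
            "metadata__9223372036854774799__9223372036854774799__9223370311919910393__31__9223372036854775106__1__1"
        ).orElseThrow();
        System.out.println(terms[0] + " " + terms[1]); // prints: 1 1008
    }
}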
@@ -590,7 +590,14 @@ protected void trimUnreferencedReaders(boolean onlyTrimLocal) throws IOException
generationsToDelete.add(generation);
}
if (generationsToDelete.isEmpty() == false) {
deleteRemoteGeneration(generationsToDelete);
try {
deleteRemoteGeneration(generationsToDelete);
} catch (Exception e) {
logger.error("Exception in delete generations flow", e);
// Release permit that is meant for metadata files and return
remoteGenerationDeletionPermits.release();
return;
}
translogTransferManager.deleteStaleTranslogMetadataFilesAsync(remoteGenerationDeletionPermits::release);
deleteStaleRemotePrimaryTerms();
} else {
@@ -63,6 +63,7 @@

import org.mockito.Mockito;

import static org.opensearch.index.translog.RemoteFsTranslog.REMOTE_DELETION_PERMITS;
import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.METADATA_SEPARATOR;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED;
@@ -480,10 +481,7 @@ public void onResponse(List<BlobMetadata> blobMetadataList) {
// we will not delete them
if (dataFilesAfterTrim.equals(dataFilesBeforeTrim) == false) {
// We check for number of pinned timestamp or +1 due to latest metadata.
assertTrue(
metadataFilesAfterTrim.size() == pinnedTimestamps.size()
|| metadataFilesAfterTrim.size() == pinnedTimestamps.size() + 1
);
assertTrue(metadataFilesAfterTrim.size() >= pinnedTimestamps.size());
}

for (String md : pinnedTimestampMatchingMetadataFiles) {
@@ -1061,31 +1059,32 @@ public void testGetMinMaxTranslogGenerationFromMetadataFile() throws IOException
public void testGetMinMaxPrimaryTermFromMetadataFile() throws IOException {
TranslogTransferManager translogTransferManager = mock(TranslogTransferManager.class);

RemoteFsTimestampAwareTranslog translog = (RemoteFsTimestampAwareTranslog) this.translog;

// Fetch generations directly from the filename
assertEquals(
new Tuple<>(1L, 1008L),
RemoteFsTimestampAwareTranslog.getMinMaxPrimaryTermFromMetadataFile(
"metadata__9223372036854774799__9223372036854774799__9223370311919910393__31__9223372036854775106__1__1",
translogTransferManager,
new HashMap<>()
new HashMap<>(),
logger
)
);
assertEquals(
new Tuple<>(4L, 7L),
RemoteFsTimestampAwareTranslog.getMinMaxPrimaryTermFromMetadataFile(
"metadata__9223372036854775800__9223372036854775800__9223370311919910398__31__9223372036854775803__4__1",
translogTransferManager,
new HashMap<>()
new HashMap<>(),
logger
)
);
assertEquals(
new Tuple<>(10L, 10L),
RemoteFsTimestampAwareTranslog.getMinMaxPrimaryTermFromMetadataFile(
"metadata__9223372036854775797__9223372036854775800__9223370311919910398__31__9223372036854775803__10__1",
translogTransferManager,
new HashMap<>()
new HashMap<>(),
logger
)
);

@@ -1099,22 +1098,54 @@ public void testGetMinMaxPrimaryTermFromMetadataFile() throws IOException {
RemoteFsTimestampAwareTranslog.getMinMaxPrimaryTermFromMetadataFile(
"metadata__9223372036854775805__9223372036854774799__9223370311919910393__31__1",
translogTransferManager,
new HashMap<>()
new HashMap<>(),
logger
)
);
assertEquals(
new Tuple<>(4L, 7L),
RemoteFsTimestampAwareTranslog.getMinMaxPrimaryTermFromMetadataFile(
"metadata__9223372036438563903__9223372036854775800__9223370311919910398__31__1",
translogTransferManager,
Map.of("metadata__9223372036438563903__9223372036854775800__9223370311919910398__31__1", new Tuple<>(4L, 7L))
Map.of("metadata__9223372036438563903__9223372036854775800__9223370311919910398__31__1", new Tuple<>(4L, 7L)),
logger
)
);

verify(translogTransferManager).readMetadata("metadata__9223372036854775805__9223372036854774799__9223370311919910393__31__1");
verify(translogTransferManager, times(0)).readMetadata(
"metadata__9223372036438563903__9223372036854775800__9223370311919910398__31__1"
);

// Older md files with empty GenerationToPrimaryTermMap
md1 = mock(TranslogTransferMetadata.class);
when(md1.getGenerationToPrimaryTermMapper()).thenReturn(Map.of());
when(translogTransferManager.readMetadata("metadata__9223372036854775805__9223372036854774799__9223370311919910393__31__1"))
.thenReturn(md1);
assertEquals(
new Tuple<>(-1L, 2L),
RemoteFsTimestampAwareTranslog.getMinMaxPrimaryTermFromMetadataFile(
"metadata__9223372036854775805__9223372036854774799__9223370311919910393__31__1",
translogTransferManager,
new HashMap<>(),
logger
)
);

// Older md files with null GenerationToPrimaryTermMap
md1 = mock(TranslogTransferMetadata.class);
when(md1.getGenerationToPrimaryTermMapper()).thenReturn(null);
when(translogTransferManager.readMetadata("metadata__9223372036854775805__9223372036854774799__9223370311919910393__31__1"))
.thenReturn(md1);
assertEquals(
new Tuple<>(-1L, 2L),
RemoteFsTimestampAwareTranslog.getMinMaxPrimaryTermFromMetadataFile(
"metadata__9223372036854775805__9223372036854774799__9223370311919910393__31__1",
translogTransferManager,
new HashMap<>(),
logger
)
);
}

public void testDeleteStaleRemotePrimaryTerms() throws IOException {
@@ -1332,4 +1363,146 @@ public void testGetMinPrimaryTermInRemoteNotFetched() throws IOException {
);
verify(translogTransferManager).listPrimaryTermsInRemote();
}

public void testTrimUnreferencedReadersStalePinnedTimestamps() throws Exception {
ArrayList<Translog.Operation> ops = new ArrayList<>();

addToTranslogAndListAndUpload(translog, ops, new Translog.Index("0", 0, primaryTerm.get(), new byte[] { 1 }));
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 1, primaryTerm.get(), new byte[] { 1 }));

// First reader is created at the init of translog
assertEquals(3, translog.readers.size());
assertEquals(2, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size());
assertBusy(() -> {
assertEquals(6, translog.allUploaded().size());
assertEquals(
6,
blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size()
);
});

addToTranslogAndListAndUpload(translog, ops, new Translog.Index("2", 2, primaryTerm.get(), new byte[] { 1 }));
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("3", 3, primaryTerm.get(), new byte[] { 1 }));

assertBusy(() -> {
assertEquals(
10,
blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size()
);
});

assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable()));

translog.setMinSeqNoToKeep(3);
translog.trimUnreferencedReaders();

addToTranslogAndListAndUpload(translog, ops, new Translog.Index("4", 4, primaryTerm.get(), new byte[] { 1 }));
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("5", 5, primaryTerm.get(), new byte[] { 1 }));
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("6", 6, primaryTerm.get(), new byte[] { 1 }));

assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable()));
translog.setMinSeqNoToKeep(6);
translog.trimUnreferencedReaders();
assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable()));

assertEquals(1, translog.readers.size());
assertBusy(() -> {
assertEquals(2, translog.allUploaded().size());
assertEquals(7, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size());
assertEquals(
16,
blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size()
);
}, 30, TimeUnit.SECONDS);

addToTranslogAndListAndUpload(translog, ops, new Translog.Index("7", 7, primaryTerm.get(), new byte[] { 1 }));
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("8", 8, primaryTerm.get(), new byte[] { 1 }));

assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable()));
translog.trimUnreferencedReaders();
assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable()));

assertEquals(3, translog.readers.size());
assertBusy(() -> {
assertEquals(6, translog.allUploaded().size());
assertEquals(9, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size());
assertEquals(
20,
blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size()
);
}, 30, TimeUnit.SECONDS);
}
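
Both new tests rely on assertBusy(...) with timeouts of up to 30 seconds to let the asynchronous remote deletions settle before asserting on blob counts. The helper comes from the test framework base class, not from this commit; the sketch below is a rough model of the polling behaviour it provides, and the backoff details are an assumption.

import java.util.concurrent.TimeUnit;

// Rough, illustrative sketch of assertBusy-style polling; the real helper is supplied
// by the test framework base class, and its exact backoff strategy may differ.
final class AssertBusySketch {
    static void assertBusy(CheckedRunnable assertion, long maxWait, TimeUnit unit) throws Exception {
        long deadline = System.nanoTime() + unit.toNanos(maxWait);
        long sleepMillis = 1;
        while (true) {
            try {
                assertion.run();
                return; // the wrapped assertions passed
            } catch (AssertionError e) {
                if (System.nanoTime() >= deadline) {
                    throw e; // timed out: surface the last failure
                }
                Thread.sleep(sleepMillis);
                sleepMillis = Math.min(sleepMillis * 2, 500); // back off, capped at 500ms
            }
        }
    }

    // Allows the assertion block to throw checked exceptions, like the real helper does.
    @FunctionalInterface
    interface CheckedRunnable {
        void run() throws Exception;
    }
}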

public void testTrimUnreferencedReadersNoPermits() throws Exception {
// Acquire the permits so that remote translog deletion will not happen
translog.remoteGenerationDeletionPermits.acquire(REMOTE_DELETION_PERMITS);

ArrayList<Translog.Operation> ops = new ArrayList<>();

addToTranslogAndListAndUpload(translog, ops, new Translog.Index("0", 0, primaryTerm.get(), new byte[] { 1 }));
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 1, primaryTerm.get(), new byte[] { 1 }));

// First reader is created at the init of translog
assertEquals(3, translog.readers.size());
assertEquals(2, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size());
assertBusy(() -> {
assertEquals(6, translog.allUploaded().size());
assertEquals(
6,
blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size()
);
});

addToTranslogAndListAndUpload(translog, ops, new Translog.Index("2", 2, primaryTerm.get(), new byte[] { 1 }));
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("3", 3, primaryTerm.get(), new byte[] { 1 }));

assertBusy(() -> {
assertEquals(
10,
blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size()
);
});

RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);
// Fetch pinned timestamps so that they are not stale
updatePinnedTimstampTask.run();
translog.setMinSeqNoToKeep(3);
translog.trimUnreferencedReaders();

addToTranslogAndListAndUpload(translog, ops, new Translog.Index("4", 4, primaryTerm.get(), new byte[] { 1 }));
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("5", 5, primaryTerm.get(), new byte[] { 1 }));
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("6", 6, primaryTerm.get(), new byte[] { 1 }));

// Fetch pinned timestamps so that they are not stale
updatePinnedTimstampTask.run();
translog.setMinSeqNoToKeep(6);
translog.trimUnreferencedReaders();

assertEquals(1, translog.readers.size());
assertBusy(() -> {
assertEquals(2, translog.allUploaded().size());
assertEquals(7, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size());
assertEquals(
16,
blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size()
);
}, 30, TimeUnit.SECONDS);

addToTranslogAndListAndUpload(translog, ops, new Translog.Index("7", 7, primaryTerm.get(), new byte[] { 1 }));
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("8", 8, primaryTerm.get(), new byte[] { 1 }));

// Fetch pinned timestamps so that they are not stale
updatePinnedTimstampTask.run();
translog.trimUnreferencedReaders();

assertEquals(3, translog.readers.size());
assertBusy(() -> {
assertEquals(6, translog.allUploaded().size());
assertEquals(9, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size());
assertEquals(
20,
blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size()
);
}, 30, TimeUnit.SECONDS);
}
}
