Skip to content

Commit

Permalink
Address PR comments and fix flaky tests
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
sachinpkale committed Sep 25, 2024
1 parent 8d3e255 commit af7fcbf
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public void testLiveIndexNoPinnedTimestampsWithExtraGenSettingWithinLimit() thro

assertBusy(() -> {
List<Path> metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList());
assertEquals(numDocs + 1, metadataFiles.size());
assertTrue(metadataFiles.size() >= numDocs + 1);

verifyTranslogDataFileCount(metadataFiles, translogDataPath);
}, 30, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
Expand Down Expand Up @@ -61,7 +62,7 @@ public class RemoteFsTimestampAwareTranslog extends RemoteFsTranslog {
private final Map<String, Tuple<Long, Long>> oldFormatMetadataFileGenerationMap;
private final Map<String, Tuple<Long, Long>> oldFormatMetadataFilePrimaryTermMap;
private final AtomicLong minPrimaryTermInRemote = new AtomicLong(Long.MAX_VALUE);
private long previousMinRemoteGenReferenced = -1;
private final AtomicBoolean triggerTrimOnMinRemoteGenReferencedChange = new AtomicBoolean(false);

public RemoteFsTimestampAwareTranslog(
TranslogConfig config,
Expand Down Expand Up @@ -106,6 +107,11 @@ protected void onDelete() {
}
}

@Override
protected void onMinRemoteGenReferencedChange() {
triggerTrimOnMinRemoteGenReferencedChange.set(true);
}

@Override
public void trimUnreferencedReaders() throws IOException {
trimUnreferencedReaders(false, true);
Expand Down Expand Up @@ -148,10 +154,10 @@ protected void trimUnreferencedReaders(boolean indexDeleted, boolean trimLocal)

// This code block ensures parity with RemoteFsTranslog. Without this, we will end up making list translog metadata
// call in each invocation of trimUnreferencedReaders
if (indexDeleted == false && previousMinRemoteGenReferenced == minRemoteGenReferenced) {
if (indexDeleted == false && triggerTrimOnMinRemoteGenReferencedChange.get() == false) {
return;
} else if (previousMinRemoteGenReferenced != minRemoteGenReferenced) {
previousMinRemoteGenReferenced = minRemoteGenReferenced;
} else if (triggerTrimOnMinRemoteGenReferencedChange.get()) {
triggerTrimOnMinRemoteGenReferencedChange.set(false);
}

// Since remote generation deletion is async, this ensures that only one generation deletion happens at a time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,11 @@ private class RemoteFsTranslogTransferListener implements TranslogTransferListen
@Override
public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOException {
maxRemoteTranslogGenerationUploaded = generation;
long previousMinRemoteGenReferenced = minRemoteGenReferenced;
minRemoteGenReferenced = getMinFileGeneration();
if (previousMinRemoteGenReferenced != minRemoteGenReferenced) {
onMinRemoteGenReferencedChange();
}
logger.debug(
"Successfully uploaded translog for primary term = {}, generation = {}, maxSeqNo = {}, minRemoteGenReferenced = {}",
primaryTerm,
Expand All @@ -703,6 +707,10 @@ public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) thro
}
}

protected void onMinRemoteGenReferencedChange() {

}

@Override
public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommit) {
return minSeqNoToKeep;
Expand Down

0 comments on commit af7fcbf

Please sign in to comment.