Skip to content

Commit

Permalink
Fix filtering based on age
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
Sachin Kale committed Aug 21, 2024
1 parent 01f3b2d commit 92780e2
Showing 1 changed file with 16 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
Expand Down Expand Up @@ -674,13 +673,13 @@ public void onResponse(List<BlobMetadata> blobMetadata) {

// 4. we need to keep files since last successful run of scheduler
long lastSuccessfulFetchOfPinnedTimestamps = pinnedTimestampsState.v1();
long minimumAgeInMillis = lastSuccessfulFetchOfPinnedTimestamps + RemoteStoreSettings
long maximumAllowedTimestamp = lastSuccessfulFetchOfPinnedTimestamps - RemoteStoreSettings
.getPinnedTimestampsLookbackInterval()
.getMillis();
List<String> metadataFilesToBeDeleted = RemoteStoreUtils.filterOutMetadataFilesBasedOnAge(
metadataFiles,
file -> RemoteStoreUtils.invertLong(file.split(METADATA_SEPARATOR)[3]),
TimeValue.timeValueMillis(minimumAgeInMillis)
maximumAllowedTimestamp
);

// 5. Filter out metadata files matching pinned timestamps
Expand Down Expand Up @@ -722,13 +721,9 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
TreeSet<Tuple<Long, Long>> pinnedGenerations = getOrderedPinnedMetadataGenerations();
for (long generation = maxGenerationToBeDeleted; generation >= minGenerationToBeDeleted; generation--) {
// 8. Check if the generation is not referred by metadata file matching pinned timestamps
Tuple<Long, Long> ceilingGenerationRange = pinnedGenerations.ceiling(new Tuple<>(generation, generation));
if (ceilingGenerationRange != null
&& generation >= ceilingGenerationRange.v1()
&& generation <= ceilingGenerationRange.v2()) {
continue;
if (isGenerationPinned(generation, pinnedGenerations) == false) {
generationsToDelete.add(generation);
}
generationsToDelete.add(generation);
}
if (generationsToDelete.isEmpty() == false) {
// 9. Delete stale generations
Expand Down Expand Up @@ -773,6 +768,18 @@ public void onFailure(Exception e) {
translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener);
}

private boolean isGenerationPinned(long generation, TreeSet<Tuple<Long, Long>> pinnedGenerations) {
Tuple<Long, Long> ceilingGenerationRange = pinnedGenerations.ceiling(new Tuple<>(generation, generation));
if (ceilingGenerationRange != null && generation >= ceilingGenerationRange.v1() && generation <= ceilingGenerationRange.v2()) {
return true;
}
Tuple<Long, Long> floorGenerationRange = pinnedGenerations.floor(new Tuple<>(generation, generation));
if (floorGenerationRange != null && generation >= floorGenerationRange.v1() && generation <= floorGenerationRange.v2()) {
return true;
}
return false;
}

private TreeSet<Tuple<Long, Long>> getOrderedPinnedMetadataGenerations() {
TreeSet<Tuple<Long, Long>> pinnedGenerations = new TreeSet<>((o1, o2) -> {
if (Objects.equals(o1.v1(), o2.v1()) == false) {
Expand Down

0 comments on commit 92780e2

Please sign in to comment.