Skip to content

Commit

Permalink
Optimize GC flow with pinned timestamps
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
sachinpkale committed Sep 19, 2024
1 parent 3a1b6d1 commit c0eff29
Show file tree
Hide file tree
Showing 11 changed files with 831 additions and 239 deletions.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,10 @@ Set<String> getMetadataFilesToFilterActiveSegments(
return metadataFilesToFilterActiveSegments;
}

public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException {
deleteStaleSegments(lastNMetadataFilesToKeep, Map.of());
}

/**
* Delete stale segment and metadata files
* One metadata file is kept per commit (refresh updates the same file). To read segments uploaded to remote store,
Expand All @@ -832,7 +836,7 @@ Set<String> getMetadataFilesToFilterActiveSegments(
* @param lastNMetadataFilesToKeep number of metadata files to keep
* @throws IOException in case of I/O error while reading from / writing to remote segment store
*/
public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException {
private void deleteStaleSegments(int lastNMetadataFilesToKeep, Map<String, Long> pinnedTimestampsToSkip) throws IOException {
if (lastNMetadataFilesToKeep == -1) {
logger.info(
"Stale segment deletion is disabled if cluster.remote_store.index.segment_metadata.retention.max_count is set to -1"
Expand All @@ -854,12 +858,12 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
}

// Check last fetch status of pinned timestamps. If stale, return.
if (RemoteStoreUtils.isPinnedTimestampStateStale()) {
if (lastNMetadataFilesToKeep != 0 && RemoteStoreUtils.isPinnedTimestampStateStale()) {
logger.warn("Skipping remote segment store garbage collection as last fetch of pinned timestamp is stale");
return;
}

Tuple<Long, Set<Long>> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps();
Tuple<Long, Set<Long>> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps(pinnedTimestampsToSkip);

Set<String> implicitLockedFiles = RemoteStoreUtils.getPinnedTimestampLockedFiles(
sortedMetadataFileList,
Expand Down Expand Up @@ -994,7 +998,9 @@ public static void remoteDirectoryCleanup(
String remoteStoreRepoForIndex,
String indexUUID,
ShardId shardId,
RemoteStorePathStrategy pathStrategy
RemoteStorePathStrategy pathStrategy,
boolean forceClean,
Map<String, Long> pinnedTimestampsToSkip
) {
try {
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = (RemoteSegmentStoreDirectory) remoteDirectoryFactory.newDirectory(
Expand All @@ -1003,8 +1009,12 @@ public static void remoteDirectoryCleanup(
shardId,
pathStrategy
);
remoteSegmentStoreDirectory.deleteStaleSegments(0);
remoteSegmentStoreDirectory.deleteIfEmpty();
if (forceClean) {
remoteSegmentStoreDirectory.delete();
} else {
remoteSegmentStoreDirectory.deleteStaleSegments(0, pinnedTimestampsToSkip);
remoteSegmentStoreDirectory.deleteIfEmpty();
}
} catch (Exception e) {
staticLogger.error("Exception occurred while deleting directory", e);
}
Expand All @@ -1023,7 +1033,10 @@ private boolean deleteIfEmpty() throws IOException {
logger.info("Remote directory still has files, not deleting the path");
return false;
}
return delete();
}

private boolean delete() {
try {
remoteDataDirectory.delete();
remoteMetadataDirectory.delete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class RemoteFsTimestampAwareTranslog extends RemoteFsTranslog {
private final Map<String, Tuple<Long, Long>> oldFormatMetadataFileGenerationMap;
private final Map<String, Tuple<Long, Long>> oldFormatMetadataFilePrimaryTermMap;
private final AtomicLong minPrimaryTermInRemote = new AtomicLong(Long.MAX_VALUE);
private long maxDeletedGenerationOnRemote = 0;

public RemoteFsTimestampAwareTranslog(
TranslogConfig config,
Expand Down Expand Up @@ -135,13 +136,20 @@ protected void trimUnreferencedReaders(boolean indexDeleted, boolean trimLocal)

// This is to ensure that after the permits are acquired during primary relocation, there are no further modification on remote
// store.
if (startedPrimarySupplier.getAsBoolean() == false || pauseSync.get()) {
if ((indexDeleted == false && startedPrimarySupplier.getAsBoolean() == false) || pauseSync.get()) {
return;
}

// This is to fail fast and avoid listing md files un-necessarily.
if (indexDeleted == false && RemoteStoreUtils.isPinnedTimestampStateStale()) {
logger.warn("Skipping remote segment store garbage collection as last fetch of pinned timestamp is stale");
logger.warn("Skipping remote translog garbage collection as last fetch of pinned timestamp is stale");
return;
}

// This code block ensures parity with RemoteFsTranslog. Without this, we will end up making list translog metadata
// call in each invocation of trimUnreferencedReaders
long minGenerationToKeep = minRemoteGenReferenced - indexSettings().getRemoteTranslogExtraKeep();
if (indexDeleted == false && (minGenerationToKeep <= maxDeletedGenerationOnRemote)) {
return;
}

Expand All @@ -158,24 +166,20 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
List<String> metadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList());

try {
if (metadataFiles.size() <= 1) {
if (indexDeleted == false && metadataFiles.size() <= 1) {
logger.debug("No stale translog metadata files found");
remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);
return;
}

// Check last fetch status of pinned timestamps. If stale, return.
if (indexDeleted == false && RemoteStoreUtils.isPinnedTimestampStateStale()) {
logger.warn("Skipping remote segment store garbage collection as last fetch of pinned timestamp is stale");
logger.warn("Skipping remote translog garbage collection as last fetch of pinned timestamp is stale");
remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);
return;
}

List<String> metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(
metadataFiles,
metadataFilePinnedTimestampMap,
logger
);
List<String> metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(metadataFiles, indexDeleted);

// If index is not deleted, make sure to keep latest metadata file
if (indexDeleted == false) {
Expand All @@ -194,21 +198,28 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted);

logger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted);

Set<Long> generationsToBeDeleted = getGenerationsToBeDeleted(
metadataFilesNotToBeDeleted,
metadataFilesToBeDeleted,
indexDeleted
indexDeleted ? Long.MAX_VALUE : minRemoteGenReferenced
);

logger.debug(() -> "generationsToBeDeleted = " + generationsToBeDeleted);
if (generationsToBeDeleted.isEmpty() == false) {
maxDeletedGenerationOnRemote = generationsToBeDeleted.stream().max(Long::compareTo).get();

// Delete stale generations
translogTransferManager.deleteGenerationAsync(
primaryTermSupplier.getAsLong(),
generationsToBeDeleted,
remoteGenerationDeletionPermits::release
);
} else {
remoteGenerationDeletionPermits.release();
}

if (metadataFilesToBeDeleted.isEmpty() == false) {
// Delete stale metadata files
translogTransferManager.deleteMetadataFilesAsync(
metadataFilesToBeDeleted,
Expand All @@ -217,11 +228,10 @@ public void onResponse(List<BlobMetadata> blobMetadata) {

// Update cache to keep only those metadata files that are not getting deleted
oldFormatMetadataFileGenerationMap.keySet().retainAll(metadataFilesNotToBeDeleted);

// Delete stale primary terms
deleteStaleRemotePrimaryTerms(metadataFilesNotToBeDeleted);
} else {
remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);
remoteGenerationDeletionPermits.release();
}
} catch (Exception e) {
remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);
Expand All @@ -241,14 +251,8 @@ public void onFailure(Exception e) {
protected Set<Long> getGenerationsToBeDeleted(
List<String> metadataFilesNotToBeDeleted,
List<String> metadataFilesToBeDeleted,
boolean indexDeleted
long minRemoteGenReferenced
) throws IOException {
long maxGenerationToBeDeleted = Long.MAX_VALUE;

if (indexDeleted == false) {
maxGenerationToBeDeleted = minRemoteGenReferenced - 1 - indexSettings().getRemoteTranslogExtraKeep();
}

Set<Long> generationsFromMetadataFilesToBeDeleted = new HashSet<>();
for (String mdFile : metadataFilesToBeDeleted) {
Tuple<Long, Long> minMaxGen = getMinMaxTranslogGenerationFromMetadataFile(mdFile, translogTransferManager);
Expand All @@ -262,24 +266,36 @@ protected Set<Long> getGenerationsToBeDeleted(
Set<Long> generationsToBeDeleted = new HashSet<>();
for (long generation : generationsFromMetadataFilesToBeDeleted) {
// Check if the generation is not referred by metadata file matching pinned timestamps
if (generation <= maxGenerationToBeDeleted && isGenerationPinned(generation, pinnedGenerations) == false) {
// The check with minRemoteGenReferenced is redundant but kept as to make sure we don't delete generations
// that are not persisted in remote segment store yet.
if (generation < minRemoteGenReferenced && isGenerationPinned(generation, pinnedGenerations) == false) {
generationsToBeDeleted.add(generation);
}
}
return generationsToBeDeleted;
}

protected List<String> getMetadataFilesToBeDeleted(List<String> metadataFiles) {
return getMetadataFilesToBeDeleted(metadataFiles, metadataFilePinnedTimestampMap, logger);
protected List<String> getMetadataFilesToBeDeleted(List<String> metadataFiles, boolean indexDeleted) {
return getMetadataFilesToBeDeleted(
metadataFiles,
metadataFilePinnedTimestampMap,
minRemoteGenReferenced,
Map.of(),
indexDeleted,
logger
);
}

// Visible for testing
protected static List<String> getMetadataFilesToBeDeleted(
List<String> metadataFiles,
Map<Long, String> metadataFilePinnedTimestampMap,
long minRemoteGenReferenced,
Map<String, Long> pinnedTimestampsToSkip,
boolean indexDeleted,
Logger logger
) {
Tuple<Long, Set<Long>> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps();
Tuple<Long, Set<Long>> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps(pinnedTimestampsToSkip);

// Keep files since last successful run of scheduler
List<String> metadataFilesToBeDeleted = RemoteStoreUtils.filterOutMetadataFilesBasedOnAge(
Expand Down Expand Up @@ -312,6 +328,22 @@ protected static List<String> getMetadataFilesToBeDeleted(
metadataFilesToBeDeleted.size()
);

if (indexDeleted == false) {
// Filter out metadata files based on minRemoteGenReferenced
List<String> metadataFilesContainingMinRemoteGenReferenced = metadataFilesToBeDeleted.stream().filter(md -> {
long maxGeneration = TranslogTransferMetadata.getMaxGenerationFromFileName(md);
return maxGeneration == -1 || maxGeneration > minRemoteGenReferenced;
}).collect(Collectors.toList());
metadataFilesToBeDeleted.removeAll(metadataFilesContainingMinRemoteGenReferenced);

logger.trace(
"metadataFilesContainingMinRemoteGenReferenced.size = {}, metadataFilesToBeDeleted based on minRemoteGenReferenced filtering = {}, minRemoteGenReferenced = {}",
metadataFilesContainingMinRemoteGenReferenced.size(),
metadataFilesToBeDeleted.size(),
minRemoteGenReferenced
);
}

return metadataFilesToBeDeleted;
}

Expand Down Expand Up @@ -472,50 +504,65 @@ protected static Tuple<Long, Long> getMinMaxPrimaryTermFromMetadataFile(
}
}

public static void cleanup(TranslogTransferManager translogTransferManager) throws IOException {
ActionListener<List<BlobMetadata>> listMetadataFilesListener = new ActionListener<>() {
@Override
public void onResponse(List<BlobMetadata> blobMetadata) {
List<String> metadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList());
public static void cleanup(
TranslogTransferManager translogTransferManager,
boolean forceClean,
Map<String, Long> pinnedTimestampsToSkip
) throws IOException {
if (forceClean) {
translogTransferManager.delete();
} else {
ActionListener<List<BlobMetadata>> listMetadataFilesListener = new ActionListener<>() {
@Override
public void onResponse(List<BlobMetadata> blobMetadata) {
List<String> metadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList());

try {
if (metadataFiles.isEmpty()) {
staticLogger.debug("No stale translog metadata files found");
return;
}
List<String> metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(
metadataFiles,
new HashMap<>(),
Long.MAX_VALUE,
pinnedTimestampsToSkip,
true,
staticLogger
);
if (metadataFilesToBeDeleted.isEmpty()) {
staticLogger.debug("No metadata files to delete");
return;
}
staticLogger.debug(() -> "metadataFilesToBeDeleted = " + metadataFilesToBeDeleted);

try {
if (metadataFiles.isEmpty()) {
staticLogger.debug("No stale translog metadata files found");
return;
}
List<String> metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(metadataFiles, new HashMap<>(), staticLogger);
if (metadataFilesToBeDeleted.isEmpty()) {
staticLogger.debug("No metadata files to delete");
return;
}
staticLogger.debug(() -> "metadataFilesToBeDeleted = " + metadataFilesToBeDeleted);
// For all the files that we are keeping, fetch min and max generations
List<String> metadataFilesNotToBeDeleted = new ArrayList<>(metadataFiles);
metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted);
staticLogger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted);

// For all the files that we are keeping, fetch min and max generations
List<String> metadataFilesNotToBeDeleted = new ArrayList<>(metadataFiles);
metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted);
staticLogger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted);
// Delete stale metadata files
translogTransferManager.deleteMetadataFilesAsync(metadataFilesToBeDeleted, () -> {});

// Delete stale metadata files
translogTransferManager.deleteMetadataFilesAsync(metadataFilesToBeDeleted, () -> {});
// Delete stale primary terms
deleteStaleRemotePrimaryTerms(
metadataFilesNotToBeDeleted,
translogTransferManager,
new HashMap<>(),
new AtomicLong(Long.MAX_VALUE),
staticLogger
);
} catch (Exception e) {
staticLogger.error("Exception while cleaning up metadata and primary terms", e);
}
}

// Delete stale primary terms
deleteStaleRemotePrimaryTerms(
metadataFilesNotToBeDeleted,
translogTransferManager,
new HashMap<>(),
new AtomicLong(Long.MAX_VALUE),
staticLogger
);
} catch (Exception e) {
@Override
public void onFailure(Exception e) {
staticLogger.error("Exception while cleaning up metadata and primary terms", e);
}
}

@Override
public void onFailure(Exception e) {
staticLogger.error("Exception while cleaning up metadata and primary terms", e);
}
};
translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener);
};
translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -685,10 +685,11 @@ public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOExcepti
maxRemoteTranslogGenerationUploaded = generation;
minRemoteGenReferenced = getMinFileGeneration();
logger.debug(
"Successfully uploaded translog for primary term = {}, generation = {}, maxSeqNo = {}",
"Successfully uploaded translog for primary term = {}, generation = {}, maxSeqNo = {}, minRemoteGenReferenced = {}",
primaryTerm,
generation,
maxSeqNo
maxSeqNo,
minRemoteGenReferenced
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,16 @@ public static Tuple<Long, Long> getMinMaxTranslogGenerationFromFilename(String f
}
}

public static long getMaxGenerationFromFileName(String filename) {
String[] tokens = filename.split(METADATA_SEPARATOR);
try {
return RemoteStoreUtils.invertLong(tokens[2]);
} catch (Exception e) {
logger.error(() -> new ParameterizedMessage("Exception while getting max generation from: {}", filename), e);
return -1;
}
}

public static Tuple<Long, Long> getMinMaxPrimaryTermFromFilename(String filename) {
String[] tokens = filename.split(METADATA_SEPARATOR);
if (tokens.length < 7) {
Expand Down
Loading

0 comments on commit c0eff29

Please sign in to comment.