Skip to content

Commit

Permalink
Translog GC changes
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
Sachin Kale committed Aug 19, 2024
1 parent a3424a6 commit b6b7c16
Show file tree
Hide file tree
Showing 4 changed files with 408 additions and 55 deletions.
232 changes: 206 additions & 26 deletions server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,20 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.SetOnce;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.util.FileSystemUtils;
import org.opensearch.index.remote.RemoteStorePathStrategy;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.index.remote.RemoteTranslogTransferTracker;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
Expand All @@ -39,22 +44,28 @@
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA;
import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.METADATA_SEPARATOR;

/**
* A Translog implementation which syncs local FS with a remote store
Expand Down Expand Up @@ -92,6 +103,8 @@ public class RemoteFsTranslog extends Translog {
private final Semaphore syncPermit = new Semaphore(SYNC_PERMIT);
private final AtomicBoolean pauseSync = new AtomicBoolean(false);
private final boolean isTranslogMetadataEnabled;
private final Map<Long, String> metadataFilePinnedTimestampMap;
private final Map<String, Tuple<Long, Long>> metadataFileGenerationMap;

public RemoteFsTranslog(
TranslogConfig config,
Expand All @@ -105,13 +118,45 @@ public RemoteFsTranslog(
BooleanSupplier startedPrimarySupplier,
RemoteTranslogTransferTracker remoteTranslogTransferTracker,
RemoteStoreSettings remoteStoreSettings
) throws IOException {
this(
config,
translogUUID,
deletionPolicy,
globalCheckpointSupplier,
primaryTermSupplier,
persistedSequenceNumberConsumer,
blobStoreRepository,
threadPool,
startedPrimarySupplier,
remoteTranslogTransferTracker,
remoteStoreSettings,
-1
);
}

public RemoteFsTranslog(
TranslogConfig config,
String translogUUID,
TranslogDeletionPolicy deletionPolicy,
LongSupplier globalCheckpointSupplier,
LongSupplier primaryTermSupplier,
LongConsumer persistedSequenceNumberConsumer,
BlobStoreRepository blobStoreRepository,
ThreadPool threadPool,
BooleanSupplier startedPrimarySupplier,
RemoteTranslogTransferTracker remoteTranslogTransferTracker,
RemoteStoreSettings remoteStoreSettings,
long timestamp
) throws IOException {
super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer);
logger = Loggers.getLogger(getClass(), shardId);
this.startedPrimarySupplier = startedPrimarySupplier;
this.remoteTranslogTransferTracker = remoteTranslogTransferTracker;
fileTransferTracker = new FileTransferTracker(shardId, remoteTranslogTransferTracker);
isTranslogMetadataEnabled = indexSettings().isTranslogMetadataEnabled();
this.metadataFilePinnedTimestampMap = new HashMap<>();
this.metadataFileGenerationMap = new HashMap<>();
this.translogTransferManager = buildTranslogTransferManager(
blobStoreRepository,
threadPool,
Expand All @@ -123,7 +168,7 @@ public RemoteFsTranslog(
isTranslogMetadataEnabled
);
try {
download(translogTransferManager, location, logger, config.shouldSeedRemote());
download(translogTransferManager, location, logger, config.shouldSeedRemote(), timestamp);
Checkpoint checkpoint = readCheckpoint(location);
logger.info("Downloaded data from remote translog till maxSeqNo = {}", checkpoint.maxSeqNo);
this.readers.addAll(recoverFromFiles(checkpoint));
Expand Down Expand Up @@ -174,6 +219,32 @@ public static void download(
Logger logger,
boolean seedRemote,
boolean isTranslogMetadataEnabled
) throws IOException {
download(
repository,
shardId,
threadPool,
location,
pathStrategy,
remoteStoreSettings,
logger,
seedRemote,
isTranslogMetadataEnabled,
-1
);
}

public static void download(
Repository repository,
ShardId shardId,
ThreadPool threadPool,
Path location,
RemoteStorePathStrategy pathStrategy,
RemoteStoreSettings remoteStoreSettings,
Logger logger,
boolean seedRemote,
boolean isTranslogMetadataEnabled,
long timestamp
) throws IOException {
assert repository instanceof BlobStoreRepository : String.format(
Locale.ROOT,
Expand All @@ -195,11 +266,12 @@ public static void download(
remoteStoreSettings,
isTranslogMetadataEnabled
);
RemoteFsTranslog.download(translogTransferManager, location, logger, seedRemote);
RemoteFsTranslog.download(translogTransferManager, location, logger, seedRemote, timestamp);
logger.trace(remoteTranslogTransferTracker.toString());
}

static void download(TranslogTransferManager translogTransferManager, Path location, Logger logger, boolean seedRemote)
// Visible for testing
static void download(TranslogTransferManager translogTransferManager, Path location, Logger logger, boolean seedRemote, long timestamp)
throws IOException {
/*
In Primary to Primary relocation , there can be concurrent upload and download of translog.
Expand All @@ -213,7 +285,7 @@ static void download(TranslogTransferManager translogTransferManager, Path locat
boolean success = false;
long startTimeMs = System.currentTimeMillis();
try {
downloadOnce(translogTransferManager, location, logger, seedRemote);
downloadOnce(translogTransferManager, location, logger, seedRemote, timestamp);
success = true;
return;
} catch (FileNotFoundException | NoSuchFileException e) {
Expand All @@ -227,13 +299,18 @@ static void download(TranslogTransferManager translogTransferManager, Path locat
throw ex;
}

private static void downloadOnce(TranslogTransferManager translogTransferManager, Path location, Logger logger, boolean seedRemote)
throws IOException {
private static void downloadOnce(
TranslogTransferManager translogTransferManager,
Path location,
Logger logger,
boolean seedRemote,
long timestamp
) throws IOException {
logger.debug("Downloading translog files from remote");
RemoteTranslogTransferTracker statsTracker = translogTransferManager.getRemoteTranslogTransferTracker();
long prevDownloadBytesSucceeded = statsTracker.getDownloadBytesSucceeded();
long prevDownloadTimeInMillis = statsTracker.getTotalDownloadTimeInMillis();
TranslogTransferMetadata translogMetadata = translogTransferManager.readMetadata();
TranslogTransferMetadata translogMetadata = translogTransferManager.readMetadata(timestamp);
if (translogMetadata != null) {
if (Files.notExists(location)) {
Files.createDirectories(location);
Expand Down Expand Up @@ -551,29 +628,129 @@ public void trimUnreferencedReaders() throws IOException {
return;
}

// cleans up remote translog files not referenced in latest uploaded metadata.
// This enables us to restore translog from the metadata in case of failover or relocation.
Set<Long> generationsToDelete = new HashSet<>();
for (long generation = minRemoteGenReferenced - 1 - indexSettings().getRemoteTranslogExtraKeep(); generation >= 0; generation--) {
if (fileTransferTracker.uploaded(Translog.getFilename(generation)) == false) {
break;
// ToDo: Check last fetch status of pinned timestamps. If stale, return.

ActionListener<List<BlobMetadata>> listMetadataFilesListener = new ActionListener<>() {
@Override
public void onResponse(List<BlobMetadata> blobMetadata) {
List<String> metadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList());

// ToDo: 1. Check last fetch status of pinned timestamps. If stale, return.

// 2. Filter out last X mins of files
// ToDo: X would be last successful fetch of pinned timestamps
List<String> metadataFilesToBeDeleted = RemoteStoreUtils.filterOutMetadataFilesBasedOnAge(
metadataFiles,
file -> RemoteStoreUtils.invertLong(file.split(METADATA_SEPARATOR)[3]),
TimeValue.timeValueMinutes(5)
);

// 3. Get md files matching pinned timestamps
Set<Long> pinnedTimestamps = new HashSet<>();
Set<String> implicitLockedFiles = RemoteStoreUtils.getPinnedTimestampLockedFiles(
metadataFilesToBeDeleted,
pinnedTimestamps,
metadataFilePinnedTimestampMap,
file -> RemoteStoreUtils.invertLong(file.split(METADATA_SEPARATOR)[3]),
TranslogTransferMetadata::getNodeIdByPrimaryTermAndGen
);

// 4. For new pinned timestamp, read matching file to get min and max generations
try {
readAndCacheGenerationForPinnedTimestamp(implicitLockedFiles);
} catch (IOException e) {
throw new RuntimeException(e);
}

// 5. Filter out metadata files matching pinned timestamps
metadataFilesToBeDeleted.removeAll(implicitLockedFiles);

// 6. From the remaining files, read oldest and latest
String oldestMetadataFileToBeDeleted = metadataFilesToBeDeleted.get(metadataFilesToBeDeleted.size() - 1);
String latestMetadataFileToBeDeleted = metadataFilesToBeDeleted.get(0);

// 7. Delete generations
long minGenerationToBeDeleted;
long maxGenerationToBeDeleted;
try {
if (metadataFileGenerationMap.containsKey(latestMetadataFileToBeDeleted) == false) {
TranslogTransferMetadata metadata = translogTransferManager.readMetadata(latestMetadataFileToBeDeleted);
maxGenerationToBeDeleted = metadata.getMinTranslogGeneration() - 1;
} else {
maxGenerationToBeDeleted = metadataFileGenerationMap.get(latestMetadataFileToBeDeleted).v1() - 1;
}
long minGenerationToKeep = minRemoteGenReferenced - 1 - indexSettings().getRemoteTranslogExtraKeep();
maxGenerationToBeDeleted = Math.min(maxGenerationToBeDeleted, minGenerationToKeep);

if (metadataFileGenerationMap.containsKey(oldestMetadataFileToBeDeleted) == false) {
TranslogTransferMetadata metadata = translogTransferManager.readMetadata(oldestMetadataFileToBeDeleted);
minGenerationToBeDeleted = metadata.getMinTranslogGeneration() - 1;
} else {
minGenerationToBeDeleted = metadataFileGenerationMap.get(oldestMetadataFileToBeDeleted).v1() - 1;
}
} catch (IOException e) {
throw new RuntimeException(e);
}
Set<Long> generationsToDelete = new HashSet<>();
TreeSet<Tuple<Long, Long>> pinnedGenerations = getOrderedPinnedMetadataGenerations();
for (long generation = maxGenerationToBeDeleted; generation >= minGenerationToBeDeleted; generation--) {
// 8. Check if the generation is not referred by metadata file matching pinned timestamps
Tuple<Long, Long> ceilingGenerationRange = pinnedGenerations.ceiling(new Tuple<>(generation, generation));
if (ceilingGenerationRange != null
&& generation >= ceilingGenerationRange.v1()
&& generation <= ceilingGenerationRange.v2()) {
continue;
}
generationsToDelete.add(generation);
}
if (generationsToDelete.isEmpty() == false) {
// 9. Delete stale generations
deleteRemoteGenerations(generationsToDelete);
// 10. Delete stale metadata files
translogTransferManager.deleteMetadataFilesAsync(metadataFilesToBeDeleted, remoteGenerationDeletionPermits::release);
// 11. Delete primary terms
deleteStaleRemotePrimaryTerms(metadataFiles);
} else {
remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);
}
}
generationsToDelete.add(generation);
}
if (generationsToDelete.isEmpty() == false) {
deleteRemoteGeneration(generationsToDelete);
translogTransferManager.deleteStaleTranslogMetadataFilesAsync(remoteGenerationDeletionPermits::release);
deleteStaleRemotePrimaryTerms();
} else {
remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);

@Override
public void onFailure(Exception e) {

}
};
translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener);
}

private TreeSet<Tuple<Long, Long>> getOrderedPinnedMetadataGenerations() {
TreeSet<Tuple<Long, Long>> pinnedGenerations = new TreeSet<>((o1, o2) -> {
if (Objects.equals(o1.v1(), o2.v1()) == false) {
return o1.v1().compareTo(o2.v1());
} else {
return o1.v2().compareTo(o2.v2());
}
});
pinnedGenerations.addAll(metadataFileGenerationMap.values());
return pinnedGenerations;
}

private void readAndCacheGenerationForPinnedTimestamp(Set<String> implicitLockedFiles) throws IOException {
Set<String> nonCachedMetadataFiles = implicitLockedFiles.stream()
.filter(f -> metadataFileGenerationMap.containsKey(f) == false)
.collect(Collectors.toSet());
metadataFileGenerationMap.keySet().retainAll(implicitLockedFiles);
for (String metadataFile : nonCachedMetadataFiles) {
TranslogTransferMetadata metadata = translogTransferManager.readMetadata(metadataFile);
metadataFileGenerationMap.put(metadataFile, new Tuple<>(metadata.getMinTranslogGeneration(), metadata.getGeneration()));
}
}

/**
* Deletes remote translog and metadata files asynchronously corresponding to the generations.
* @param generations generations to be deleted.
*/
private void deleteRemoteGeneration(Set<Long> generations) {
private void deleteRemoteGenerations(Set<Long> generations) {
translogTransferManager.deleteGenerationAsync(
primaryTermSupplier.getAsLong(),
generations,
Expand All @@ -587,17 +764,20 @@ private void deleteRemoteGeneration(Set<Long> generations) {
* <br>
* This will also delete all stale translog metadata files from remote except the latest basis the metadata file comparator.
*/
private void deleteStaleRemotePrimaryTerms() {
private void deleteStaleRemotePrimaryTerms(List<String> metadataFiles) {
// The deletion of older translog files in remote store is on best-effort basis, there is a possibility that there
// are older files that are no longer needed and should be cleaned up. In here, we delete all files that are part
// of older primary term.
if (olderPrimaryCleaned.trySet(Boolean.TRUE)) {
if (readers.isEmpty()) {
logger.trace("Translog reader list is empty, returning from deleteStaleRemotePrimaryTerms");
if (metadataFiles.isEmpty()) {
logger.trace("No metadata is uploaded yet, returning from deleteStaleRemotePrimaryTerms");
return;
}
Optional<Long> minPrimaryTerm = metadataFiles.stream()
.map(file -> Long.valueOf(file.split(METADATA_SEPARATOR)[1]))
.min(Long::compareTo);
// First we delete all stale primary terms folders from remote store
long minimumReferencedPrimaryTerm = readers.stream().map(BaseTranslogReader::getPrimaryTerm).min(Long::compare).get();
long minimumReferencedPrimaryTerm = minPrimaryTerm.get() - 1;
translogTransferManager.deletePrimaryTermsAsync(minimumReferencedPrimaryTerm);
}
}
Expand Down
Loading

0 comments on commit b6b7c16

Please sign in to comment.