From c20ae4790bb126d77fa49c1ab2d0d6279e2fd314 Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat <2025sandeepkumawat@gmail.com> Date: Mon, 13 May 2024 16:21:28 +0530 Subject: [PATCH] initial commits Signed-off-by: Sandeep Kumawat <2025sandeepkumawat@gmail.com> --- .../index/translog/RemoteFsTranslog.java | 19 ++- .../index/translog/transfer/FileSnapshot.java | 10 ++ .../translog/transfer/TransferSnapshot.java | 3 + .../TranslogCheckpointTransferSnapshot.java | 29 +++++ .../transfer/TranslogTransferManager.java | 115 ++++++++++++++++-- 5 files changed, 165 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index da905b9605dfd..69600c7f4066c 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -91,6 +91,7 @@ public class RemoteFsTranslog extends Translog { private static final int SYNC_PERMIT = 1; private final Semaphore syncPermit = new Semaphore(SYNC_PERMIT); private final AtomicBoolean pauseSync = new AtomicBoolean(false); + boolean ckpAsMetadata; public RemoteFsTranslog( TranslogConfig config, @@ -110,6 +111,7 @@ public RemoteFsTranslog( this.startedPrimarySupplier = startedPrimarySupplier; this.remoteTranslogTransferTracker = remoteTranslogTransferTracker; fileTransferTracker = new FileTransferTracker(shardId, remoteTranslogTransferTracker); + ckpAsMetadata = true; this.translogTransferManager = buildTranslogTransferManager( blobStoreRepository, threadPool, @@ -117,7 +119,8 @@ public RemoteFsTranslog( fileTransferTracker, remoteTranslogTransferTracker, indexSettings().getRemoteStorePathStrategy(), - remoteStoreSettings + remoteStoreSettings, + ckpAsMetadata ); try { download(translogTransferManager, location, logger); @@ -288,7 +291,8 @@ public static TranslogTransferManager buildTranslogTransferManager( FileTransferTracker fileTransferTracker, RemoteTranslogTransferTracker tracker, RemoteStorePathStrategy pathStrategy, - RemoteStoreSettings remoteStoreSettings + RemoteStoreSettings remoteStoreSettings, + boolean ckpAsMetadata ) { assert Objects.nonNull(pathStrategy); String indexUUID = shardId.getIndex().getUUID(); @@ -310,7 +314,16 @@ public static TranslogTransferManager buildTranslogTransferManager( .build(); BlobPath mdPath = pathStrategy.generatePath(mdPathInput); BlobStoreTransferService transferService = new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool); - return new TranslogTransferManager(shardId, transferService, dataPath, mdPath, fileTransferTracker, tracker, remoteStoreSettings); + return new TranslogTransferManager( + shardId, + transferService, + dataPath, + mdPath, + fileTransferTracker, + tracker, + remoteStoreSettings, + ckpAsMetadata + ); } @Override diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java index dcec94edd694f..83368e98aa13e 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java @@ -22,6 +22,7 @@ import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.Arrays; +import java.util.Map; import java.util.Objects; /** @@ -108,6 +109,7 @@ public static class TransferFileSnapshot extends FileSnapshot { private final long primaryTerm; private Long checksum; + private Map metadata; public TransferFileSnapshot(Path path, long primaryTerm, Long checksum) throws IOException { super(path); @@ -128,6 +130,14 @@ public long getPrimaryTerm() { return primaryTerm; } + public void setMetadata(Map metadata) { + this.metadata = metadata; + } + + public Map getMetadata() { + return metadata; + } + @Override public int hashCode() { return Objects.hash(primaryTerm, super.hashCode()); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java index ef34fd31a296b..1503d5c1791cf 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java @@ -12,6 +12,7 @@ import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; import org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; +import java.io.IOException; import java.util.Set; /** @@ -39,4 +40,6 @@ public interface TransferSnapshot { * @return the translog transfer metadata */ TranslogTransferMetadata getTranslogTransferMetadata(); + + Set getTranslogFileWithMetadataSnapshots() throws IOException; } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java index fb78731246a07..78f9a70701ca1 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java @@ -13,11 +13,15 @@ import java.io.Closeable; import java.io.IOException; +import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Base64; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -64,6 +68,31 @@ public Set getTranslogFileSnapshots() { return translogCheckpointFileInfoTupleSet.stream().map(Tuple::v1).collect(Collectors.toSet()); } + @Override + public Set getTranslogFileWithMetadataSnapshots() throws IOException { + Set toUploadList = new HashSet<>(); + for (Tuple tuple : translogCheckpointFileInfoTupleSet) { + TranslogFileSnapshot translogFileSnapshot = tuple.v1(); + CheckpointFileSnapshot checkpointFileSnapshot = tuple.v2(); + translogFileSnapshot.setMetadata(buildMetadata(checkpointFileSnapshot.getPath())); + toUploadList.add(translogFileSnapshot); + } + return toUploadList; + } + + public Map buildMetadata(Path checkpointPath) throws IOException { + Map metadata = new HashMap<>(); + String ckpString = buildCheckpointDataAsBase64String(checkpointPath); + metadata.put("ckp-data", ckpString); + return metadata; + } + + static String buildCheckpointDataAsBase64String(Path checkpointFilePath) throws IOException { + long fileSize = Files.size(checkpointFilePath); + byte[] fileBytes = Files.readAllBytes(checkpointFilePath); + return Base64.getEncoder().encodeToString(fileBytes); + } + @Override public TranslogTransferMetadata getTranslogTransferMetadata() { return new TranslogTransferMetadata( diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 1087244623b87..c5107b6bdc18e 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -16,6 +16,7 @@ import org.opensearch.common.SetOnce; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.FetchBlobResult; import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.common.io.stream.BytesStreamOutput; @@ -36,6 +37,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Base64; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -63,6 +65,7 @@ public class TranslogTransferManager { private final RemoteTranslogTransferTracker remoteTranslogTransferTracker; private final RemoteStoreSettings remoteStoreSettings; private static final int METADATA_FILES_TO_FETCH = 10; + boolean ckpAsMetadata; private final Logger logger; @@ -79,7 +82,8 @@ public TranslogTransferManager( BlobPath remoteMetadataTransferPath, FileTransferTracker fileTransferTracker, RemoteTranslogTransferTracker remoteTranslogTransferTracker, - RemoteStoreSettings remoteStoreSettings + RemoteStoreSettings remoteStoreSettings, + boolean ckpAsMetadata ) { this.shardId = shardId; this.transferService = transferService; @@ -89,6 +93,7 @@ public TranslogTransferManager( this.logger = Loggers.getLogger(getClass(), shardId); this.remoteTranslogTransferTracker = remoteTranslogTransferTracker; this.remoteStoreSettings = remoteStoreSettings; + this.ckpAsMetadata = ckpAsMetadata; } public RemoteTranslogTransferTracker getRemoteTranslogTransferTracker() { @@ -110,8 +115,12 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans long prevUploadTimeInMillis = remoteTranslogTransferTracker.getTotalUploadTimeInMillis(); try { - toUpload.addAll(fileTransferTracker.exclusionFilter(transferSnapshot.getTranslogFileSnapshots())); - toUpload.addAll(fileTransferTracker.exclusionFilter((transferSnapshot.getCheckpointFileSnapshots()))); + if (ckpAsMetadata) { + toUpload.addAll(fileTransferTracker.exclusionFilter(transferSnapshot.getTranslogFileWithMetadataSnapshots())); + } else { + toUpload.addAll(fileTransferTracker.exclusionFilter(transferSnapshot.getTranslogFileSnapshots())); + toUpload.addAll(fileTransferTracker.exclusionFilter((transferSnapshot.getCheckpointFileSnapshots()))); + } if (toUpload.isEmpty()) { logger.trace("Nothing to upload for transfer"); return true; @@ -236,15 +245,101 @@ public boolean downloadTranslog(String primaryTerm, String generation, Path loca generation, location ); - // Download Checkpoint file from remote to local FS String ckpFileName = Translog.getCommitCheckpointFileName(Long.parseLong(generation)); - downloadToFS(ckpFileName, location, primaryTerm); - // Download translog file from remote to local FS String translogFilename = Translog.getFilename(Long.parseLong(generation)); - downloadToFS(translogFilename, location, primaryTerm); + if (ckpAsMetadata == false) { + // Download Checkpoint file from remote to local FS + downloadToFS(ckpFileName, location, primaryTerm); + // Download translog file from remote to local FS + downloadToFS(translogFilename, location, primaryTerm); + } else { + // Download translog.tlog file with object metadata from remote to local FS + Map metadata = downloadTranslogToFSAndGetMetadata(translogFilename, location, primaryTerm, generation); + try { + assert metadata != null && !metadata.isEmpty() && metadata.containsKey("ckp-data"); + recoverCkpFileFromMetadata(metadata, location, generation, translogFilename); + } catch (Exception e) { + throw new IOException("Failed to recover checkpoint file from remote", e); + } + } return true; } + private Map downloadTranslogToFSAndGetMetadata(String fileName, Path location, String primaryTerm, String generation) + throws IOException { + Path filePath = location.resolve(fileName); + // Here, we always override the existing file if present. + // We need to change this logic when we introduce incremental download + deleteFileIfExists(filePath); + + boolean downloadStatus = false; + long bytesToRead = 0, downloadStartTime = System.nanoTime(); + Map metadata; + + FetchBlobResult inputStreamWithMetadata = transferService.downloadBlobWithMetadata( + remoteDataTransferPath.add(primaryTerm), + fileName + ); + try { + InputStream inputStream = inputStreamWithMetadata.getInputStream(); + metadata = inputStreamWithMetadata.getMetadata(); + + bytesToRead = inputStream.available(); + Files.copy(inputStream, filePath); + downloadStatus = true; + + } finally { + remoteTranslogTransferTracker.addDownloadTimeInMillis((System.nanoTime() - downloadStartTime) / 1_000_000L); + if (downloadStatus) { + remoteTranslogTransferTracker.addDownloadBytesSucceeded(bytesToRead); + } + } + + // Mark in FileTransferTracker so that the same files are not uploaded at the time of translog sync + fileTransferTracker.add(fileName, true); + + return metadata; + } + + /** + * Process the provided metadata and tries to write the content of the checkpoint (ckp) file to the FS. + */ + private void recoverCkpFileFromMetadata(Map metadata, Path location, String generation, String fileName) + throws IOException { + + boolean downloadStatus = false; + long bytesToRead = 0; + try { + String ckpFileName = Translog.getCommitCheckpointFileName(Long.parseLong(generation)); + Path filePath = location.resolve(ckpFileName); + // Here, we always override the existing file if present. + deleteFileIfExists(filePath); + + String ckpDataBase64 = metadata.get("ckp-data"); + if (ckpDataBase64 == null) { + logger.error("Error processing metadata for translog file: {}", fileName); + throw new IllegalStateException( + "Checkpoint file data (key - ckp-data) is expected but not found in metadata for file: " + fileName + ); + } + byte[] ckpFileBytes = Base64.getDecoder().decode(ckpDataBase64); + bytesToRead = ckpFileBytes.length; + + Files.write(filePath, ckpFileBytes); + downloadStatus = true; + } finally { + if (downloadStatus) { + remoteTranslogTransferTracker.addDownloadBytesSucceeded(bytesToRead); + } + } + } + + public void deleteFileIfExists(Path filePath) throws IOException { + if (Files.exists(filePath)) { + Files.delete(filePath); + } + } + private void downloadToFS(String fileName, Path location, String primaryTerm) throws IOException { Path filePath = location.resolve(fileName); // Here, we always override the existing file if present. @@ -391,7 +486,11 @@ public void deleteGenerationAsync(long primaryTerm, Set generations, Runna // Add .ckp and .tlog file to translog file list which is located in basePath/ String ckpFileName = Translog.getCommitCheckpointFileName(generation); String translogFileName = Translog.getFilename(generation); - translogFiles.addAll(List.of(ckpFileName, translogFileName)); + if (ckpAsMetadata == false) { + translogFiles.addAll(List.of(ckpFileName, translogFileName)); + } else { + translogFiles.add(translogFileName); + } }); // Delete the translog and checkpoint files asynchronously deleteTranslogFilesAsync(primaryTerm, translogFiles, onCompletion);