diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index f5a9ed8ed9362..6e9c62aeadb22 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -254,14 +254,67 @@ private static void downloadOnce( Files.createDirectories(location); } - // Delete translog files on local before downloading from remote + Map generationToPrimaryTermMapper = translogMetadata.getGenerationToPrimaryTermMapper(); + Map generationToChecksumMapper = translogMetadata.getGenerationToChecksumMapper(); + long maxGeneration = translogMetadata.getGeneration(); + long minGeneration = translogMetadata.getMinTranslogGeneration(); + + // Delete any translog and checkpoint file which is not part of the current generation range. for (Path file : FileSystemUtils.files(location)) { - Files.delete(file); + try { + long generation = parseIdFromFileName(file.getFileName().toString(), STRICT_TLOG_OR_CKP_PATTERN); + if (generation < minGeneration || generation > maxGeneration) { + // If the generation is outside the required range, then we delete the same. + Files.delete(file); + } + } catch (IllegalStateException | IllegalArgumentException e) { + // Delete any file which does not conform to Translog or Checkpoint filename patterns. + Files.delete(file); + } } - Map generationToPrimaryTermMapper = translogMetadata.getGenerationToPrimaryTermMapper(); - for (long i = translogMetadata.getGeneration(); i >= translogMetadata.getMinTranslogGeneration(); i--) { + // For incremental downloads, we will check if the local translog matches the one present in + // remote store. If so, we will skip its download. + for (long i = maxGeneration; i >= minGeneration; i--) { String generation = Long.toString(i); + String translogFilename = Translog.getFilename(i); + Path targetTranslogPath = location.resolve(translogFilename); + + // If we have the translog available for the generation locally, then we need to + // compare the checksum with that in remote obtained via metadata. + // For backward compatibility, we consider the following cases here- + // - Remote metadata does not have the mapping for generation + // - Local translog file lacks the checksum value in footer + // In both these cases, we will download the files for the generation. + if (generationToChecksumMapper.containsKey(generation) && FileSystemUtils.exists(targetTranslogPath)) { + try { + final long expectedChecksum = Long.parseLong(generationToChecksumMapper.get(generation)); + final Long actualChecksum = TranslogFooter.readChecksum(targetTranslogPath); + + // If the local and remote checksum are same, then continue. + // Else exit the loop and download the translog. + if (actualChecksum != null && actualChecksum == expectedChecksum) { + logger.info( + "Download skipped for translog and checkpoint files for generation={} due to them being locally present", + generation + ); + + // Mark the translog and checkpoint file as available in the file tracker. + translogTransferManager.markFileAsDownloaded(translogFilename); + translogTransferManager.markFileAsDownloaded(Translog.getCommitCheckpointFileName(i)); + continue; + } + } catch (IOException e) { + // The exception can occur if the remote translog files were uploaded without footer. + logger.info( + "Exception occurred during reconciliation of translog state between local and remote. " + + "Reverting to downloading the translog and checksum files for generation={}", + generation + ); + } + } + + logger.info("Downloading translog and checkpoint files for generation={}", generation); translogTransferManager.downloadTranslog(generationToPrimaryTermMapper.get(generation), generation, location); } logger.info( diff --git a/server/src/main/java/org/opensearch/index/translog/Translog.java b/server/src/main/java/org/opensearch/index/translog/Translog.java index 4b4ceb7444471..c1f7860cf5716 100644 --- a/server/src/main/java/org/opensearch/index/translog/Translog.java +++ b/server/src/main/java/org/opensearch/index/translog/Translog.java @@ -134,6 +134,8 @@ public abstract class Translog extends AbstractIndexShardComponent implements In public static final String CHECKPOINT_SUFFIX = ".ckp"; public static final String CHECKPOINT_FILE_NAME = "translog" + CHECKPOINT_SUFFIX; + // STRICT_TLOG_OR_CKP_PATTERN is the strict pattern for Translog or Checkpoint file. + static final Pattern STRICT_TLOG_OR_CKP_PATTERN = Pattern.compile("^" + TRANSLOG_FILE_PREFIX + "(\\d+)(\\.ckp|\\.tlog)$"); static final Pattern PARSE_STRICT_ID_PATTERN = Pattern.compile("^" + TRANSLOG_FILE_PREFIX + "(\\d+)(\\.tlog)$"); public static final int DEFAULT_HEADER_SIZE_IN_BYTES = TranslogHeader.headerSizeInBytes(UUIDs.randomBase64UUID()); @@ -320,14 +322,18 @@ public static long parseIdFromFileName(Path translogFile) { return parseIdFromFileName(fileName); } - public static long parseIdFromFileName(String fileName) { - final Matcher matcher = PARSE_STRICT_ID_PATTERN.matcher(fileName); + public static long parseIdFromFileName(String translogFile) { + return parseIdFromFileName(translogFile, PARSE_STRICT_ID_PATTERN); + } + + public static long parseIdFromFileName(String fileName, Pattern pattern) { + final Matcher matcher = pattern.matcher(fileName); if (matcher.matches()) { try { return Long.parseLong(matcher.group(1)); } catch (NumberFormatException e) { throw new IllegalStateException( - "number formatting issue in a file that passed PARSE_STRICT_ID_PATTERN: " + fileName + "]", + "number formatting issue in a file that passed " + pattern.pattern() + ": " + fileName + "]", e ); } diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogFooter.java b/server/src/main/java/org/opensearch/index/translog/TranslogFooter.java new file mode 100644 index 0000000000000..c4d6af131af0b --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/TranslogFooter.java @@ -0,0 +1,148 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.index.translog; + +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.store.OutputStreamDataOutput; +import org.opensearch.common.io.Channels; +import org.opensearch.core.common.io.stream.OutputStreamStreamOutput; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; + +/** + * Handles the writing and reading of the translog footer, which contains the checksum of the translog data. + * + * Each translog file is structured as follows: + * + * 1. Translog header + * 2. Translog operations + * 3. Translog footer + * + * The footer contains the following information: + * + * - Magic number (int): A constant value that identifies the start of the footer. + * - Algorithm ID (int): The identifier of the checksum algorithm used. Currently, this is always 0. + * - Checksum (long): The checksum of the entire translog data, calculated using the specified algorithm. + */ +public class TranslogFooter { + + /** + * FOOTER_LENGTH is the length of the present footer added to translog files. + * We write 4 bytes for the magic number, 4 bytes for algorithm ID and 8 bytes for the checksum. + * Therefore, the footer length as 16. + * */ + private static final int FOOTER_LENGTH = 16; + + /** + * Returns the length of the translog footer in bytes. + * + * @return the length of the translog footer in bytes + */ + static int footerLength() { + return FOOTER_LENGTH; + } + + /** + * Writes the translog footer to the provided `FileChannel`. + * + * @param channel the `FileChannel` to write the footer to + * @param checksum the checksum value to be written in the footer + * @param toSync whether to force a sync of the written data to the underlying storage + * @return the byte array containing the written footer data + * @throws IOException if an I/O error occurs while writing the footer + */ + static byte[] write(FileChannel channel, long checksum, boolean toSync) throws IOException { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + final OutputStreamDataOutput out = new OutputStreamDataOutput(new OutputStreamStreamOutput(byteArrayOutputStream)); + + CodecUtil.writeBEInt(out, CodecUtil.FOOTER_MAGIC); + CodecUtil.writeBEInt(out, 0); + CodecUtil.writeBELong(out, checksum); + + Channels.writeToChannel(byteArrayOutputStream.toByteArray(), channel); + if (toSync) { + channel.force(false); + } + + return byteArrayOutputStream.toByteArray(); + } + + /** + * Reads the checksum value from the footer of the translog file located at the provided `Path`. + * + * If the translog file is of an older version and does not have a footer, this method returns `null`. + * + * @param path the `Path` to the translog file + * @return the checksum value from the translog footer, or `null` if the footer is not present + * @throws IOException if an I/O error occurs while reading the footer + */ + static Long readChecksum(Path path) throws IOException { + try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) { + // Read the header and find out if the footer is supported. + final TranslogHeader header = TranslogHeader.read(path, channel); + if (header.getTranslogHeaderVersion() < TranslogHeader.VERSION_WITH_FOOTER) { + return null; + } + + // Read the footer. + final long fileSize = channel.size(); + final long footerStart = fileSize - TranslogFooter.footerLength(); + ByteBuffer footer = ByteBuffer.allocate(TranslogFooter.footerLength()); + int bytesRead = Channels.readFromFileChannel(channel, footerStart, footer); + if (bytesRead != TranslogFooter.footerLength()) { + throw new IOException( + "Read " + bytesRead + " bytes from footer instead of expected " + TranslogFooter.footerLength() + " bytes" + ); + } + footer.flip(); + + // Validate the footer and return the checksum. + int magic = footer.getInt(); + if (magic != CodecUtil.FOOTER_MAGIC) { + throw new IOException("Invalid footer magic number: " + magic); + } + + int algorithmId = footer.getInt(); + if (algorithmId != 0) { + throw new IOException("Unsupported checksum algorithm ID: " + algorithmId); + } + + return footer.getLong(); + } + } +} diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogHeader.java b/server/src/main/java/org/opensearch/index/translog/TranslogHeader.java index 66a9fe08d06b5..b2cae8f209f6f 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogHeader.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogHeader.java @@ -65,11 +65,13 @@ public final class TranslogHeader { public static final int VERSION_CHECKSUMS = 1; // pre-2.0 - unsupported public static final int VERSION_CHECKPOINTS = 2; // added checkpoints public static final int VERSION_PRIMARY_TERM = 3; // added primary term - public static final int CURRENT_VERSION = VERSION_PRIMARY_TERM; + public static final int VERSION_WITH_FOOTER = 4; // added the footer for the translog + public static final int CURRENT_VERSION = VERSION_WITH_FOOTER; private final String translogUUID; private final long primaryTerm; private final int headerSizeInBytes; + private final int translogHeaderVersion; /** * Creates a new translog header with the given uuid and primary term. @@ -80,14 +82,15 @@ public final class TranslogHeader { * All operations' terms in this translog file are enforced to be at most this term. */ TranslogHeader(String translogUUID, long primaryTerm) { - this(translogUUID, primaryTerm, headerSizeInBytes(translogUUID)); + this(translogUUID, primaryTerm, headerSizeInBytes(translogUUID), CURRENT_VERSION); assert primaryTerm >= 0 : "Primary term must be non-negative; term [" + primaryTerm + "]"; } - private TranslogHeader(String translogUUID, long primaryTerm, int headerSizeInBytes) { + private TranslogHeader(String translogUUID, long primaryTerm, int headerSizeInBytes, int headerVersion) { this.translogUUID = translogUUID; this.primaryTerm = primaryTerm; this.headerSizeInBytes = headerSizeInBytes; + this.translogHeaderVersion = headerVersion; } public String getTranslogUUID() { @@ -110,6 +113,13 @@ public int sizeInBytes() { return headerSizeInBytes; } + /** + * Returns the version of the translog header. + * */ + public int getTranslogHeaderVersion() { + return translogHeaderVersion; + } + static int headerSizeInBytes(String translogUUID) { return headerSizeInBytes(CURRENT_VERSION, new BytesRef(translogUUID).length); } @@ -127,7 +137,7 @@ private static int headerSizeInBytes(int version, int uuidLength) { static int readHeaderVersion(final Path path, final FileChannel channel, final StreamInput in) throws IOException { final int version; try { - version = CodecUtil.checkHeader(new InputStreamDataInput(in), TRANSLOG_CODEC, VERSION_CHECKSUMS, VERSION_PRIMARY_TERM); + version = CodecUtil.checkHeader(new InputStreamDataInput(in), TRANSLOG_CODEC, VERSION_CHECKSUMS, CURRENT_VERSION); } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException e) { tryReportOldVersionError(path, channel); throw new TranslogCorruptedException(path.toString(), "translog header corrupted", e); @@ -183,7 +193,7 @@ public static TranslogHeader read(final Path path, final FileChannel channel) th in.read(uuid.bytes, uuid.offset, uuid.length); // Read the primary term final long primaryTerm; - if (version == VERSION_PRIMARY_TERM) { + if (version >= VERSION_PRIMARY_TERM) { primaryTerm = in.readLong(); } else { assert version == VERSION_CHECKPOINTS : "Unknown header version [" + version + "]"; @@ -202,7 +212,7 @@ public static TranslogHeader read(final Path path, final FileChannel channel) th + channel.position() + "]"; - return new TranslogHeader(uuid.utf8ToString(), primaryTerm, headerSizeInBytes); + return new TranslogHeader(uuid.utf8ToString(), primaryTerm, headerSizeInBytes, version); } catch (EOFException e) { throw new TranslogCorruptedException(path.toString(), "translog header truncated", e); } diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogReader.java b/server/src/main/java/org/opensearch/index/translog/TranslogReader.java index d590663670b28..b50adc5bdc034 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogReader.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogReader.java @@ -64,8 +64,13 @@ public class TranslogReader extends BaseTranslogReader implements Closeable { @Nullable private final Long translogChecksum; + + // fullTranslogChecksum is the checksum for translog which includes header, content, and footer. + @Nullable + private final Long fullTranslogChecksum; @Nullable private final Long checkpointChecksum; + private final Boolean hasFooter; /** * Create a translog writer against the specified translog file channel. @@ -80,14 +85,18 @@ public class TranslogReader extends BaseTranslogReader implements Closeable { final FileChannel channel, final Path path, final TranslogHeader header, - final Long translogChecksum + final Long translogChecksum, + final Long fullTranslogChecksum, + final Boolean hasFooter ) throws IOException { super(checkpoint.generation, channel, path, header); this.length = checkpoint.offset; this.totalOperations = checkpoint.numOps; this.checkpoint = checkpoint; this.translogChecksum = translogChecksum; + this.fullTranslogChecksum = fullTranslogChecksum; this.checkpointChecksum = (translogChecksum != null) ? calculateCheckpointChecksum(checkpoint, path) : null; + this.hasFooter = hasFooter; } private static Long calculateCheckpointChecksum(Checkpoint checkpoint, Path path) throws IOException { @@ -101,6 +110,14 @@ public Long getTranslogChecksum() { return translogChecksum; } + /** + * getFullTranslogChecksum returns the complete checksum of the translog which includes + * header, content and footer. + * */ + public Long getFullTranslogChecksum() { + return fullTranslogChecksum; + } + public Long getCheckpointChecksum() { return checkpointChecksum; } @@ -118,7 +135,18 @@ public Long getCheckpointChecksum() { public static TranslogReader open(final FileChannel channel, final Path path, final Checkpoint checkpoint, final String translogUUID) throws IOException { final TranslogHeader header = TranslogHeader.read(translogUUID, path, channel); - return new TranslogReader(checkpoint, channel, path, header, null); + + // When we open a reader to Translog from a path, we want to fetch the checksum + // as that would be needed later on while creating the metadata map for + // generation to checksum. + Long translogChecksum = null; + try { + translogChecksum = TranslogFooter.readChecksum(path); + } catch (IOException ignored) {} + + boolean hasFooter = translogChecksum != null; + + return new TranslogReader(checkpoint, channel, path, header, translogChecksum, null, hasFooter); } /** @@ -146,9 +174,9 @@ TranslogReader closeIntoTrimmedReader(long aboveSeqNo, ChannelFactory channelFac IOUtils.fsync(checkpointFile.getParent(), true); - newReader = new TranslogReader(newCheckpoint, channel, path, header, translogChecksum); + newReader = new TranslogReader(newCheckpoint, channel, path, header, translogChecksum, fullTranslogChecksum, hasFooter); } else { - newReader = new TranslogReader(checkpoint, channel, path, header, translogChecksum); + newReader = new TranslogReader(checkpoint, channel, path, header, translogChecksum, fullTranslogChecksum, hasFooter); } toCloseOnFailure = null; return newReader; @@ -177,6 +205,23 @@ final public Checkpoint getCheckpoint() { * reads an operation at the given position into the given buffer. */ protected void readBytes(ByteBuffer buffer, long position) throws IOException { + if (hasFooter && header.getTranslogHeaderVersion() == TranslogHeader.VERSION_WITH_FOOTER) { + // Ensure that the read request does not overlap with footer. + long translogLengthWithoutFooter = length - TranslogFooter.footerLength(); + if (position >= translogLengthWithoutFooter && position < length) { + throw new EOFException( + "read requested past last ops into footer. pos [" + position + "] end: [" + translogLengthWithoutFooter + "]" + ); + } + // If we are trying to read beyond the last Ops, we need to return EOF error. + long lastPositionToRead = position + buffer.limit(); + if (lastPositionToRead > translogLengthWithoutFooter) { + throw new EOFException( + "trying to read past last ops into footer. pos [" + lastPositionToRead + "] end: [" + translogLengthWithoutFooter + "]" + ); + } + } + if (position >= length) { throw new EOFException("read requested past EOF. pos [" + position + "] end: [" + length + "]"); } diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogWriter.java b/server/src/main/java/org/opensearch/index/translog/TranslogWriter.java index b0c7d51c3e43b..8a9846f29c54d 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogWriter.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogWriter.java @@ -189,12 +189,12 @@ public static TranslogWriter create( checkpointChannel = channelFactory.open(checkpointFile, StandardOpenOption.WRITE); final TranslogHeader header = new TranslogHeader(translogUUID, primaryTerm); header.write(channel, !Boolean.TRUE.equals(remoteTranslogEnabled)); - TranslogCheckedContainer translogCheckedContainer = null; - if (Boolean.TRUE.equals(remoteTranslogEnabled)) { - ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - header.write(byteArrayOutputStream); - translogCheckedContainer = new TranslogCheckedContainer(byteArrayOutputStream.toByteArray()); - } + + // Enable translogCheckedContainer for remote as well as local translog. + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + header.write(byteArrayOutputStream); + TranslogCheckedContainer translogCheckedContainer = new TranslogCheckedContainer(byteArrayOutputStream.toByteArray()); + final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint( header.sizeInBytes(), fileGeneration, @@ -438,8 +438,41 @@ public TranslogReader closeIntoReader() throws IOException { synchronized (syncLock) { try (ReleasableLock toClose = writeLock.acquire()) { synchronized (this) { + Long translogContentChecksum = null; + Long fullTranslogChecksum = null; try { + if (header.getTranslogHeaderVersion() >= TranslogHeader.VERSION_WITH_FOOTER) { + // If we are adding a footer, change the totalOffset. + // This will ensure that footer length is included in the checkpoint. + totalOffset += TranslogFooter.footerLength(); + } + sync(); // sync before we close.. + + if (header.getTranslogHeaderVersion() >= TranslogHeader.VERSION_WITH_FOOTER) { + // Post sync, we will add the footer to the translog. + assert translogCheckedContainer != null : "checksum has not been calculated for the translog"; + // add footer to the translog file. + // The checksum in the footer consists of header + body. + byte[] footer = TranslogFooter.write( + channel, + translogCheckedContainer.getChecksum(), + !Boolean.TRUE.equals(remoteTranslogEnabled) + ); + // Store the checksum without footer. + translogContentChecksum = translogCheckedContainer.getChecksum(); + + // update the checked container for translog to account for the footer. + // This is needed because the checksum from the container will be used for + // comparison during remote store upload. + translogCheckedContainer.updateFromBytes(footer, 0, footer.length); + fullTranslogChecksum = translogCheckedContainer.getChecksum(); + } else { + // If we reach here then it means we are using older header and therefore, no footer. + // So, both translogContentChecksum and fullTranslogChecksum are same. + translogContentChecksum = (translogCheckedContainer != null) ? translogCheckedContainer.getChecksum() : null; + fullTranslogChecksum = translogContentChecksum; + } } catch (final Exception ex) { closeWithTragicEvent(ex); throw ex; @@ -460,7 +493,9 @@ public TranslogReader closeIntoReader() throws IOException { channel, path, header, - (translogCheckedContainer != null) ? translogCheckedContainer.getChecksum() : null + translogContentChecksum, + fullTranslogChecksum, + true ); } else { throw new AlreadyClosedException( diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java index 86f042af0584b..d64ff447c4b8e 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java @@ -163,16 +163,30 @@ public boolean equals(Object o) { public static final class TranslogFileSnapshot extends TransferFileSnapshot { private final long generation; + // translogContentChecksum is the checksum of Translog which does not include footer. + // In contrast, the checksum class variable has the checksum of Translog which includes footer. + private Long translogContentChecksum; public TranslogFileSnapshot(long primaryTerm, long generation, Path path, Long checksum) throws IOException { super(path, primaryTerm, checksum); this.generation = generation; + this.translogContentChecksum = checksum; + } + + public TranslogFileSnapshot(long primaryTerm, long generation, Path path, Long checksum, Long fullTranslogChecksum) + throws IOException { + this(primaryTerm, generation, path, fullTranslogChecksum); + this.translogContentChecksum = checksum; } public long getGeneration() { return generation; } + public Long getTranslogContentChecksum() { + return translogContentChecksum; + } + @Override public int hashCode() { return Objects.hash(generation, super.hashCode()); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java index ae007c0c33e1e..62d435ec418ce 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java @@ -168,7 +168,13 @@ public TranslogCheckpointTransferSnapshot build() throws IOException { Path checkpointPath = location.resolve(checkpointGenFileNameMapper.apply(readerGeneration)); generations.add(readerGeneration); translogTransferSnapshot.add( - new TranslogFileSnapshot(readerPrimaryTerm, readerGeneration, translogPath, reader.getTranslogChecksum()), + new TranslogFileSnapshot( + readerPrimaryTerm, + readerGeneration, + translogPath, + reader.getTranslogChecksum(), + reader.getFullTranslogChecksum() + ), new CheckpointFileSnapshot( readerPrimaryTerm, checkpointGeneration, diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 291218ea47499..ecc3c17541918 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -269,6 +269,15 @@ public boolean downloadTranslog(String primaryTerm, String generation, Path loca return true; } + /** + * markFileAsDownloaded marks a file as already downloaded in the file transfer tracker. + * This is needed because we want to ensure that even if the file is skipped from being downloaded, + * TranslogTransferManager knows about it and does not re-upload the same to remote store. + * */ + public void markFileAsDownloaded(String filename) { + fileTransferTracker.add(filename, true); + } + /** * Process the provided metadata and tries to recover translog.ckp file to the FS. */ @@ -293,8 +302,8 @@ private void recoverCkpFileUsingMetadata(Map metadata, Path loca private Map downloadToFS(String fileName, Path location, String primaryTerm, boolean withMetadata) throws IOException { Path filePath = location.resolve(fileName); - // Here, we always override the existing file if present. - // We need to change this logic when we introduce incremental download + // downloadToFS method will be called only when we want to download the file. + // Therefore, we delete the file if it exists. deleteFileIfExists(filePath); Map metadata = null; @@ -451,8 +460,23 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) snapshot -> String.valueOf(snapshot.getPrimaryTerm()) ) ); + + // generationChecksumMap is the mapping between the generation and the checksum of associated translog file. + Map generationChecksumMap = transferSnapshot.getTranslogFileSnapshots().stream().map(s -> { + assert s instanceof TranslogFileSnapshot; + return (TranslogFileSnapshot) s; + }) + .filter(snapshot -> snapshot.getTranslogContentChecksum() != null) + .collect( + Collectors.toMap( + snapshot -> String.valueOf(snapshot.getGeneration()), + snapshot -> String.valueOf(snapshot.getTranslogContentChecksum()) + ) + ); + TranslogTransferMetadata translogTransferMetadata = transferSnapshot.getTranslogTransferMetadata(); translogTransferMetadata.setGenerationToPrimaryTermMapper(new HashMap<>(generationPrimaryTermMap)); + translogTransferMetadata.setGenerationToChecksumMapper(new HashMap<>(generationChecksumMap)); return new TransferFileSnapshot( translogTransferMetadata.getFileName(), diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java index 7fe3305545085..6ecc731235cbc 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java @@ -42,6 +42,9 @@ public class TranslogTransferMetadata { private final SetOnce> generationToPrimaryTermMapper = new SetOnce<>(); + // generationToChecksumMapper is the mapping between the generation and the checksum of associated translog file. + private final SetOnce> generationToChecksumMapper = new SetOnce<>(); + public static final String METADATA_SEPARATOR = "__"; public static final String METADATA_PREFIX = "metadata"; @@ -96,6 +99,20 @@ public Map getGenerationToPrimaryTermMapper() { return generationToPrimaryTermMapper.get(); } + /* + * setGenerationToChecksumMapper sets the mapping between the generation and the checksum of associated translog file. + * */ + public void setGenerationToChecksumMapper(Map generationToChecksumMap) { + generationToChecksumMapper.set(generationToChecksumMap); + } + + /* + * getGenerationToChecksumMapper gets the mapping between the generation and the checksum of associated translog file. + * */ + public Map getGenerationToChecksumMapper() { + return generationToChecksumMapper.get(); + } + /* This should be used only at the time of creation. */ diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandler.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandler.java index cea7ef8a4e6dd..8df32a6edb819 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandler.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandler.java @@ -12,6 +12,7 @@ import org.apache.lucene.store.IndexOutput; import org.opensearch.common.io.IndexIOStreamHandler; +import java.io.EOFException; import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -40,6 +41,15 @@ public TranslogTransferMetadata readContent(IndexInput indexInput) throws IOExce TranslogTransferMetadata metadata = new TranslogTransferMetadata(primaryTerm, generation, minTranslogGeneration, count); metadata.setGenerationToPrimaryTermMapper(generationToPrimaryTermMapper); + // We set the GenerationToChecksumMapper only if it is present in the file. + // Else we initialise it with an empty map. + try { + Map generationToChecksumMapper = indexInput.readMapOfStrings(); + metadata.setGenerationToChecksumMapper(generationToChecksumMapper); + } catch (EOFException ignored) { + metadata.setGenerationToChecksumMapper(Map.of()); + } + return metadata; } @@ -59,5 +69,11 @@ public void writeContent(IndexOutput indexOutput, TranslogTransferMetadata conte } else { indexOutput.writeMapOfStrings(new HashMap<>()); } + // Write the generation to checksum mapping at the end. + if (content.getGenerationToChecksumMapper() != null) { + indexOutput.writeMapOfStrings(content.getGenerationToChecksumMapper()); + } else { + indexOutput.writeMapOfStrings(new HashMap<>()); + } } } diff --git a/server/src/test/java/org/opensearch/index/translog/LocalTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/LocalTranslogTests.java index cae27d5b259c4..985a8edca7bf4 100644 --- a/server/src/test/java/org/opensearch/index/translog/LocalTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/LocalTranslogTests.java @@ -501,9 +501,9 @@ public void testStats() throws IOException { { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(4)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(326L)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(342L)); assertThat(stats.getUncommittedOperations(), equalTo(4)); - assertThat(stats.getUncommittedSizeInBytes(), equalTo(271L)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(287L)); assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L)); } @@ -513,7 +513,7 @@ public void testStats() throws IOException { stats.writeTo(out); final TranslogStats copy = new TranslogStats(out.bytes().streamInput()); assertThat(copy.estimatedNumberOfOperations(), equalTo(4)); - assertThat(copy.getTranslogSizeInBytes(), equalTo(326L)); + assertThat(copy.getTranslogSizeInBytes(), equalTo(342L)); try (XContentBuilder builder = XContentFactory.jsonBuilder()) { builder.startObject(); @@ -521,9 +521,9 @@ public void testStats() throws IOException { builder.endObject(); assertEquals( "{\"translog\":{\"operations\":4,\"size_in_bytes\":" - + 326 + + 342 + ",\"uncommitted_operations\":4,\"uncommitted_size_in_bytes\":" - + 271 + + 287 + ",\"earliest_last_modified_age\":" + stats.getEarliestLastModifiedAge() + ",\"remote_store\":{\"upload\":{" @@ -540,7 +540,7 @@ public void testStats() throws IOException { long lastModifiedAge = System.currentTimeMillis() - translog.getCurrent().getLastModifiedTime(); final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(4)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(326L)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(342L)); assertThat(stats.getUncommittedOperations(), equalTo(0)); assertThat(stats.getUncommittedSizeInBytes(), equalTo(firstOperationPosition)); assertThat(stats.getEarliestLastModifiedAge(), greaterThanOrEqualTo(lastModifiedAge)); @@ -1754,8 +1754,10 @@ public void testCloseIntoReader() throws IOException { writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), randomNonNegativeLong()); } writer.sync(); - final Checkpoint writerCheckpoint = writer.getCheckpoint(); TranslogReader reader = writer.closeIntoReader(); + // We need to find checkpoint only after the reader has been closed. + // This is so that the added footer is taken care of. + final Checkpoint writerCheckpoint = writer.getCheckpoint(); try { if (randomBoolean()) { reader.close(); diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java index 03c77a9a83f57..bdba311046df5 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java @@ -1663,8 +1663,10 @@ public void testCloseIntoReader() throws IOException { writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), randomNonNegativeLong()); } writer.sync(); - final Checkpoint writerCheckpoint = writer.getCheckpoint(); TranslogReader reader = writer.closeIntoReader(); + // We need to find checkpoint only after the reader has been closed. + // This is so that the added footer is taken care of. + final Checkpoint writerCheckpoint = writer.getCheckpoint(); try { if (randomBoolean()) { reader.close(); @@ -1690,8 +1692,10 @@ public void testDownloadWithRetries() throws IOException { Path location = createTempDir(); TranslogTransferMetadata translogTransferMetadata = new TranslogTransferMetadata(primaryTerm, generation, generation, 1); Map generationToPrimaryTermMapper = new HashMap<>(); + Map generationToChecksumMapper = new HashMap<>(); generationToPrimaryTermMapper.put(String.valueOf(generation), String.valueOf(primaryTerm)); translogTransferMetadata.setGenerationToPrimaryTermMapper(generationToPrimaryTermMapper); + translogTransferMetadata.setGenerationToChecksumMapper(generationToChecksumMapper); TranslogTransferManager mockTransfer = mock(TranslogTransferManager.class); RemoteTranslogTransferTracker remoteTranslogTransferTracker = mock(RemoteTranslogTransferTracker.class);