diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index f5a9ed8ed9362..df3517b877949 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -254,14 +254,68 @@ private static void downloadOnce( Files.createDirectories(location); } - // Delete translog files on local before downloading from remote + Map generationToPrimaryTermMapper = translogMetadata.getGenerationToPrimaryTermMapper(); + Map generationToChecksumMapper = translogMetadata.getGenerationToChecksumMapper(); + long maxGeneration = translogMetadata.getGeneration(); + long minGeneration = translogMetadata.getMinTranslogGeneration(); + + // Delete any translog and checkpoint file which is not part of the current generation range. for (Path file : FileSystemUtils.files(location)) { - Files.delete(file); + try { + long generation = parseIdFromFileName(file.getFileName().toString(), STRICT_TLOG_OR_CKP_PATTERN); + if (generation < minGeneration || generation > maxGeneration) { + // If the generation is outside the required range, then we delete the same. + Files.delete(file); + } + } catch (IllegalStateException | IllegalArgumentException e) { + // Delete any file which does not conform to Translog or Checkpoint filename patterns. + Files.delete(file); + } } - Map generationToPrimaryTermMapper = translogMetadata.getGenerationToPrimaryTermMapper(); - for (long i = translogMetadata.getGeneration(); i >= translogMetadata.getMinTranslogGeneration(); i--) { + for (long i = maxGeneration; i >= minGeneration; i--) { String generation = Long.toString(i); + + // For incremental downloads, we will check if the local translog matches the one present in + // remote store. If so, we will skip its download. + String translogFilename = Translog.getFilename(i); + Path targetTranslogPath = location.resolve(translogFilename); + + // If we have the translog available for the generation then we need to + // compare the checksum with that in remote obtained via metadata. + // For backward compatibility, we consider the following cases here- + // - Remote metadata does not have the mapping for generation + // - Local translog file lacks the checksum value in footer + // In both these cases, we will download the files for the generation. + if (generationToChecksumMapper.containsKey(generation) && FileSystemUtils.exists(targetTranslogPath)) { + try { + final long expectedChecksum = Long.parseLong(generationToChecksumMapper.get(generation)); + final Long actualChecksum = TranslogFooter.readChecksum(targetTranslogPath); + + // If the local and remote checksum are same, then continue. + // Else exit the loop and download the translog. + if (actualChecksum != null && actualChecksum == expectedChecksum) { + logger.info( + "Download skipped for translog and checkpoint files for generation={} due to them being locally present", + generation + ); + + // Mark the translog and checkpoint file as downloaded in the file tracker. + translogTransferManager.markFileAsDownloaded(translogFilename); + translogTransferManager.markFileAsDownloaded(Translog.getCommitCheckpointFileName(i)); + continue; + } + } catch (IOException e) { + // The exception can occur if the remote translog files were uploaded without footer. + logger.info( + "Exception occurred during reconciliation of translog state between local and remote. " + + "Reverting to downloading the translog and checksum files for generation={}", + generation + ); + } + } + + logger.info("Downloading translog and checkpoint files for generation={}", generation); translogTransferManager.downloadTranslog(generationToPrimaryTermMapper.get(generation), generation, location); } logger.info( diff --git a/server/src/main/java/org/opensearch/index/translog/Translog.java b/server/src/main/java/org/opensearch/index/translog/Translog.java index 4b4ceb7444471..b30ed147d2397 100644 --- a/server/src/main/java/org/opensearch/index/translog/Translog.java +++ b/server/src/main/java/org/opensearch/index/translog/Translog.java @@ -134,6 +134,7 @@ public abstract class Translog extends AbstractIndexShardComponent implements In public static final String CHECKPOINT_SUFFIX = ".ckp"; public static final String CHECKPOINT_FILE_NAME = "translog" + CHECKPOINT_SUFFIX; + static final Pattern STRICT_TLOG_OR_CKP_PATTERN = Pattern.compile("^" + TRANSLOG_FILE_PREFIX + "(\\d+)(\\.ckp|\\.tlog)$"); static final Pattern PARSE_STRICT_ID_PATTERN = Pattern.compile("^" + TRANSLOG_FILE_PREFIX + "(\\d+)(\\.tlog)$"); public static final int DEFAULT_HEADER_SIZE_IN_BYTES = TranslogHeader.headerSizeInBytes(UUIDs.randomBase64UUID()); @@ -320,14 +321,18 @@ public static long parseIdFromFileName(Path translogFile) { return parseIdFromFileName(fileName); } - public static long parseIdFromFileName(String fileName) { - final Matcher matcher = PARSE_STRICT_ID_PATTERN.matcher(fileName); + public static long parseIdFromFileName(String translogFile) { + return parseIdFromFileName(translogFile, PARSE_STRICT_ID_PATTERN); + } + + public static long parseIdFromFileName(String fileName, Pattern pattern) { + final Matcher matcher = pattern.matcher(fileName); if (matcher.matches()) { try { return Long.parseLong(matcher.group(1)); } catch (NumberFormatException e) { throw new IllegalStateException( - "number formatting issue in a file that passed PARSE_STRICT_ID_PATTERN: " + fileName + "]", + "number formatting issue in a file that passed " + pattern.pattern() + ": " + fileName + "]", e ); } diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogFooter.java b/server/src/main/java/org/opensearch/index/translog/TranslogFooter.java new file mode 100644 index 0000000000000..bc5ac8286a5ac --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/TranslogFooter.java @@ -0,0 +1,135 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.index.translog; + +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.store.OutputStreamDataOutput; +import org.opensearch.common.io.Channels; +import org.opensearch.core.common.io.stream.OutputStreamStreamOutput; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; + +/** + * Each translog file is started with a header followed by the translog operations, and ending with a footer. + * The footer encapsulates the checksum of the translog. + * */ +public class TranslogFooter { + + /** + * footerLength returns the length of the footer. + * We are writing 16 bytes and therefore, we return the same. + */ + static int footerLength() { + return 16; + } + + /** + * write write the translog footer which records both checksum and algorithm ID. + * This method is based upon the CodecUtils.writeFooter method. + * This footer can be parsed and read with CodecUtils.readFooter(). + * + * Same as CodecUtils documentation- + * Footer --> Magic,AlgorithmID,Checksum + * Magic --> Uint32. This identifies the start of the footer. It is always -1071082520. + * AlgorithmID --> Uint32. This indicates the checksum algorithm used. Currently this is always 0. + * Checksum --> Uint64. This is the checksum as calculated for the translog. + * */ + static void write(FileChannel channel, long checksum, boolean toSync) throws IOException { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + final OutputStreamDataOutput out = new OutputStreamDataOutput(new OutputStreamStreamOutput(byteArrayOutputStream)); + + CodecUtil.writeBEInt(out, CodecUtil.FOOTER_MAGIC); + CodecUtil.writeBEInt(out, 0); + CodecUtil.writeBELong(out, checksum); + + Channels.writeToChannel(byteArrayOutputStream.toByteArray(), channel); + if (toSync) { + channel.force(false); + } + } + + /** + * validate checks if the Translog file has a valid footer. + * */ + static boolean validate(Path path) { + try { + return TranslogFooter.readChecksum(path) != null; + } catch (IOException e) { + return false; + } + } + + /** + * readChecksum reads the translog file from the given location and returns the checksum if present in the footer. + * If the translog file is of older version and the footer is not present, then we return null. + * */ + static Long readChecksum(Path path) throws IOException { + try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) { + // Read the header and find out if the footer is supported. + final TranslogHeader header = TranslogHeader.read(path, channel); + if (header.getTranslogHeaderVersion() < TranslogHeader.VERSION_WITH_FOOTER) { + return null; + } + + // Read the footer. + final long fileSize = channel.size(); + final long footerStart = fileSize - TranslogFooter.footerLength(); + ByteBuffer footer = ByteBuffer.allocate(TranslogFooter.footerLength()); + int bytesRead = Channels.readFromFileChannel(channel, footerStart, footer); + if (bytesRead != TranslogFooter.footerLength()) { + throw new IOException( + "Read " + bytesRead + " bytes from footer instead of expected " + TranslogFooter.footerLength() + " bytes" + ); + } + footer.flip(); + + // Validate the footer and return the checksum. + int magic = footer.getInt(); + if (magic != CodecUtil.FOOTER_MAGIC) { + throw new IOException("Invalid footer magic number: " + magic); + } + + int algorithmId = footer.getInt(); + if (algorithmId != 0) { + throw new IOException("Unsupported checksum algorithm ID: " + algorithmId); + } + + return footer.getLong(); + } + } +} diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogHeader.java b/server/src/main/java/org/opensearch/index/translog/TranslogHeader.java index 66a9fe08d06b5..bd2f07e310c99 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogHeader.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogHeader.java @@ -65,11 +65,13 @@ public final class TranslogHeader { public static final int VERSION_CHECKSUMS = 1; // pre-2.0 - unsupported public static final int VERSION_CHECKPOINTS = 2; // added checkpoints public static final int VERSION_PRIMARY_TERM = 3; // added primary term - public static final int CURRENT_VERSION = VERSION_PRIMARY_TERM; + public static final int VERSION_WITH_FOOTER = 4; // added the footer for the translog + public static final int CURRENT_VERSION = VERSION_WITH_FOOTER; private final String translogUUID; private final long primaryTerm; private final int headerSizeInBytes; + private final int translogHeaderVersion; /** * Creates a new translog header with the given uuid and primary term. @@ -80,14 +82,17 @@ public final class TranslogHeader { * All operations' terms in this translog file are enforced to be at most this term. */ TranslogHeader(String translogUUID, long primaryTerm) { - this(translogUUID, primaryTerm, headerSizeInBytes(translogUUID)); + // When we create Header on the fly, we will use the latest current version as that would include the + // checksum as part of the footer. + this(translogUUID, primaryTerm, headerSizeInBytes(translogUUID), CURRENT_VERSION); assert primaryTerm >= 0 : "Primary term must be non-negative; term [" + primaryTerm + "]"; } - private TranslogHeader(String translogUUID, long primaryTerm, int headerSizeInBytes) { + private TranslogHeader(String translogUUID, long primaryTerm, int headerSizeInBytes, int headerVersion) { this.translogUUID = translogUUID; this.primaryTerm = primaryTerm; this.headerSizeInBytes = headerSizeInBytes; + this.translogHeaderVersion = headerVersion; } public String getTranslogUUID() { @@ -110,6 +115,13 @@ public int sizeInBytes() { return headerSizeInBytes; } + /** + * Returns the version of the translog header. + * */ + public int getTranslogHeaderVersion() { + return translogHeaderVersion; + } + static int headerSizeInBytes(String translogUUID) { return headerSizeInBytes(CURRENT_VERSION, new BytesRef(translogUUID).length); } @@ -127,7 +139,7 @@ private static int headerSizeInBytes(int version, int uuidLength) { static int readHeaderVersion(final Path path, final FileChannel channel, final StreamInput in) throws IOException { final int version; try { - version = CodecUtil.checkHeader(new InputStreamDataInput(in), TRANSLOG_CODEC, VERSION_CHECKSUMS, VERSION_PRIMARY_TERM); + version = CodecUtil.checkHeader(new InputStreamDataInput(in), TRANSLOG_CODEC, VERSION_CHECKSUMS, CURRENT_VERSION); } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException e) { tryReportOldVersionError(path, channel); throw new TranslogCorruptedException(path.toString(), "translog header corrupted", e); @@ -183,7 +195,7 @@ public static TranslogHeader read(final Path path, final FileChannel channel) th in.read(uuid.bytes, uuid.offset, uuid.length); // Read the primary term final long primaryTerm; - if (version == VERSION_PRIMARY_TERM) { + if (version >= VERSION_PRIMARY_TERM) { primaryTerm = in.readLong(); } else { assert version == VERSION_CHECKPOINTS : "Unknown header version [" + version + "]"; @@ -202,7 +214,7 @@ public static TranslogHeader read(final Path path, final FileChannel channel) th + channel.position() + "]"; - return new TranslogHeader(uuid.utf8ToString(), primaryTerm, headerSizeInBytes); + return new TranslogHeader(uuid.utf8ToString(), primaryTerm, headerSizeInBytes, version); } catch (EOFException e) { throw new TranslogCorruptedException(path.toString(), "translog header truncated", e); } diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogReader.java b/server/src/main/java/org/opensearch/index/translog/TranslogReader.java index d590663670b28..e3f993e41c42f 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogReader.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogReader.java @@ -66,6 +66,7 @@ public class TranslogReader extends BaseTranslogReader implements Closeable { private final Long translogChecksum; @Nullable private final Long checkpointChecksum; + private final Boolean hasFooter; /** * Create a translog writer against the specified translog file channel. @@ -80,7 +81,8 @@ public class TranslogReader extends BaseTranslogReader implements Closeable { final FileChannel channel, final Path path, final TranslogHeader header, - final Long translogChecksum + final Long translogChecksum, + final Boolean hasFooter ) throws IOException { super(checkpoint.generation, channel, path, header); this.length = checkpoint.offset; @@ -88,6 +90,7 @@ public class TranslogReader extends BaseTranslogReader implements Closeable { this.checkpoint = checkpoint; this.translogChecksum = translogChecksum; this.checkpointChecksum = (translogChecksum != null) ? calculateCheckpointChecksum(checkpoint, path) : null; + this.hasFooter = hasFooter != null ? hasFooter : TranslogFooter.validate(path); } private static Long calculateCheckpointChecksum(Checkpoint checkpoint, Path path) throws IOException { @@ -118,7 +121,7 @@ public Long getCheckpointChecksum() { public static TranslogReader open(final FileChannel channel, final Path path, final Checkpoint checkpoint, final String translogUUID) throws IOException { final TranslogHeader header = TranslogHeader.read(translogUUID, path, channel); - return new TranslogReader(checkpoint, channel, path, header, null); + return new TranslogReader(checkpoint, channel, path, header, null, null); } /** @@ -146,9 +149,9 @@ TranslogReader closeIntoTrimmedReader(long aboveSeqNo, ChannelFactory channelFac IOUtils.fsync(checkpointFile.getParent(), true); - newReader = new TranslogReader(newCheckpoint, channel, path, header, translogChecksum); + newReader = new TranslogReader(newCheckpoint, channel, path, header, translogChecksum, hasFooter); } else { - newReader = new TranslogReader(checkpoint, channel, path, header, translogChecksum); + newReader = new TranslogReader(checkpoint, channel, path, header, translogChecksum, hasFooter); } toCloseOnFailure = null; return newReader; @@ -177,6 +180,23 @@ final public Checkpoint getCheckpoint() { * reads an operation at the given position into the given buffer. */ protected void readBytes(ByteBuffer buffer, long position) throws IOException { + if (hasFooter && header.getTranslogHeaderVersion() == TranslogHeader.VERSION_WITH_FOOTER) { + // Ensure that the read request does not overlap with footer. + long translogLengthWithoutFooter = length - TranslogFooter.footerLength(); + if (position >= translogLengthWithoutFooter && position < length) { + throw new EOFException( + "read requested past last ops into footer. pos [" + position + "] end: [" + translogLengthWithoutFooter + "]" + ); + } + // If we are trying to read beyond the last Ops, we need to return EOF error. + long lastPositionToRead = position + buffer.limit(); + if (lastPositionToRead > translogLengthWithoutFooter) { + throw new EOFException( + "trying to read past last ops into footer. pos [" + lastPositionToRead + "] end: [" + translogLengthWithoutFooter + "]" + ); + } + } + if (position >= length) { throw new EOFException("read requested past EOF. pos [" + position + "] end: [" + length + "]"); } diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogWriter.java b/server/src/main/java/org/opensearch/index/translog/TranslogWriter.java index b0c7d51c3e43b..404078dde77e4 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogWriter.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogWriter.java @@ -189,12 +189,12 @@ public static TranslogWriter create( checkpointChannel = channelFactory.open(checkpointFile, StandardOpenOption.WRITE); final TranslogHeader header = new TranslogHeader(translogUUID, primaryTerm); header.write(channel, !Boolean.TRUE.equals(remoteTranslogEnabled)); - TranslogCheckedContainer translogCheckedContainer = null; - if (Boolean.TRUE.equals(remoteTranslogEnabled)) { - ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - header.write(byteArrayOutputStream); - translogCheckedContainer = new TranslogCheckedContainer(byteArrayOutputStream.toByteArray()); - } + + // Enable translogCheckedContainer for remote as well as local translog. + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + header.write(byteArrayOutputStream); + TranslogCheckedContainer translogCheckedContainer = new TranslogCheckedContainer(byteArrayOutputStream.toByteArray()); + final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint( header.sizeInBytes(), fileGeneration, @@ -439,7 +439,23 @@ public TranslogReader closeIntoReader() throws IOException { try (ReleasableLock toClose = writeLock.acquire()) { synchronized (this) { try { + if (header.getTranslogHeaderVersion() >= TranslogHeader.VERSION_WITH_FOOTER) { + // If we are adding a footer, change the totalOffset. + // This will ensure that footer length is included in the checkpoint. + totalOffset += TranslogFooter.footerLength(); + } + sync(); // sync before we close.. + + if (header.getTranslogHeaderVersion() >= TranslogHeader.VERSION_WITH_FOOTER) { + // Post sync, we will add the footer to the translog. + assert translogCheckedContainer != null : "checksum has not been calculated for the translog"; + TranslogFooter.write( + channel, + translogCheckedContainer.getChecksum(), + !Boolean.TRUE.equals(remoteTranslogEnabled) + ); // add footer + } } catch (final Exception ex) { closeWithTragicEvent(ex); throw ex; @@ -460,7 +476,8 @@ public TranslogReader closeIntoReader() throws IOException { channel, path, header, - (translogCheckedContainer != null) ? translogCheckedContainer.getChecksum() : null + (translogCheckedContainer != null) ? translogCheckedContainer.getChecksum() : null, + true ); } else { throw new AlreadyClosedException( diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 291218ea47499..a95bf4f6ca45b 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -269,6 +269,16 @@ public boolean downloadTranslog(String primaryTerm, String generation, Path loca return true; } + /** + * markFileAsDownloaded marks a file as already downloaded in the file transfer tracker. + * This is needed because we want to ensure that even if the file is skipped from being downloaded, + * TranslogTransferManager knows about it and does not re-upload the same to remote store. + * */ + public void markFileAsDownloaded(String filename) { + // Mark in FileTransferTracker so that the same files are not uploaded at the time of translog sync. + fileTransferTracker.add(filename, true); + } + /** * Process the provided metadata and tries to recover translog.ckp file to the FS. */ @@ -293,8 +303,8 @@ private void recoverCkpFileUsingMetadata(Map metadata, Path loca private Map downloadToFS(String fileName, Path location, String primaryTerm, boolean withMetadata) throws IOException { Path filePath = location.resolve(fileName); - // Here, we always override the existing file if present. - // We need to change this logic when we introduce incremental download + // downloadToFS method will be called only when we want to download the file. + // Therefore, we delete the file if it exists. deleteFileIfExists(filePath); Map metadata = null; @@ -451,8 +461,20 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) snapshot -> String.valueOf(snapshot.getPrimaryTerm()) ) ); + + // generationChecksumMap is the mapping between the generation and the checksum of associated translog file. + Map generationChecksumMap = transferSnapshot.getTranslogFileSnapshots().stream().map(s -> { + assert s instanceof TranslogFileSnapshot; + return (TranslogFileSnapshot) s; + }) + .filter(snapshot -> snapshot.getChecksum() != null) + .collect( + Collectors.toMap(snapshot -> String.valueOf(snapshot.getGeneration()), snapshot -> String.valueOf(snapshot.getChecksum())) + ); + TranslogTransferMetadata translogTransferMetadata = transferSnapshot.getTranslogTransferMetadata(); translogTransferMetadata.setGenerationToPrimaryTermMapper(new HashMap<>(generationPrimaryTermMap)); + translogTransferMetadata.setGenerationToChecksumMapper(new HashMap<>(generationChecksumMap)); return new TransferFileSnapshot( translogTransferMetadata.getFileName(), diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java index 7fe3305545085..6ecc731235cbc 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java @@ -42,6 +42,9 @@ public class TranslogTransferMetadata { private final SetOnce> generationToPrimaryTermMapper = new SetOnce<>(); + // generationToChecksumMapper is the mapping between the generation and the checksum of associated translog file. + private final SetOnce> generationToChecksumMapper = new SetOnce<>(); + public static final String METADATA_SEPARATOR = "__"; public static final String METADATA_PREFIX = "metadata"; @@ -96,6 +99,20 @@ public Map getGenerationToPrimaryTermMapper() { return generationToPrimaryTermMapper.get(); } + /* + * setGenerationToChecksumMapper sets the mapping between the generation and the checksum of associated translog file. + * */ + public void setGenerationToChecksumMapper(Map generationToChecksumMap) { + generationToChecksumMapper.set(generationToChecksumMap); + } + + /* + * getGenerationToChecksumMapper gets the mapping between the generation and the checksum of associated translog file. + * */ + public Map getGenerationToChecksumMapper() { + return generationToChecksumMapper.get(); + } + /* This should be used only at the time of creation. */ diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandler.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandler.java index cea7ef8a4e6dd..ae7127268abe8 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandler.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandler.java @@ -40,6 +40,15 @@ public TranslogTransferMetadata readContent(IndexInput indexInput) throws IOExce TranslogTransferMetadata metadata = new TranslogTransferMetadata(primaryTerm, generation, minTranslogGeneration, count); metadata.setGenerationToPrimaryTermMapper(generationToPrimaryTermMapper); + // We set the GenerationToChecksumMapper only if it is present in the file. + // Else we initialise it with an empty map. + if (indexInput.getFilePointer() < indexInput.length()) { + Map generationToChecksumMapper = indexInput.readMapOfStrings(); + metadata.setGenerationToChecksumMapper(generationToChecksumMapper); + } else { + metadata.setGenerationToChecksumMapper(new HashMap<>()); + } + return metadata; } @@ -59,5 +68,11 @@ public void writeContent(IndexOutput indexOutput, TranslogTransferMetadata conte } else { indexOutput.writeMapOfStrings(new HashMap<>()); } + // Write the generation to checksum mapping at the end. + if (content.getGenerationToChecksumMapper() != null) { + indexOutput.writeMapOfStrings(content.getGenerationToChecksumMapper()); + } else { + indexOutput.writeMapOfStrings(new HashMap<>()); + } } } diff --git a/server/src/test/java/org/opensearch/index/translog/LocalTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/LocalTranslogTests.java index cae27d5b259c4..985a8edca7bf4 100644 --- a/server/src/test/java/org/opensearch/index/translog/LocalTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/LocalTranslogTests.java @@ -501,9 +501,9 @@ public void testStats() throws IOException { { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(4)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(326L)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(342L)); assertThat(stats.getUncommittedOperations(), equalTo(4)); - assertThat(stats.getUncommittedSizeInBytes(), equalTo(271L)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(287L)); assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L)); } @@ -513,7 +513,7 @@ public void testStats() throws IOException { stats.writeTo(out); final TranslogStats copy = new TranslogStats(out.bytes().streamInput()); assertThat(copy.estimatedNumberOfOperations(), equalTo(4)); - assertThat(copy.getTranslogSizeInBytes(), equalTo(326L)); + assertThat(copy.getTranslogSizeInBytes(), equalTo(342L)); try (XContentBuilder builder = XContentFactory.jsonBuilder()) { builder.startObject(); @@ -521,9 +521,9 @@ public void testStats() throws IOException { builder.endObject(); assertEquals( "{\"translog\":{\"operations\":4,\"size_in_bytes\":" - + 326 + + 342 + ",\"uncommitted_operations\":4,\"uncommitted_size_in_bytes\":" - + 271 + + 287 + ",\"earliest_last_modified_age\":" + stats.getEarliestLastModifiedAge() + ",\"remote_store\":{\"upload\":{" @@ -540,7 +540,7 @@ public void testStats() throws IOException { long lastModifiedAge = System.currentTimeMillis() - translog.getCurrent().getLastModifiedTime(); final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(4)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(326L)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(342L)); assertThat(stats.getUncommittedOperations(), equalTo(0)); assertThat(stats.getUncommittedSizeInBytes(), equalTo(firstOperationPosition)); assertThat(stats.getEarliestLastModifiedAge(), greaterThanOrEqualTo(lastModifiedAge)); @@ -1754,8 +1754,10 @@ public void testCloseIntoReader() throws IOException { writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), randomNonNegativeLong()); } writer.sync(); - final Checkpoint writerCheckpoint = writer.getCheckpoint(); TranslogReader reader = writer.closeIntoReader(); + // We need to find checkpoint only after the reader has been closed. + // This is so that the added footer is taken care of. + final Checkpoint writerCheckpoint = writer.getCheckpoint(); try { if (randomBoolean()) { reader.close(); diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java index 03c77a9a83f57..bdba311046df5 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java @@ -1663,8 +1663,10 @@ public void testCloseIntoReader() throws IOException { writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), randomNonNegativeLong()); } writer.sync(); - final Checkpoint writerCheckpoint = writer.getCheckpoint(); TranslogReader reader = writer.closeIntoReader(); + // We need to find checkpoint only after the reader has been closed. + // This is so that the added footer is taken care of. + final Checkpoint writerCheckpoint = writer.getCheckpoint(); try { if (randomBoolean()) { reader.close(); @@ -1690,8 +1692,10 @@ public void testDownloadWithRetries() throws IOException { Path location = createTempDir(); TranslogTransferMetadata translogTransferMetadata = new TranslogTransferMetadata(primaryTerm, generation, generation, 1); Map generationToPrimaryTermMapper = new HashMap<>(); + Map generationToChecksumMapper = new HashMap<>(); generationToPrimaryTermMapper.put(String.valueOf(generation), String.valueOf(primaryTerm)); translogTransferMetadata.setGenerationToPrimaryTermMapper(generationToPrimaryTermMapper); + translogTransferMetadata.setGenerationToChecksumMapper(generationToChecksumMapper); TranslogTransferManager mockTransfer = mock(TranslogTransferManager.class); RemoteTranslogTransferTracker remoteTranslogTransferTracker = mock(RemoteTranslogTransferTracker.class);