Skip to content

Commit

Permalink
added support for incremental download of translog files
Browse files Browse the repository at this point in the history
Presently, the download workflow for remote-backed storage downloads the same translog files multiple times, deleting all of the older local files before each download. This wastes significant network bandwidth and increases the time taken for the shard to become active.

This change adds support for downloading the translog files incrementally: a translog file that is already present locally is skipped instead of being downloaded again.

Signed-off-by: Harsh Rawat <[email protected]>
  • Loading branch information
Harsh Rawat committed Oct 15, 2024
1 parent 35c366d commit c8b537c
Show file tree
Hide file tree
Showing 13 changed files with 415 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -254,14 +254,67 @@ private static void downloadOnce(
Files.createDirectories(location);
}

// Delete translog files on local before downloading from remote
Map<String, String> generationToPrimaryTermMapper = translogMetadata.getGenerationToPrimaryTermMapper();
Map<String, String> generationToChecksumMapper = translogMetadata.getGenerationToChecksumMapper();
long maxGeneration = translogMetadata.getGeneration();
long minGeneration = translogMetadata.getMinTranslogGeneration();

// Delete any translog and checkpoint file which is not part of the current generation range.
for (Path file : FileSystemUtils.files(location)) {
Files.delete(file);
try {
long generation = parseIdFromFileName(file.getFileName().toString(), STRICT_TLOG_OR_CKP_PATTERN);
if (generation < minGeneration || generation > maxGeneration) {
// If the generation is outside the required range, then we delete the same.
Files.delete(file);
}
} catch (IllegalStateException | IllegalArgumentException e) {
// Delete any file which does not conform to Translog or Checkpoint filename patterns.
Files.delete(file);
}
}

Map<String, String> generationToPrimaryTermMapper = translogMetadata.getGenerationToPrimaryTermMapper();
for (long i = translogMetadata.getGeneration(); i >= translogMetadata.getMinTranslogGeneration(); i--) {
// For incremental downloads, we will check if the local translog matches the one present in
// remote store. If so, we will skip its download.
for (long i = maxGeneration; i >= minGeneration; i--) {
String generation = Long.toString(i);
String translogFilename = Translog.getFilename(i);
Path targetTranslogPath = location.resolve(translogFilename);

// If we have the translog available for the generation locally, then we need to
// compare the checksum with that in remote obtained via metadata.
// For backward compatibility, we consider the following cases here-
// - Remote metadata does not have the mapping for generation
// - Local translog file lacks the checksum value in footer
// In both these cases, we will download the files for the generation.
if (generationToChecksumMapper.containsKey(generation) && FileSystemUtils.exists(targetTranslogPath)) {
try {
final long expectedChecksum = Long.parseLong(generationToChecksumMapper.get(generation));
final Long actualChecksum = TranslogFooter.readChecksum(targetTranslogPath);

// If the local and remote checksum are same, then continue.
// Else exit the loop and download the translog.
if (actualChecksum != null && actualChecksum == expectedChecksum) {
logger.info(
"Download skipped for translog and checkpoint files for generation={} due to them being locally present",
generation
);

// Mark the translog and checkpoint file as available in the file tracker.
translogTransferManager.markFileAsDownloaded(translogFilename);
translogTransferManager.markFileAsDownloaded(Translog.getCommitCheckpointFileName(i));
continue;
}
} catch (IOException e) {
// The exception can occur if the remote translog files were uploaded without footer.
logger.info(
"Exception occurred during reconciliation of translog state between local and remote. "
+ "Reverting to downloading the translog and checksum files for generation={}",
generation
);
}
}

logger.info("Downloading translog and checkpoint files for generation={}", generation);
translogTransferManager.downloadTranslog(generationToPrimaryTermMapper.get(generation), generation, location);
}
logger.info(
Expand Down
12 changes: 9 additions & 3 deletions server/src/main/java/org/opensearch/index/translog/Translog.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ public abstract class Translog extends AbstractIndexShardComponent implements In
public static final String CHECKPOINT_SUFFIX = ".ckp";
public static final String CHECKPOINT_FILE_NAME = "translog" + CHECKPOINT_SUFFIX;

// STRICT_TLOG_OR_CKP_PATTERN is the strict pattern for Translog or Checkpoint file.
static final Pattern STRICT_TLOG_OR_CKP_PATTERN = Pattern.compile("^" + TRANSLOG_FILE_PREFIX + "(\\d+)(\\.ckp|\\.tlog)$");
static final Pattern PARSE_STRICT_ID_PATTERN = Pattern.compile("^" + TRANSLOG_FILE_PREFIX + "(\\d+)(\\.tlog)$");
public static final int DEFAULT_HEADER_SIZE_IN_BYTES = TranslogHeader.headerSizeInBytes(UUIDs.randomBase64UUID());

Expand Down Expand Up @@ -320,14 +322,18 @@ public static long parseIdFromFileName(Path translogFile) {
return parseIdFromFileName(fileName);
}

public static long parseIdFromFileName(String fileName) {
final Matcher matcher = PARSE_STRICT_ID_PATTERN.matcher(fileName);
/**
 * Parses the numeric id out of a translog file name.
 *
 * Delegates to {@code parseIdFromFileName(String, Pattern)} with {@code PARSE_STRICT_ID_PATTERN},
 * so only names ending in {@code .tlog} are matched by this overload.
 *
 * @param translogFile the file name to parse
 * @return the id captured by the first group of the pattern
 */
public static long parseIdFromFileName(String translogFile) {
return parseIdFromFileName(translogFile, PARSE_STRICT_ID_PATTERN);
}

public static long parseIdFromFileName(String fileName, Pattern pattern) {
final Matcher matcher = pattern.matcher(fileName);
if (matcher.matches()) {
try {
return Long.parseLong(matcher.group(1));
} catch (NumberFormatException e) {
throw new IllegalStateException(
"number formatting issue in a file that passed PARSE_STRICT_ID_PATTERN: " + fileName + "]",
"number formatting issue in a file that passed " + pattern.pattern() + ": " + fileName + "]",
e
);
}
Expand Down
148 changes: 148 additions & 0 deletions server/src/main/java/org/opensearch/index/translog/TranslogFooter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.index.translog;

import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.OutputStreamDataOutput;
import org.opensearch.common.io.Channels;
import org.opensearch.core.common.io.stream.OutputStreamStreamOutput;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/**
 * Handles the writing and reading of the translog footer, which contains the checksum of the translog data.
 *
 * Each translog file is structured as follows:
 *
 * 1. Translog header
 * 2. Translog operations
 * 3. Translog footer
 *
 * The footer contains the following fields, in this order:
 *
 * - Magic number (int): {@link CodecUtil#FOOTER_MAGIC}, a constant value that identifies the start of the footer.
 * - Algorithm ID (int): The identifier of the checksum algorithm used. Currently, this is always 0.
 * - Checksum (long): The checksum value supplied by the writer.
 *
 * The fields are written with {@code CodecUtil.writeBEInt}/{@code writeBELong} and read back through a
 * {@link ByteBuffer} in its default (big-endian) byte order.
 */
public class TranslogFooter {

    /**
     * Identifier of the only checksum algorithm currently written to and accepted from the footer.
     */
    private static final int ALGORITHM_ID = 0;

    /**
     * FOOTER_LENGTH is the length of the present footer added to translog files.
     * We write 4 bytes for the magic number, 4 bytes for the algorithm ID and 8 bytes for the checksum.
     * Therefore, the footer length is 16.
     */
    private static final int FOOTER_LENGTH = Integer.BYTES + Integer.BYTES + Long.BYTES;

    /**
     * Returns the length of the translog footer in bytes.
     *
     * @return the length of the translog footer in bytes
     */
    static int footerLength() {
        return FOOTER_LENGTH;
    }

    /**
     * Writes the translog footer to the provided `FileChannel`.
     *
     * @param channel the `FileChannel` to write the footer to
     * @param checksum the checksum value to be written in the footer
     * @param toSync whether to force the channel's content to the underlying storage after writing
     * @return the byte array containing the written footer data
     * @throws IOException if an I/O error occurs while writing the footer
     */
    static byte[] write(FileChannel channel, long checksum, boolean toSync) throws IOException {
        final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(FOOTER_LENGTH);
        final OutputStreamDataOutput out = new OutputStreamDataOutput(new OutputStreamStreamOutput(byteArrayOutputStream));

        CodecUtil.writeBEInt(out, CodecUtil.FOOTER_MAGIC);
        CodecUtil.writeBEInt(out, ALGORITHM_ID);
        CodecUtil.writeBELong(out, checksum);

        // Materialize the footer once; the same array is written to the channel and handed back to the caller.
        final byte[] footerBytes = byteArrayOutputStream.toByteArray();
        Channels.writeToChannel(footerBytes, channel);
        if (toSync) {
            // force(false): flush file content only, not file metadata.
            channel.force(false);
        }

        return footerBytes;
    }

    /**
     * Reads the checksum value from the footer of the translog file located at the provided `Path`.
     *
     * If the translog file is of an older version and does not have a footer, this method returns `null`.
     *
     * NOTE(review): the header is parsed via {@code TranslogHeader.read}, which may raise its own
     * corruption exceptions; confirm that callers handle those in addition to {@link IOException}.
     *
     * @param path the `Path` to the translog file
     * @return the checksum value from the translog footer, or `null` if the header version predates the footer
     * @throws IOException if an I/O error occurs, if the file is too small to hold a footer after its header,
     *                     or if the footer's magic number or algorithm ID is not the expected value
     */
    static Long readChecksum(Path path) throws IOException {
        try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
            // Read the header and find out if the footer is supported.
            final TranslogHeader header = TranslogHeader.read(path, channel);
            if (header.getTranslogHeaderVersion() < TranslogHeader.VERSION_WITH_FOOTER) {
                return null;
            }

            // Locate the footer. It must lie entirely after the header; otherwise the file is truncated
            // (or was never finalized with a footer) and the trailing bytes are not a footer at all.
            final long fileSize = channel.size();
            final long footerStart = fileSize - FOOTER_LENGTH;
            if (footerStart < header.sizeInBytes()) {
                throw new IOException(
                    "Translog file ["
                        + path
                        + "] of size "
                        + fileSize
                        + " bytes is too small to contain a "
                        + FOOTER_LENGTH
                        + " byte footer after its "
                        + header.sizeInBytes()
                        + " byte header"
                );
            }

            // Read the footer.
            final ByteBuffer footer = ByteBuffer.allocate(FOOTER_LENGTH);
            final int bytesRead = Channels.readFromFileChannel(channel, footerStart, footer);
            if (bytesRead != FOOTER_LENGTH) {
                throw new IOException("Read " + bytesRead + " bytes from footer instead of expected " + FOOTER_LENGTH + " bytes");
            }
            footer.flip();

            // Validate the footer and return the checksum.
            final int magic = footer.getInt();
            if (magic != CodecUtil.FOOTER_MAGIC) {
                throw new IOException("Invalid footer magic number: " + magic);
            }

            final int algorithmId = footer.getInt();
            if (algorithmId != ALGORITHM_ID) {
                throw new IOException("Unsupported checksum algorithm ID: " + algorithmId);
            }

            return footer.getLong();
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,13 @@ public final class TranslogHeader {
public static final int VERSION_CHECKSUMS = 1; // pre-2.0 - unsupported
public static final int VERSION_CHECKPOINTS = 2; // added checkpoints
public static final int VERSION_PRIMARY_TERM = 3; // added primary term
public static final int CURRENT_VERSION = VERSION_PRIMARY_TERM;
public static final int VERSION_WITH_FOOTER = 4; // added the footer for the translog
public static final int CURRENT_VERSION = VERSION_WITH_FOOTER;

private final String translogUUID;
private final long primaryTerm;
private final int headerSizeInBytes;
private final int translogHeaderVersion;

/**
* Creates a new translog header with the given uuid and primary term.
Expand All @@ -80,14 +82,15 @@ public final class TranslogHeader {
* All operations' terms in this translog file are enforced to be at most this term.
*/
TranslogHeader(String translogUUID, long primaryTerm) {
this(translogUUID, primaryTerm, headerSizeInBytes(translogUUID));
this(translogUUID, primaryTerm, headerSizeInBytes(translogUUID), CURRENT_VERSION);
assert primaryTerm >= 0 : "Primary term must be non-negative; term [" + primaryTerm + "]";
}

private TranslogHeader(String translogUUID, long primaryTerm, int headerSizeInBytes) {
private TranslogHeader(String translogUUID, long primaryTerm, int headerSizeInBytes, int headerVersion) {
this.translogUUID = translogUUID;
this.primaryTerm = primaryTerm;
this.headerSizeInBytes = headerSizeInBytes;
this.translogHeaderVersion = headerVersion;
}

public String getTranslogUUID() {
Expand All @@ -110,6 +113,13 @@ public int sizeInBytes() {
return headerSizeInBytes;
}

/**
 * Returns the version of the translog header, as supplied to the constructor.
 *
 * @return the translog header version
 */
public int getTranslogHeaderVersion() {
return translogHeaderVersion;
}

/**
 * Computes the header size in bytes for a header written at {@code CURRENT_VERSION}.
 *
 * The UUID contributes the length of its {@code BytesRef} representation rather than its character count.
 *
 * @param translogUUID the translog UUID that the header will carry
 * @return the header size in bytes for the current header version
 */
static int headerSizeInBytes(String translogUUID) {
return headerSizeInBytes(CURRENT_VERSION, new BytesRef(translogUUID).length);
}
Expand All @@ -127,7 +137,7 @@ private static int headerSizeInBytes(int version, int uuidLength) {
static int readHeaderVersion(final Path path, final FileChannel channel, final StreamInput in) throws IOException {
final int version;
try {
version = CodecUtil.checkHeader(new InputStreamDataInput(in), TRANSLOG_CODEC, VERSION_CHECKSUMS, VERSION_PRIMARY_TERM);
version = CodecUtil.checkHeader(new InputStreamDataInput(in), TRANSLOG_CODEC, VERSION_CHECKSUMS, CURRENT_VERSION);
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException e) {
tryReportOldVersionError(path, channel);
throw new TranslogCorruptedException(path.toString(), "translog header corrupted", e);
Expand Down Expand Up @@ -183,7 +193,7 @@ public static TranslogHeader read(final Path path, final FileChannel channel) th
in.read(uuid.bytes, uuid.offset, uuid.length);
// Read the primary term
final long primaryTerm;
if (version == VERSION_PRIMARY_TERM) {
if (version >= VERSION_PRIMARY_TERM) {
primaryTerm = in.readLong();
} else {
assert version == VERSION_CHECKPOINTS : "Unknown header version [" + version + "]";
Expand All @@ -202,7 +212,7 @@ public static TranslogHeader read(final Path path, final FileChannel channel) th
+ channel.position()
+ "]";

return new TranslogHeader(uuid.utf8ToString(), primaryTerm, headerSizeInBytes);
return new TranslogHeader(uuid.utf8ToString(), primaryTerm, headerSizeInBytes, version);
} catch (EOFException e) {
throw new TranslogCorruptedException(path.toString(), "translog header truncated", e);
}
Expand Down
Loading

0 comments on commit c8b537c

Please sign in to comment.