Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added support for incremental download of translog files #16204

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -254,14 +254,67 @@
Files.createDirectories(location);
}

// Delete translog files on local before downloading from remote
Map<String, String> generationToPrimaryTermMapper = translogMetadata.getGenerationToPrimaryTermMapper();
Map<String, String> generationToChecksumMapper = translogMetadata.getGenerationToChecksumMapper();
long maxGeneration = translogMetadata.getGeneration();
long minGeneration = translogMetadata.getMinTranslogGeneration();

// Delete any translog and checkpoint file which is not part of the current generation range.
for (Path file : FileSystemUtils.files(location)) {
Files.delete(file);
try {
long generation = parseIdFromFileName(file.getFileName().toString(), STRICT_TLOG_OR_CKP_PATTERN);
if (generation < minGeneration || generation > maxGeneration) {
// If the generation is outside the required range, then we delete the same.
Files.delete(file);
}
} catch (IllegalStateException | IllegalArgumentException e) {
// Delete any file which does not conform to Translog or Checkpoint filename patterns.
Files.delete(file);
}
}

Map<String, String> generationToPrimaryTermMapper = translogMetadata.getGenerationToPrimaryTermMapper();
for (long i = translogMetadata.getGeneration(); i >= translogMetadata.getMinTranslogGeneration(); i--) {
// For incremental downloads, we will check if the local translog matches the one present in
// remote store. If so, we will skip its download.
for (long i = maxGeneration; i >= minGeneration; i--) {
String generation = Long.toString(i);
String translogFilename = Translog.getFilename(i);
Path targetTranslogPath = location.resolve(translogFilename);

// If we have the translog available for the generation locally, then we need to
// compare the checksum with that in remote obtained via metadata.
// For backward compatibility, we consider the following cases here-
// - Remote metadata does not have the mapping for generation
// - Local translog file lacks the checksum value in footer
// In both these cases, we will download the files for the generation.
if (generationToChecksumMapper.containsKey(generation) && FileSystemUtils.exists(targetTranslogPath)) {
try {
final long expectedChecksum = Long.parseLong(generationToChecksumMapper.get(generation));
final Long actualChecksum = TranslogFooter.readChecksum(targetTranslogPath);

// If the local and remote checksum are same, then continue.
// Else exit the loop and download the translog.
if (actualChecksum != null && actualChecksum == expectedChecksum) {
logger.info(
"Download skipped for translog and checkpoint files for generation={} due to them being locally present",
generation
);

// Mark the translog and checkpoint file as available in the file tracker.
translogTransferManager.markFileAsDownloaded(translogFilename);
translogTransferManager.markFileAsDownloaded(Translog.getCommitCheckpointFileName(i));
continue;
}
} catch (IOException e) {

Check warning on line 307 in server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java#L307

Added line #L307 was not covered by tests
// The exception can occur if the remote translog files were uploaded without footer.
logger.info(

Check warning on line 309 in server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java#L309

Added line #L309 was not covered by tests
"Exception occurred during reconciliation of translog state between local and remote. "
+ "Reverting to downloading the translog and checksum files for generation={}",
generation
);
}

Check warning on line 314 in server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java#L314

Added line #L314 was not covered by tests
}

logger.info("Downloading translog and checkpoint files for generation={}", generation);
translogTransferManager.downloadTranslog(generationToPrimaryTermMapper.get(generation), generation, location);
}
logger.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@
public static final String CHECKPOINT_SUFFIX = ".ckp";
public static final String CHECKPOINT_FILE_NAME = "translog" + CHECKPOINT_SUFFIX;

// STRICT_TLOG_OR_CKP_PATTERN is the strict pattern for Translog or Checkpoint file.
static final Pattern STRICT_TLOG_OR_CKP_PATTERN = Pattern.compile("^" + TRANSLOG_FILE_PREFIX + "(\\d+)(\\.ckp|\\.tlog)$");
static final Pattern PARSE_STRICT_ID_PATTERN = Pattern.compile("^" + TRANSLOG_FILE_PREFIX + "(\\d+)(\\.tlog)$");
public static final int DEFAULT_HEADER_SIZE_IN_BYTES = TranslogHeader.headerSizeInBytes(UUIDs.randomBase64UUID());

Expand Down Expand Up @@ -320,14 +322,18 @@
return parseIdFromFileName(fileName);
}

public static long parseIdFromFileName(String fileName) {
final Matcher matcher = PARSE_STRICT_ID_PATTERN.matcher(fileName);
public static long parseIdFromFileName(String translogFile) {
return parseIdFromFileName(translogFile, PARSE_STRICT_ID_PATTERN);
}

public static long parseIdFromFileName(String fileName, Pattern pattern) {
final Matcher matcher = pattern.matcher(fileName);
if (matcher.matches()) {
try {
return Long.parseLong(matcher.group(1));
} catch (NumberFormatException e) {
throw new IllegalStateException(
"number formatting issue in a file that passed PARSE_STRICT_ID_PATTERN: " + fileName + "]",
"number formatting issue in a file that passed " + pattern.pattern() + ": " + fileName + "]",

Check warning on line 336 in server/src/main/java/org/opensearch/index/translog/Translog.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/Translog.java#L336

Added line #L336 was not covered by tests
e
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.index.translog;

import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.OutputStreamDataOutput;
import org.opensearch.common.io.Channels;
import org.opensearch.core.common.io.stream.OutputStreamStreamOutput;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/**
* Handles the writing and reading of the translog footer, which contains the checksum of the translog data.
*
* Each translog file is structured as follows:
*
* 1. Translog header
* 2. Translog operations
* 3. Translog footer
*
* The footer contains the following information:
*
* - Magic number (int): A constant value that identifies the start of the footer.
* - Algorithm ID (int): The identifier of the checksum algorithm used. Currently, this is always 0.
* - Checksum (long): The checksum of the entire translog data, calculated using the specified algorithm.
*/
public class TranslogFooter {

Check warning on line 62 in server/src/main/java/org/opensearch/index/translog/TranslogFooter.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/TranslogFooter.java#L62

Added line #L62 was not covered by tests

/**
* FOOTER_LENGTH is the length of the present footer added to translog files.
* We write 4 bytes for the magic number, 4 bytes for algorithm ID and 8 bytes for the checksum.
* Therefore, the footer length as 16.
* */
private static final int FOOTER_LENGTH = 16;

/**
* Returns the length of the translog footer in bytes.
*
* @return the length of the translog footer in bytes
*/
static int footerLength() {
return FOOTER_LENGTH;
}

/**
* Writes the translog footer to the provided `FileChannel`.
*
* @param channel the `FileChannel` to write the footer to
* @param checksum the checksum value to be written in the footer
* @param toSync whether to force a sync of the written data to the underlying storage
* @return the byte array containing the written footer data
* @throws IOException if an I/O error occurs while writing the footer
*/
static byte[] write(FileChannel channel, long checksum, boolean toSync) throws IOException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
final OutputStreamDataOutput out = new OutputStreamDataOutput(new OutputStreamStreamOutput(byteArrayOutputStream));

CodecUtil.writeBEInt(out, CodecUtil.FOOTER_MAGIC);
CodecUtil.writeBEInt(out, 0);
CodecUtil.writeBELong(out, checksum);

Channels.writeToChannel(byteArrayOutputStream.toByteArray(), channel);
if (toSync) {
channel.force(false);
}

return byteArrayOutputStream.toByteArray();
}

/**
* Reads the checksum value from the footer of the translog file located at the provided `Path`.
*
* If the translog file is of an older version and does not have a footer, this method returns `null`.
*
* @param path the `Path` to the translog file
* @return the checksum value from the translog footer, or `null` if the footer is not present
* @throws IOException if an I/O error occurs while reading the footer
*/
static Long readChecksum(Path path) throws IOException {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would prefer non-static functions? Are they getting invoked from a static context?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this method is invoked from static contexts in all instances.
downloadOnce in RemoteFSTranslog is one instance and open in TranslogReader is another one.

try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
// Read the header and find out if the footer is supported.
final TranslogHeader header = TranslogHeader.read(path, channel);
if (header.getTranslogHeaderVersion() < TranslogHeader.VERSION_WITH_FOOTER) {
return null;

Check warning on line 119 in server/src/main/java/org/opensearch/index/translog/TranslogFooter.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/TranslogFooter.java#L119

Added line #L119 was not covered by tests
}

// Read the footer.
final long fileSize = channel.size();
final long footerStart = fileSize - TranslogFooter.footerLength();
ByteBuffer footer = ByteBuffer.allocate(TranslogFooter.footerLength());
int bytesRead = Channels.readFromFileChannel(channel, footerStart, footer);
if (bytesRead != TranslogFooter.footerLength()) {
throw new IOException(
"Read " + bytesRead + " bytes from footer instead of expected " + TranslogFooter.footerLength() + " bytes"

Check warning on line 129 in server/src/main/java/org/opensearch/index/translog/TranslogFooter.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/TranslogFooter.java#L128-L129

Added lines #L128 - L129 were not covered by tests
);
}
footer.flip();

// Validate the footer and return the checksum.
int magic = footer.getInt();
if (magic != CodecUtil.FOOTER_MAGIC) {
throw new IOException("Invalid footer magic number: " + magic);
}

int algorithmId = footer.getInt();
if (algorithmId != 0) {
throw new IOException("Unsupported checksum algorithm ID: " + algorithmId);

Check warning on line 142 in server/src/main/java/org/opensearch/index/translog/TranslogFooter.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/TranslogFooter.java#L142

Added line #L142 was not covered by tests
}

return footer.getLong();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,13 @@ public final class TranslogHeader {
public static final int VERSION_CHECKSUMS = 1; // pre-2.0 - unsupported
public static final int VERSION_CHECKPOINTS = 2; // added checkpoints
public static final int VERSION_PRIMARY_TERM = 3; // added primary term
public static final int CURRENT_VERSION = VERSION_PRIMARY_TERM;
public static final int VERSION_WITH_FOOTER = 4; // added the footer for the translog
public static final int CURRENT_VERSION = VERSION_WITH_FOOTER;

private final String translogUUID;
private final long primaryTerm;
private final int headerSizeInBytes;
private final int translogHeaderVersion;

/**
* Creates a new translog header with the given uuid and primary term.
Expand All @@ -80,14 +82,15 @@ public final class TranslogHeader {
* All operations' terms in this translog file are enforced to be at most this term.
*/
TranslogHeader(String translogUUID, long primaryTerm) {
this(translogUUID, primaryTerm, headerSizeInBytes(translogUUID));
this(translogUUID, primaryTerm, headerSizeInBytes(translogUUID), CURRENT_VERSION);
assert primaryTerm >= 0 : "Primary term must be non-negative; term [" + primaryTerm + "]";
}

private TranslogHeader(String translogUUID, long primaryTerm, int headerSizeInBytes) {
private TranslogHeader(String translogUUID, long primaryTerm, int headerSizeInBytes, int headerVersion) {
this.translogUUID = translogUUID;
this.primaryTerm = primaryTerm;
this.headerSizeInBytes = headerSizeInBytes;
this.translogHeaderVersion = headerVersion;
}

public String getTranslogUUID() {
Expand All @@ -110,6 +113,13 @@ public int sizeInBytes() {
return headerSizeInBytes;
}

/**
* Returns the version of the translog header.
* */
public int getTranslogHeaderVersion() {
return translogHeaderVersion;
}

static int headerSizeInBytes(String translogUUID) {
return headerSizeInBytes(CURRENT_VERSION, new BytesRef(translogUUID).length);
}
Expand All @@ -127,7 +137,7 @@ private static int headerSizeInBytes(int version, int uuidLength) {
static int readHeaderVersion(final Path path, final FileChannel channel, final StreamInput in) throws IOException {
final int version;
try {
version = CodecUtil.checkHeader(new InputStreamDataInput(in), TRANSLOG_CODEC, VERSION_CHECKSUMS, VERSION_PRIMARY_TERM);
version = CodecUtil.checkHeader(new InputStreamDataInput(in), TRANSLOG_CODEC, VERSION_CHECKSUMS, CURRENT_VERSION);
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException e) {
tryReportOldVersionError(path, channel);
throw new TranslogCorruptedException(path.toString(), "translog header corrupted", e);
Expand Down Expand Up @@ -183,7 +193,7 @@ public static TranslogHeader read(final Path path, final FileChannel channel) th
in.read(uuid.bytes, uuid.offset, uuid.length);
// Read the primary term
final long primaryTerm;
if (version == VERSION_PRIMARY_TERM) {
if (version >= VERSION_PRIMARY_TERM) {
primaryTerm = in.readLong();
} else {
assert version == VERSION_CHECKPOINTS : "Unknown header version [" + version + "]";
Expand All @@ -202,7 +212,7 @@ public static TranslogHeader read(final Path path, final FileChannel channel) th
+ channel.position()
+ "]";

return new TranslogHeader(uuid.utf8ToString(), primaryTerm, headerSizeInBytes);
return new TranslogHeader(uuid.utf8ToString(), primaryTerm, headerSizeInBytes, version);
} catch (EOFException e) {
throw new TranslogCorruptedException(path.toString(), "translog header truncated", e);
}
Expand Down
Loading
Loading