Skip to content

Commit

Permalink
Refactor Translog metadata upload/download to write/read header and f…
Browse files Browse the repository at this point in the history
…ooter via VersionedCodecStreamWrapper (opensearch-project#7953)

Signed-off-by: Bhumika Saini <[email protected]>
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
BhumikaSaini-Amazon authored and shiv0408 committed Apr 25, 2024
1 parent 50ede82 commit 0355700
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,13 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.OutputStreamIndexOutput;
import org.opensearch.action.ActionListener;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.VersionedCodecStreamWrapper;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.translog.Translog;
Expand Down Expand Up @@ -61,6 +65,12 @@ public class TranslogTransferManager {
private final static String METADATA_DIR = "metadata";
private final static String DATA_DIR = "data";

private static final VersionedCodecStreamWrapper<TranslogTransferMetadata> metadataStreamWrapper = new VersionedCodecStreamWrapper<>(
new TranslogTransferMetadataHandler(),
TranslogTransferMetadata.CURRENT_VERSION,
TranslogTransferMetadata.METADATA_CODEC
);

public TranslogTransferManager(
ShardId shardId,
TransferService transferService,
Expand Down Expand Up @@ -174,9 +184,9 @@ private void downloadToFS(String fileName, Path location, String primaryTerm) th

public TranslogTransferMetadata readMetadata() throws IOException {
return transferService.listAll(remoteMetadataTransferPath).stream().max(METADATA_FILENAME_COMPARATOR).map(filename -> {
try (InputStream inputStream = transferService.downloadBlob(remoteMetadataTransferPath, filename);) {
try (InputStream inputStream = transferService.downloadBlob(remoteMetadataTransferPath, filename)) {
IndexInput indexInput = new ByteArrayIndexInput("metadata file", inputStream.readAllBytes());
return new TranslogTransferMetadata(indexInput);
return metadataStreamWrapper.readStream(indexInput);
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Exception while reading metadata file: {}", filename), e);
return null;
Expand All @@ -197,13 +207,40 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot)
);
TranslogTransferMetadata translogTransferMetadata = transferSnapshot.getTranslogTransferMetadata();
translogTransferMetadata.setGenerationToPrimaryTermMapper(new HashMap<>(generationPrimaryTermMap));

return new TransferFileSnapshot(
getFileName(translogTransferMetadata.getPrimaryTerm(), translogTransferMetadata.getGeneration()),
translogTransferMetadata.createMetadataBytes(),
getMetadataBytes(translogTransferMetadata),
translogTransferMetadata.getPrimaryTerm()
);
}

/**
* Get the metadata bytes for a {@link TranslogTransferMetadata} object
*
* @param metadata The object to be parsed
* @return Byte representation for the given metadata
*/
public byte[] getMetadataBytes(TranslogTransferMetadata metadata) throws IOException {
byte[] metadataBytes;

try (BytesStreamOutput output = new BytesStreamOutput()) {
try (
OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(
"translog transfer metadata " + metadata.getPrimaryTerm(),
getFileName(metadata.getPrimaryTerm(), metadata.getGeneration()),
output,
TranslogTransferMetadata.BUFFER_SIZE
)
) {
metadataStreamWrapper.writeStream(indexOutput, metadata);
}
metadataBytes = BytesReference.toBytes(output.bytes());
}

return metadataBytes;
}

/**
* This method handles deletion of multiple generations for a single primary term. The deletion happens for translog
* and metadata files.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,10 @@

package org.opensearch.index.translog.transfer;

import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.OutputStreamIndexOutput;
import org.opensearch.common.SetOnce;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.stream.BytesStreamOutput;

import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

Expand All @@ -44,11 +36,11 @@ public class TranslogTransferMetadata {

public static final String METADATA_SEPARATOR = "__";

private static final int BUFFER_SIZE = 4096;
static final int BUFFER_SIZE = 4096;

private static final int CURRENT_VERSION = 1;
static final int CURRENT_VERSION = 1;

private static final String METADATA_CODEC = "md";
static final String METADATA_CODEC = "md";

public static final Comparator<String> METADATA_FILENAME_COMPARATOR = new MetadataFilenameComparator();

Expand All @@ -59,15 +51,6 @@ public TranslogTransferMetadata(long primaryTerm, long generation, long minTrans
this.count = count;
}

public TranslogTransferMetadata(IndexInput indexInput) throws IOException {
CodecUtil.checksumEntireFile(indexInput);
CodecUtil.checkHeader(indexInput, METADATA_CODEC, CURRENT_VERSION, CURRENT_VERSION);
this.primaryTerm = indexInput.readLong();
this.generation = indexInput.readLong();
this.minTranslogGeneration = indexInput.readLong();
this.generationToPrimaryTermMapper.set(indexInput.readMapOfStrings());
}

public long getPrimaryTerm() {
return primaryTerm;
}
Expand Down Expand Up @@ -96,24 +79,6 @@ public static String getFileName(long primaryTerm, long generation) {
return String.join(METADATA_SEPARATOR, Arrays.asList(String.valueOf(primaryTerm), String.valueOf(generation)));
}

public byte[] createMetadataBytes() throws IOException {
try (BytesStreamOutput output = new BytesStreamOutput()) {
try (
OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(
"translog transfer metadata " + primaryTerm,
getFileName(primaryTerm, generation),
output,
BUFFER_SIZE
)
) {
CodecUtil.writeHeader(indexOutput, METADATA_CODEC, CURRENT_VERSION);
write(indexOutput);
CodecUtil.writeFooter(indexOutput);
}
return BytesReference.toBytes(output.bytes());
}
}

@Override
public int hashCode() {
return Objects.hash(primaryTerm, generation);
Expand All @@ -127,17 +92,6 @@ public boolean equals(Object o) {
return Objects.equals(this.primaryTerm, other.primaryTerm) && Objects.equals(this.generation, other.generation);
}

private void write(DataOutput out) throws IOException {
out.writeLong(primaryTerm);
out.writeLong(generation);
out.writeLong(minTranslogGeneration);
if (generationToPrimaryTermMapper.get() != null) {
out.writeMapOfStrings(generationToPrimaryTermMapper.get());
} else {
out.writeMapOfStrings(new HashMap<>());
}
}

private static class MetadataFilenameComparator implements Comparator<String> {
@Override
public int compare(String first, String second) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.translog.transfer;

import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.opensearch.common.io.IndexIOStreamHandler;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
* Handler for {@link TranslogTransferMetadata}
*
* @opensearch.internal
*/
public class TranslogTransferMetadataHandler implements IndexIOStreamHandler<TranslogTransferMetadata> {

/**
* Implements logic to read content from file input stream {@code indexInput} and parse into {@link TranslogTransferMetadata}
*
* @param indexInput file input stream
* @return content parsed to {@link TranslogTransferMetadata}
*/
@Override
public TranslogTransferMetadata readContent(IndexInput indexInput) throws IOException {
long primaryTerm = indexInput.readLong();
long generation = indexInput.readLong();
long minTranslogGeneration = indexInput.readLong();
Map<String, String> generationToPrimaryTermMapper = indexInput.readMapOfStrings();

int count = generationToPrimaryTermMapper.size();
TranslogTransferMetadata metadata = new TranslogTransferMetadata(primaryTerm, generation, minTranslogGeneration, count);
metadata.setGenerationToPrimaryTermMapper(generationToPrimaryTermMapper);

return metadata;
}

/**
* Implements logic to write content from {@code content} to file output stream {@code indexOutput}
*
* @param indexOutput file input stream
* @param content metadata content to be written
*/
@Override
public void writeContent(IndexOutput indexOutput, TranslogTransferMetadata content) throws IOException {
indexOutput.writeLong(content.getPrimaryTerm());
indexOutput.writeLong(content.getGeneration());
indexOutput.writeLong(content.getMinTranslogGeneration());
if (content.getGenerationToPrimaryTermMapper() != null) {
indexOutput.writeMapOfStrings(content.getGenerationToPrimaryTermMapper());
} else {
indexOutput.writeMapOfStrings(new HashMap<>());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public void testReadMetadataSingleFile() throws IOException {

TranslogTransferMetadata metadata = createTransferSnapshot().getTranslogTransferMetadata();
when(transferService.downloadBlob(any(BlobPath.class), eq("12__234"))).thenReturn(
new ByteArrayInputStream(metadata.createMetadataBytes())
new ByteArrayInputStream(translogTransferManager.getMetadataBytes(metadata))
);

assertEquals(metadata, translogTransferManager.readMetadata());
Expand All @@ -222,7 +222,7 @@ public void testReadMetadataMultipleFiles() throws IOException {

TranslogTransferMetadata metadata = createTransferSnapshot().getTranslogTransferMetadata();
when(transferService.downloadBlob(any(BlobPath.class), eq("12__235"))).thenReturn(
new ByteArrayInputStream(metadata.createMetadataBytes())
new ByteArrayInputStream(translogTransferManager.getMetadataBytes(metadata))
);

assertEquals(metadata, translogTransferManager.readMetadata());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.translog.transfer;

import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.OutputStreamIndexOutput;
import org.junit.Before;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class TranslogTransferMetadataHandlerTests extends OpenSearchTestCase {
private TranslogTransferMetadataHandler handler;

@Before
public void setUp() throws Exception {
super.setUp();
handler = new TranslogTransferMetadataHandler();
}

public void testReadContent() throws IOException {
TranslogTransferMetadata expectedMetadata = getTestMetadata();

// Operation: Read expected metadata from source input stream.
IndexInput indexInput = new ByteArrayIndexInput("metadata file", getTestMetadataBytes());
TranslogTransferMetadata actualMetadata = handler.readContent(indexInput);

// Verification: Compare actual metadata read from the source input stream.
assertEquals(expectedMetadata, actualMetadata);
}

public void testWriteContent() throws IOException {
TranslogTransferMetadata expectedMetadata = getTestMetadata();

// Operation: Write expected metadata to the target output stream.
BytesStreamOutput output = new BytesStreamOutput();
OutputStreamIndexOutput actualMetadataStream = new OutputStreamIndexOutput("dummy bytes", "dummy stream", output, 4096);
handler.writeContent(actualMetadataStream, expectedMetadata);
actualMetadataStream.close();

// Verification: Compare actual metadata written to the target output stream.
IndexInput indexInput = new ByteArrayIndexInput("metadata file", BytesReference.toBytes(output.bytes()));
long primaryTerm = indexInput.readLong();
long generation = indexInput.readLong();
long minTranslogGeneration = indexInput.readLong();
Map<String, String> generationToPrimaryTermMapper = indexInput.readMapOfStrings();
int count = generationToPrimaryTermMapper.size();
TranslogTransferMetadata actualMetadata = new TranslogTransferMetadata(primaryTerm, generation, minTranslogGeneration, count);
actualMetadata.setGenerationToPrimaryTermMapper(generationToPrimaryTermMapper);
assertEquals(expectedMetadata, actualMetadata);
}

private TranslogTransferMetadata getTestMetadata() {
long primaryTerm = 3;
long generation = 500;
long minTranslogGeneration = 300;
Map<String, String> generationToPrimaryTermMapper = new HashMap<>();
generationToPrimaryTermMapper.put("300", "1");
generationToPrimaryTermMapper.put("400", "2");
generationToPrimaryTermMapper.put("500", "3");
int count = generationToPrimaryTermMapper.size();
TranslogTransferMetadata metadata = new TranslogTransferMetadata(primaryTerm, generation, minTranslogGeneration, count);
metadata.setGenerationToPrimaryTermMapper(generationToPrimaryTermMapper);

return metadata;
}

private byte[] getTestMetadataBytes() throws IOException {
TranslogTransferMetadata metadata = getTestMetadata();

BytesStreamOutput output = new BytesStreamOutput();
OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput("dummy bytes", "dummy stream", output, 4096);
indexOutput.writeLong(metadata.getPrimaryTerm());
indexOutput.writeLong(metadata.getGeneration());
indexOutput.writeLong(metadata.getMinTranslogGeneration());
Map<String, String> generationToPrimaryTermMapper = metadata.getGenerationToPrimaryTermMapper();
indexOutput.writeMapOfStrings(generationToPrimaryTermMapper);
indexOutput.close();

return BytesReference.toBytes(output.bytes());
}
}

0 comments on commit 0355700

Please sign in to comment.