diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java index bdba311046df5..195e9445a8b7e 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.backward_codecs.store.EndiannessReverserUtil; +import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.ByteArrayDataOutput; import org.apache.lucene.store.DataOutput; @@ -19,12 +20,14 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.UUIDs; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.blobstore.fs.FsBlobContainer; import org.opensearch.common.blobstore.fs.FsBlobStore; import org.opensearch.common.bytes.ReleasableBytesReference; +import org.opensearch.common.io.Channels; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; import org.opensearch.common.settings.ClusterSettings; @@ -75,6 +78,7 @@ import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.nio.file.Path; +import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -110,6 +114,8 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @LuceneTestCase.SuppressFileSystems("ExtrasFS") @@ -1679,6 +1685,24 @@ public void testCloseIntoReader() throws IOException { final int value = buffer.getInt(); assertEquals(i, value); } + + // Try to read into the footer which would lead into EOF exception. + assertThrowsReadingIntoFooter(reader, numOps, 4); + // Try to read beyond the footer which would lead into EOF exception. + assertThrowsReadingIntoFooter(reader, numOps, 18); + + // Read next 16 bytes directly from the file, which should be the footer. + // This is because for this test, we create a writer which would automatically + // create one with footer. + long translogLengthWithoutFooter = reader.length - TranslogFooter.footerLength(); + ByteBuffer footerBuffer = ByteBuffer.allocate(TranslogFooter.footerLength()); + Channels.readFromFileChannelWithEofException(reader.channel, translogLengthWithoutFooter, footerBuffer); + footerBuffer.flip(); + // Validate the footer. + assertEquals(CodecUtil.FOOTER_MAGIC, footerBuffer.getInt()); + assertEquals(0, footerBuffer.getInt()); // Algorithm ID + assertEquals(reader.getTranslogChecksum().longValue(), footerBuffer.getLong()); + final Checkpoint readerCheckpoint = reader.getCheckpoint(); assertThat(readerCheckpoint, equalTo(writerCheckpoint)); } finally { @@ -1687,6 +1711,12 @@ public void testCloseIntoReader() throws IOException { } } + // assertThrowsReadingIntoFooter asserts EOF error when we try reading into the Translog footer via reader. + private void assertThrowsReadingIntoFooter(TranslogReader reader, int numOps, int bytesToRead) { + final ByteBuffer buffer = ByteBuffer.allocate(bytesToRead); + assertThrows(EOFException.class, () -> reader.readBytes(buffer, reader.getFirstOperationOffset() + numOps * 4)); + } + public void testDownloadWithRetries() throws IOException { long generation = 1, primaryTerm = 1; Path location = createTempDir(); @@ -1805,6 +1835,110 @@ public void testDownloadWithEmptyTranslogOnlyInLocal() throws IOException { assertArrayEquals(filesPostFirstDownload, filesPostSecondDownload); } + /** + * createTranslogFile creates a translog file with the given generation and checksum at the provided location. + * */ + private void createTranslogFile(Path location, long generation, long checksum) throws IOException { + Path translogPath = location.resolve(Translog.getFilename(generation)); + Path checkpointPath = location.resolve(Translog.getCommitCheckpointFileName(generation)); + Files.createFile(translogPath); + Files.createFile(checkpointPath); + try (FileChannel channel = FileChannel.open(translogPath, StandardOpenOption.WRITE)) { + // Write a translog header + TranslogHeader header = new TranslogHeader(UUIDs.randomBase64UUID(), generation); + header.write(channel, true); + + // Write some translog operations + byte[] operationBytes = new byte[] { 1, 2, 3, 4 }; + channel.write(ByteBuffer.wrap(operationBytes)); + + // Write the translog footer + TranslogFooter.write(channel, checksum, true); + } + } + + /** + * testIncrementalDownloadWithMatchingChecksum tests the scenario where we have the translog + * file present locally. We test if the download logic for the same skips the download of the file. + * */ + public void testIncrementalDownloadWithMatchingChecksum() throws IOException { + // Set up the test scenario + long generation = 1; + long primaryTerm = 1; + long checksum = 1234; + Path location = createTempDir(); + TranslogTransferMetadata translogTransferMetadata = new TranslogTransferMetadata(primaryTerm, generation, generation, 1); + Map generationToPrimaryTermMapper = new HashMap<>(); + Map generationToChecksumMapper = new HashMap<>(); + generationToPrimaryTermMapper.put(String.valueOf(generation), String.valueOf(primaryTerm)); + generationToChecksumMapper.put(String.valueOf(generation), String.valueOf(checksum)); + translogTransferMetadata.setGenerationToPrimaryTermMapper(generationToPrimaryTermMapper); + translogTransferMetadata.setGenerationToChecksumMapper(generationToChecksumMapper); + + // Mock the transfer manager. + TranslogTransferManager mockTransfer = mock(TranslogTransferManager.class); + RemoteTranslogTransferTracker remoteTranslogTransferTracker = mock(RemoteTranslogTransferTracker.class); + when(mockTransfer.readMetadata(0)).thenReturn(translogTransferMetadata); + when(mockTransfer.getRemoteTranslogTransferTracker()).thenReturn(remoteTranslogTransferTracker); + + // Create a local translog file with the same checksum as the remote + createTranslogFile(location, generation, checksum); + + // Verify that the download is skipped + RemoteFsTranslog.download(mockTransfer, location, logger, false, 0); + verify(mockTransfer, times(0)).downloadTranslog(any(), any(), any()); + } + + /** + * testIncrementalDownloadWithDifferentChecksum tests the case where we have 2 translog generations + * in remote but only 1 present locally. We will download only 1 generation in this case. + * */ + public void testIncrementalDownloadWithDifferentChecksum() throws IOException { + // Set up the test scenario + long generation1 = 1, generation2 = 2, primaryTerm = 1; + long checksum1 = 1234, checksum2 = 5678; + Path location = createTempDir(); + + TranslogTransferMetadata translogTransferMetadata = new TranslogTransferMetadata(primaryTerm, generation2, generation1, 2); + Map generationToPrimaryTermMapper = Map.of( + String.valueOf(generation1), + String.valueOf(primaryTerm), + String.valueOf(generation2), + String.valueOf(primaryTerm) + ); + Map generationToChecksumMapper = Map.of( + String.valueOf(generation1), + String.valueOf(checksum1), + String.valueOf(generation2), + String.valueOf(checksum2) + ); + translogTransferMetadata.setGenerationToPrimaryTermMapper(generationToPrimaryTermMapper); + translogTransferMetadata.setGenerationToChecksumMapper(generationToChecksumMapper); + + // Mock the transfer manager. + TranslogTransferManager mockTransfer = mock(TranslogTransferManager.class); + RemoteTranslogTransferTracker remoteTranslogTransferTracker = mock(RemoteTranslogTransferTracker.class); + when(mockTransfer.readMetadata(0)).thenReturn(translogTransferMetadata); + when(mockTransfer.getRemoteTranslogTransferTracker()).thenReturn(remoteTranslogTransferTracker); + + // Create a local translog file for 1 generation. + createTranslogFile(location, generation1, checksum1); + // Download counter to count the files which were downloaded. + AtomicLong downloadCounter = new AtomicLong(); + // Mock the download of second generation. + doAnswer(invocation -> { + downloadCounter.incrementAndGet(); + Files.createFile(location.resolve(Translog.getCommitCheckpointFileName(generation2))); + return true; + }).when(mockTransfer).downloadTranslog(String.valueOf(primaryTerm), String.valueOf(generation2), location); + + // Verify that only generation 2 is downloaded. + RemoteFsTranslog.download(mockTransfer, location, logger, false, 0); + assertEquals(1, downloadCounter.get()); + // verify that generation 1 is not downloaded. + verify(mockTransfer, times(0)).downloadTranslog(String.valueOf(primaryTerm), String.valueOf(generation1), location); + } + public void testSyncWithGlobalCheckpointUpdate() throws IOException { ArrayList ops = new ArrayList<>(); addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 0, primaryTerm.get(), new byte[] { 1 })); diff --git a/server/src/test/java/org/opensearch/index/translog/TranslogFooterTests.java b/server/src/test/java/org/opensearch/index/translog/TranslogFooterTests.java new file mode 100644 index 0000000000000..4d5459ab15c16 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/translog/TranslogFooterTests.java @@ -0,0 +1,102 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.index.translog; + +import org.apache.lucene.codecs.CodecUtil; +import org.opensearch.common.UUIDs; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; + +public class TranslogFooterTests extends OpenSearchTestCase { + + /** + * testTranslogFooterWrite verifies the functionality of TranslogFooter.write() method + * wherein we write the footer to the translog file. + * */ + public void testTranslogFooterWrite() throws IOException { + Path translogPath = createTempFile(); + try (FileChannel channel = FileChannel.open(translogPath, StandardOpenOption.WRITE)) { + // Write a translog header + TranslogHeader header = new TranslogHeader(UUIDs.randomBase64UUID(), randomNonNegativeLong()); + header.write(channel, true); + + // Write some translog operations + byte[] operationBytes = new byte[] { 1, 2, 3, 4 }; + channel.write(ByteBuffer.wrap(operationBytes)); + + // Write the translog footer + long expectedChecksum = 0x1234567890ABCDEFL; + byte[] footer = TranslogFooter.write(channel, expectedChecksum, true); + + // Verify the footer contents + ByteBuffer footerBuffer = ByteBuffer.wrap(footer); + assertEquals(CodecUtil.FOOTER_MAGIC, footerBuffer.getInt()); + assertEquals(0, footerBuffer.getInt()); + assertEquals(expectedChecksum, footerBuffer.getLong()); + + // Verify that the footer was written to the channel + assertEquals(footer.length, channel.size() - (header.sizeInBytes() + operationBytes.length)); + } + } + + /** + * testTranslogFooterReadChecksum verifies the behavior of the TranslogFooter.readChecksum() method, + * which reads the checksum from the footer of a translog file. + * */ + public void testTranslogFooterReadChecksum() throws IOException { + long expectedChecksum = 0x1234567890ABCDEFL; + Path translogPath = createTempFile(); + try (FileChannel channel = FileChannel.open(translogPath, StandardOpenOption.WRITE)) { + // Write a translog header + TranslogHeader header = new TranslogHeader(UUIDs.randomBase64UUID(), randomNonNegativeLong()); + header.write(channel, true); + + // Write some translog operations + byte[] operationBytes = new byte[] { 1, 2, 3, 4 }; + channel.write(ByteBuffer.wrap(operationBytes)); + + // Write the translog footer. + TranslogFooter.write(channel, expectedChecksum, true); + } + + // Verify that the checksum can be read correctly + Long actualChecksum = TranslogFooter.readChecksum(translogPath); + assert actualChecksum != null; + assertEquals(expectedChecksum, actualChecksum.longValue()); + } +} diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandlerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandlerTests.java index b99479df9c15e..038c77bf3e6fc 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandlerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandlerTests.java @@ -29,20 +29,69 @@ public void setUp() throws Exception { handler = new TranslogTransferMetadataHandler(); } + /** + * Tests the readContent method of the TranslogTransferMetadataHandler, which reads the TranslogTransferMetadata + * from the provided IndexInput. + * + * @throws IOException if there is an error reading the metadata from the IndexInput + */ public void testReadContent() throws IOException { TranslogTransferMetadata expectedMetadata = getTestMetadata(); // Operation: Read expected metadata from source input stream. - IndexInput indexInput = new ByteArrayIndexInput("metadata file", getTestMetadataBytes()); + IndexInput indexInput = new ByteArrayIndexInput("metadata file", getTestMetadataBytes(expectedMetadata)); TranslogTransferMetadata actualMetadata = handler.readContent(indexInput); // Verification: Compare actual metadata read from the source input stream. assertEquals(expectedMetadata, actualMetadata); } + /** + * Tests the readContent method of the TranslogTransferMetadataHandler, which reads the TranslogTransferMetadata + * that includes the generation-to-checksum map from the provided IndexInput. + * + * @throws IOException if there is an error reading the metadata from the IndexInput + */ + public void testReadContentForMetadataWithgenerationToChecksumMap() throws IOException { + TranslogTransferMetadata expectedMetadata = getTestMetadataWithGenerationToChecksumMap(); + + // Operation: Read expected metadata from source input stream. + IndexInput indexInput = new ByteArrayIndexInput("metadata file", getTestMetadataBytes(expectedMetadata)); + TranslogTransferMetadata actualMetadata = handler.readContent(indexInput); + + // Verification: Compare actual metadata read from the source input stream. + assertEquals(expectedMetadata, actualMetadata); + } + + /** + * Tests the writeContent method of the TranslogTransferMetadataHandler, which writes the provided + * TranslogTransferMetadata to the OutputStreamIndexOutput. + * + * @throws IOException if there is an error writing the metadata to the OutputStreamIndexOutput + */ public void testWriteContent() throws IOException { - TranslogTransferMetadata expectedMetadata = getTestMetadata(); + verifyWriteContent(getTestMetadata()); + } + + /** + * Tests the writeContent method of the TranslogTransferMetadataHandler, which writes the provided + * TranslogTransferMetadata that includes the generation-to-checksum map to the OutputStreamIndexOutput. + * + * @throws IOException if there is an error writing the metadata to the OutputStreamIndexOutput + */ + public void testWriteContentWithGeneratonToChecksumMap() throws IOException { + verifyWriteContent(getTestMetadataWithGenerationToChecksumMap()); + } + /** + * Verifies the writeContent method of the TranslogTransferMetadataHandler by writing the provided + * TranslogTransferMetadata to an OutputStreamIndexOutput, and then reading it back and comparing it + * to the original metadata. + * + * @param expectedMetadata the expected TranslogTransferMetadata to be written and verified + * @throws IOException if there is an error writing or reading the metadata + */ + private void verifyWriteContent(TranslogTransferMetadata expectedMetadata) throws IOException { // Operation: Write expected metadata to the target output stream. BytesStreamOutput output = new BytesStreamOutput(); OutputStreamIndexOutput actualMetadataStream = new OutputStreamIndexOutput("dummy bytes", "dummy stream", output, 4096); @@ -55,9 +104,11 @@ public void testWriteContent() throws IOException { long generation = indexInput.readLong(); long minTranslogGeneration = indexInput.readLong(); Map generationToPrimaryTermMapper = indexInput.readMapOfStrings(); + Map generationToChecksumMapper = indexInput.readMapOfStrings(); int count = generationToPrimaryTermMapper.size(); TranslogTransferMetadata actualMetadata = new TranslogTransferMetadata(primaryTerm, generation, minTranslogGeneration, count); actualMetadata.setGenerationToPrimaryTermMapper(generationToPrimaryTermMapper); + actualMetadata.setGenerationToChecksumMapper(generationToChecksumMapper); assertEquals(expectedMetadata, actualMetadata); } @@ -76,18 +127,36 @@ private TranslogTransferMetadata getTestMetadata() { return metadata; } - private byte[] getTestMetadataBytes() throws IOException { + private TranslogTransferMetadata getTestMetadataWithGenerationToChecksumMap() { TranslogTransferMetadata metadata = getTestMetadata(); + Map generationToChecksumMapper = Map.of( + String.valueOf(300), + String.valueOf(1234), + String.valueOf(400), + String.valueOf(4567) + ); + metadata.setGenerationToChecksumMapper(generationToChecksumMapper); + return metadata; + } + /** + * Creates a byte array representation of the provided TranslogTransferMetadata instance, which includes + * the primary term, generation, minimum translog generation, generation-to-primary term mapping, and + * generation-to-checksum mapping (if available). + * + * @param metadata the TranslogTransferMetadata instance to be converted to a byte array + * @return the byte array representation of the TranslogTransferMetadata + * @throws IOException if there is an error writing the metadata to the byte array + */ + private byte[] getTestMetadataBytes(TranslogTransferMetadata metadata) throws IOException { BytesStreamOutput output = new BytesStreamOutput(); - OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput("dummy bytes", "dummy stream", output, 4096); - indexOutput.writeLong(metadata.getPrimaryTerm()); - indexOutput.writeLong(metadata.getGeneration()); - indexOutput.writeLong(metadata.getMinTranslogGeneration()); - Map generationToPrimaryTermMapper = metadata.getGenerationToPrimaryTermMapper(); - indexOutput.writeMapOfStrings(generationToPrimaryTermMapper); - indexOutput.close(); - + try (OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput("dummy bytes", "dummy stream", output, 4096)) { + indexOutput.writeLong(metadata.getPrimaryTerm()); + indexOutput.writeLong(metadata.getGeneration()); + indexOutput.writeLong(metadata.getMinTranslogGeneration()); + indexOutput.writeMapOfStrings(metadata.getGenerationToPrimaryTermMapper()); + if (metadata.getGenerationToChecksumMapper() != null) indexOutput.writeMapOfStrings(metadata.getGenerationToChecksumMapper()); + } return BytesReference.toBytes(output.bytes()); } }