From 121170316316b431854e7237b51566df6920a3f5 Mon Sep 17 00:00:00 2001 From: Harsh Rawat Date: Wed, 9 Oct 2024 00:26:59 +0530 Subject: [PATCH] added the unit tests for incremental download of translog This commit adds the unit tests applicable for the changes made to support incremental download of translog files. Primarily, the unit tests cover the changes in- - TranslogFooter to write and read the footer of Translog - RemoteFSTranslog to skip the download of locally present translog - TranslogWriter to create reader with checksum upon close - TranslogReader closeIntoReader functionality Signed-off-by: Harsh Rawat --- .../index/translog/RemoteFsTranslogTests.java | 134 ++++++++++++++++++ .../index/translog/TranslogFooterTests.java | 102 +++++++++++++ .../TranslogTransferMetadataHandlerTests.java | 91 ++++++++++-- 3 files changed, 316 insertions(+), 11 deletions(-) create mode 100644 server/src/test/java/org/opensearch/index/translog/TranslogFooterTests.java diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java index bdba311046df5..195e9445a8b7e 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.backward_codecs.store.EndiannessReverserUtil; +import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.ByteArrayDataOutput; import org.apache.lucene.store.DataOutput; @@ -19,12 +20,14 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.UUIDs; import org.opensearch.common.blobstore.BlobContainer; import 
org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.blobstore.fs.FsBlobContainer; import org.opensearch.common.blobstore.fs.FsBlobStore; import org.opensearch.common.bytes.ReleasableBytesReference; +import org.opensearch.common.io.Channels; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; import org.opensearch.common.settings.ClusterSettings; @@ -75,6 +78,7 @@ import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.nio.file.Path; +import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -110,6 +114,8 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @LuceneTestCase.SuppressFileSystems("ExtrasFS") @@ -1679,6 +1685,24 @@ public void testCloseIntoReader() throws IOException { final int value = buffer.getInt(); assertEquals(i, value); } + + // Try to read into the footer which would lead into EOF exception. + assertThrowsReadingIntoFooter(reader, numOps, 4); + // Try to read beyond the footer which would lead into EOF exception. + assertThrowsReadingIntoFooter(reader, numOps, 18); + + // Read next 16 bytes directly from the file, which should be the footer. + // This is because for this test, we create a writer which would automatically + // create one with footer. + long translogLengthWithoutFooter = reader.length - TranslogFooter.footerLength(); + ByteBuffer footerBuffer = ByteBuffer.allocate(TranslogFooter.footerLength()); + Channels.readFromFileChannelWithEofException(reader.channel, translogLengthWithoutFooter, footerBuffer); + footerBuffer.flip(); + // Validate the footer. 
+ assertEquals(CodecUtil.FOOTER_MAGIC, footerBuffer.getInt()); + assertEquals(0, footerBuffer.getInt()); // Algorithm ID + assertEquals(reader.getTranslogChecksum().longValue(), footerBuffer.getLong()); + final Checkpoint readerCheckpoint = reader.getCheckpoint(); assertThat(readerCheckpoint, equalTo(writerCheckpoint)); } finally { @@ -1687,6 +1711,12 @@ public void testCloseIntoReader() throws IOException { } } + // assertThrowsReadingIntoFooter asserts EOF error when we try reading into the Translog footer via reader. + private void assertThrowsReadingIntoFooter(TranslogReader reader, int numOps, int bytesToRead) { + final ByteBuffer buffer = ByteBuffer.allocate(bytesToRead); + assertThrows(EOFException.class, () -> reader.readBytes(buffer, reader.getFirstOperationOffset() + numOps * 4)); + } + public void testDownloadWithRetries() throws IOException { long generation = 1, primaryTerm = 1; Path location = createTempDir(); @@ -1805,6 +1835,110 @@ public void testDownloadWithEmptyTranslogOnlyInLocal() throws IOException { assertArrayEquals(filesPostFirstDownload, filesPostSecondDownload); } + /** + * createTranslogFile creates a translog file with the given generation and checksum at the provided location. 
+ * */ + private void createTranslogFile(Path location, long generation, long checksum) throws IOException { + Path translogPath = location.resolve(Translog.getFilename(generation)); + Path checkpointPath = location.resolve(Translog.getCommitCheckpointFileName(generation)); + Files.createFile(translogPath); + Files.createFile(checkpointPath); + try (FileChannel channel = FileChannel.open(translogPath, StandardOpenOption.WRITE)) { + // Write a translog header + TranslogHeader header = new TranslogHeader(UUIDs.randomBase64UUID(), generation); + header.write(channel, true); + + // Write some translog operations + byte[] operationBytes = new byte[] { 1, 2, 3, 4 }; + channel.write(ByteBuffer.wrap(operationBytes)); + + // Write the translog footer + TranslogFooter.write(channel, checksum, true); + } + } + + /** + * testIncrementalDownloadWithMatchingChecksum tests the scenario where we have the translog + * file present locally. We test if the download logic for the same skips the download of the file. + * */ + public void testIncrementalDownloadWithMatchingChecksum() throws IOException { + // Set up the test scenario + long generation = 1; + long primaryTerm = 1; + long checksum = 1234; + Path location = createTempDir(); + TranslogTransferMetadata translogTransferMetadata = new TranslogTransferMetadata(primaryTerm, generation, generation, 1); + Map generationToPrimaryTermMapper = new HashMap<>(); + Map generationToChecksumMapper = new HashMap<>(); + generationToPrimaryTermMapper.put(String.valueOf(generation), String.valueOf(primaryTerm)); + generationToChecksumMapper.put(String.valueOf(generation), String.valueOf(checksum)); + translogTransferMetadata.setGenerationToPrimaryTermMapper(generationToPrimaryTermMapper); + translogTransferMetadata.setGenerationToChecksumMapper(generationToChecksumMapper); + + // Mock the transfer manager. 
+ TranslogTransferManager mockTransfer = mock(TranslogTransferManager.class); + RemoteTranslogTransferTracker remoteTranslogTransferTracker = mock(RemoteTranslogTransferTracker.class); + when(mockTransfer.readMetadata(0)).thenReturn(translogTransferMetadata); + when(mockTransfer.getRemoteTranslogTransferTracker()).thenReturn(remoteTranslogTransferTracker); + + // Create a local translog file with the same checksum as the remote + createTranslogFile(location, generation, checksum); + + // Verify that the download is skipped + RemoteFsTranslog.download(mockTransfer, location, logger, false, 0); + verify(mockTransfer, times(0)).downloadTranslog(any(), any(), any()); + } + + /** + * testIncrementalDownloadWithDifferentChecksum tests the case where we have 2 translog generations + * in remote but only 1 present locally. We will download only 1 generation in this case. + * */ + public void testIncrementalDownloadWithDifferentChecksum() throws IOException { + // Set up the test scenario + long generation1 = 1, generation2 = 2, primaryTerm = 1; + long checksum1 = 1234, checksum2 = 5678; + Path location = createTempDir(); + + TranslogTransferMetadata translogTransferMetadata = new TranslogTransferMetadata(primaryTerm, generation2, generation1, 2); + Map generationToPrimaryTermMapper = Map.of( + String.valueOf(generation1), + String.valueOf(primaryTerm), + String.valueOf(generation2), + String.valueOf(primaryTerm) + ); + Map generationToChecksumMapper = Map.of( + String.valueOf(generation1), + String.valueOf(checksum1), + String.valueOf(generation2), + String.valueOf(checksum2) + ); + translogTransferMetadata.setGenerationToPrimaryTermMapper(generationToPrimaryTermMapper); + translogTransferMetadata.setGenerationToChecksumMapper(generationToChecksumMapper); + + // Mock the transfer manager. 
+ TranslogTransferManager mockTransfer = mock(TranslogTransferManager.class); + RemoteTranslogTransferTracker remoteTranslogTransferTracker = mock(RemoteTranslogTransferTracker.class); + when(mockTransfer.readMetadata(0)).thenReturn(translogTransferMetadata); + when(mockTransfer.getRemoteTranslogTransferTracker()).thenReturn(remoteTranslogTransferTracker); + + // Create a local translog file for 1 generation. + createTranslogFile(location, generation1, checksum1); + // Download counter to count the files which were downloaded. + AtomicLong downloadCounter = new AtomicLong(); + // Mock the download of second generation. + doAnswer(invocation -> { + downloadCounter.incrementAndGet(); + Files.createFile(location.resolve(Translog.getCommitCheckpointFileName(generation2))); + return true; + }).when(mockTransfer).downloadTranslog(String.valueOf(primaryTerm), String.valueOf(generation2), location); + + // Verify that only generation 2 is downloaded. + RemoteFsTranslog.download(mockTransfer, location, logger, false, 0); + assertEquals(1, downloadCounter.get()); + // verify that generation 1 is not downloaded. 
+ verify(mockTransfer, times(0)).downloadTranslog(String.valueOf(primaryTerm), String.valueOf(generation1), location); + } + public void testSyncWithGlobalCheckpointUpdate() throws IOException { ArrayList ops = new ArrayList<>(); addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 0, primaryTerm.get(), new byte[] { 1 })); diff --git a/server/src/test/java/org/opensearch/index/translog/TranslogFooterTests.java b/server/src/test/java/org/opensearch/index/translog/TranslogFooterTests.java new file mode 100644 index 0000000000000..4d5459ab15c16 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/translog/TranslogFooterTests.java @@ -0,0 +1,102 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. 
+ */ + +package org.opensearch.index.translog; + +import org.apache.lucene.codecs.CodecUtil; +import org.opensearch.common.UUIDs; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; + +public class TranslogFooterTests extends OpenSearchTestCase { + + /** + * testTranslogFooterWrite verifies the functionality of TranslogFooter.write() method + * wherein we write the footer to the translog file. + * */ + public void testTranslogFooterWrite() throws IOException { + Path translogPath = createTempFile(); + try (FileChannel channel = FileChannel.open(translogPath, StandardOpenOption.WRITE)) { + // Write a translog header + TranslogHeader header = new TranslogHeader(UUIDs.randomBase64UUID(), randomNonNegativeLong()); + header.write(channel, true); + + // Write some translog operations + byte[] operationBytes = new byte[] { 1, 2, 3, 4 }; + channel.write(ByteBuffer.wrap(operationBytes)); + + // Write the translog footer + long expectedChecksum = 0x1234567890ABCDEFL; + byte[] footer = TranslogFooter.write(channel, expectedChecksum, true); + + // Verify the footer contents + ByteBuffer footerBuffer = ByteBuffer.wrap(footer); + assertEquals(CodecUtil.FOOTER_MAGIC, footerBuffer.getInt()); + assertEquals(0, footerBuffer.getInt()); + assertEquals(expectedChecksum, footerBuffer.getLong()); + + // Verify that the footer was written to the channel + assertEquals(footer.length, channel.size() - (header.sizeInBytes() + operationBytes.length)); + } + } + + /** + * testTranslogFooterReadChecksum verifies the behavior of the TranslogFooter.readChecksum() method, + * which reads the checksum from the footer of a translog file. 
+ * */ + public void testTranslogFooterReadChecksum() throws IOException { + long expectedChecksum = 0x1234567890ABCDEFL; + Path translogPath = createTempFile(); + try (FileChannel channel = FileChannel.open(translogPath, StandardOpenOption.WRITE)) { + // Write a translog header + TranslogHeader header = new TranslogHeader(UUIDs.randomBase64UUID(), randomNonNegativeLong()); + header.write(channel, true); + + // Write some translog operations + byte[] operationBytes = new byte[] { 1, 2, 3, 4 }; + channel.write(ByteBuffer.wrap(operationBytes)); + + // Write the translog footer. + TranslogFooter.write(channel, expectedChecksum, true); + } + + // Verify that the checksum can be read correctly + Long actualChecksum = TranslogFooter.readChecksum(translogPath); + assert actualChecksum != null; + assertEquals(expectedChecksum, actualChecksum.longValue()); + } +} diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandlerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandlerTests.java index b99479df9c15e..038c77bf3e6fc 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandlerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandlerTests.java @@ -29,20 +29,69 @@ public void setUp() throws Exception { handler = new TranslogTransferMetadataHandler(); } + /** + * Tests the readContent method of the TranslogTransferMetadataHandler, which reads the TranslogTransferMetadata + * from the provided IndexInput. + * + * @throws IOException if there is an error reading the metadata from the IndexInput + */ public void testReadContent() throws IOException { TranslogTransferMetadata expectedMetadata = getTestMetadata(); // Operation: Read expected metadata from source input stream. 
- IndexInput indexInput = new ByteArrayIndexInput("metadata file", getTestMetadataBytes()); + IndexInput indexInput = new ByteArrayIndexInput("metadata file", getTestMetadataBytes(expectedMetadata)); TranslogTransferMetadata actualMetadata = handler.readContent(indexInput); // Verification: Compare actual metadata read from the source input stream. assertEquals(expectedMetadata, actualMetadata); } + /** + * Tests the readContent method of the TranslogTransferMetadataHandler, which reads the TranslogTransferMetadata + * that includes the generation-to-checksum map from the provided IndexInput. + * + * @throws IOException if there is an error reading the metadata from the IndexInput + */ + public void testReadContentForMetadataWithGenerationToChecksumMap() throws IOException { + TranslogTransferMetadata expectedMetadata = getTestMetadataWithGenerationToChecksumMap(); + + // Operation: Read expected metadata from source input stream. + IndexInput indexInput = new ByteArrayIndexInput("metadata file", getTestMetadataBytes(expectedMetadata)); + TranslogTransferMetadata actualMetadata = handler.readContent(indexInput); + + // Verification: Compare actual metadata read from the source input stream. + assertEquals(expectedMetadata, actualMetadata); + } + + /** + * Tests the writeContent method of the TranslogTransferMetadataHandler, which writes the provided + * TranslogTransferMetadata to the OutputStreamIndexOutput. + * + * @throws IOException if there is an error writing the metadata to the OutputStreamIndexOutput + */ public void testWriteContent() throws IOException { - TranslogTransferMetadata expectedMetadata = getTestMetadata(); + verifyWriteContent(getTestMetadata()); + } + + /** + * Tests the writeContent method of the TranslogTransferMetadataHandler, which writes the provided + * TranslogTransferMetadata that includes the generation-to-checksum map to the OutputStreamIndexOutput. 
+ * + * @throws IOException if there is an error writing the metadata to the OutputStreamIndexOutput + */ + public void testWriteContentWithGenerationToChecksumMap() throws IOException { + verifyWriteContent(getTestMetadataWithGenerationToChecksumMap()); + } + /** + * Verifies the writeContent method of the TranslogTransferMetadataHandler by writing the provided + * TranslogTransferMetadata to an OutputStreamIndexOutput, and then reading it back and comparing it + * to the original metadata. + * + * @param expectedMetadata the expected TranslogTransferMetadata to be written and verified + * @throws IOException if there is an error writing or reading the metadata + */ + private void verifyWriteContent(TranslogTransferMetadata expectedMetadata) throws IOException { // Operation: Write expected metadata to the target output stream. BytesStreamOutput output = new BytesStreamOutput(); OutputStreamIndexOutput actualMetadataStream = new OutputStreamIndexOutput("dummy bytes", "dummy stream", output, 4096); @@ -55,9 +104,11 @@ public void testWriteContent() throws IOException { long generation = indexInput.readLong(); long minTranslogGeneration = indexInput.readLong(); Map generationToPrimaryTermMapper = indexInput.readMapOfStrings(); + Map generationToChecksumMapper = indexInput.readMapOfStrings(); int count = generationToPrimaryTermMapper.size(); TranslogTransferMetadata actualMetadata = new TranslogTransferMetadata(primaryTerm, generation, minTranslogGeneration, count); actualMetadata.setGenerationToPrimaryTermMapper(generationToPrimaryTermMapper); + actualMetadata.setGenerationToChecksumMapper(generationToChecksumMapper); assertEquals(expectedMetadata, actualMetadata); } @@ -76,18 +127,36 @@ private TranslogTransferMetadata getTestMetadata() { return metadata; } - private byte[] getTestMetadataBytes() throws IOException { + private TranslogTransferMetadata getTestMetadataWithGenerationToChecksumMap() { TranslogTransferMetadata metadata = getTestMetadata(); + Map 
generationToChecksumMapper = Map.of( + String.valueOf(300), + String.valueOf(1234), + String.valueOf(400), + String.valueOf(4567) + ); + metadata.setGenerationToChecksumMapper(generationToChecksumMapper); + return metadata; + } + /** + * Creates a byte array representation of the provided TranslogTransferMetadata instance, which includes + * the primary term, generation, minimum translog generation, generation-to-primary term mapping, and + * generation-to-checksum mapping (if available). + * + * @param metadata the TranslogTransferMetadata instance to be converted to a byte array + * @return the byte array representation of the TranslogTransferMetadata + * @throws IOException if there is an error writing the metadata to the byte array + */ + private byte[] getTestMetadataBytes(TranslogTransferMetadata metadata) throws IOException { BytesStreamOutput output = new BytesStreamOutput(); - OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput("dummy bytes", "dummy stream", output, 4096); - indexOutput.writeLong(metadata.getPrimaryTerm()); - indexOutput.writeLong(metadata.getGeneration()); - indexOutput.writeLong(metadata.getMinTranslogGeneration()); - Map generationToPrimaryTermMapper = metadata.getGenerationToPrimaryTermMapper(); - indexOutput.writeMapOfStrings(generationToPrimaryTermMapper); - indexOutput.close(); - + try (OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput("dummy bytes", "dummy stream", output, 4096)) { + indexOutput.writeLong(metadata.getPrimaryTerm()); + indexOutput.writeLong(metadata.getGeneration()); + indexOutput.writeLong(metadata.getMinTranslogGeneration()); + indexOutput.writeMapOfStrings(metadata.getGenerationToPrimaryTermMapper()); + if (metadata.getGenerationToChecksumMapper() != null) indexOutput.writeMapOfStrings(metadata.getGenerationToChecksumMapper()); + } return BytesReference.toBytes(output.bytes()); } }