diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java index 4a65956f50848..1fc64c9313735 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java @@ -121,7 +121,7 @@ public TransferFileSnapshot(Path path, long primaryTerm, Long checksum) throws I this(path, primaryTerm, checksum, null); } - private TransferFileSnapshot(Path path, long primaryTerm, Long checksum, Map metadata) throws IOException { + public TransferFileSnapshot(Path path, long primaryTerm, Long checksum, Map metadata) throws IOException { super(path, metadata); this.primaryTerm = primaryTerm; this.checksum = checksum; diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java index 191af3de4323d..bdb9526a02602 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java @@ -12,8 +12,10 @@ import org.opensearch.common.logging.Loggers; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.remote.RemoteTranslogTransferTracker; +import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; import org.opensearch.index.translog.transfer.listener.FileTransferListener; +import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -120,37 +122,52 @@ public void onSuccess(TranslogCheckpointSnapshot fileSnapshot) { public void onFailure(TranslogCheckpointSnapshot fileSnapshot, Exception e) { long durationInMillis = (System.nanoTime() - fileTransferStartTime) / 1_000_000L; remoteTranslogTransferTracker.addUploadTimeInMillis(durationInMillis); - updateTransferStats(fileSnapshot, false); addGeneration(fileSnapshot.getGeneration(), TransferState.FAILED); - if (!ckpAsTranslogMetadata) { + if (ckpAsTranslogMetadata) { + updateTransferStats(fileSnapshot, false); + } else { assert e instanceof TranslogTransferException; TranslogTransferException exception = (TranslogTransferException) e; - Set failedFiles = exception.getFailedFiles(); - Set successFiles = exception.getSuccessFiles(); + Set failedFiles = exception.getFailedFiles(); + Set successFiles = exception.getSuccessFiles(); assert failedFiles.isEmpty() == false; - failedFiles.forEach(failedFile -> add(failedFile.getName(), false)); - successFiles.forEach(successFile -> add(successFile.getName(), true)); + failedFiles.forEach(failedFile -> { + add(failedFile.getName(), false); + long failedBytes = 0; + try { + failedBytes = failedFile.getContentLength(); + } catch (IOException ignore) {} + updateBytesInRemoteTranslogTransferTracker(failedBytes, false); + }); + successFiles.forEach(successFile -> { + add(successFile.getName(), true); + long succededBytes = 0; + try { + succededBytes = successFile.getContentLength(); + } catch (IOException ignore) {} + updateBytesInRemoteTranslogTransferTracker(succededBytes, true); + }); } } private void updateTransferStats(TranslogCheckpointSnapshot fileSnapshot, boolean isSuccess) { Long translogFileBytes = bytesForTlogCkpFileToUpload.get(fileSnapshot.getTranslogFileName()); if (translogFileBytes != null) { - if (isSuccess) { - remoteTranslogTransferTracker.addUploadBytesSucceeded(translogFileBytes); - } else { - remoteTranslogTransferTracker.addUploadBytesFailed(translogFileBytes); - } + updateBytesInRemoteTranslogTransferTracker(translogFileBytes, isSuccess); } Long checkpointFileBytes = bytesForTlogCkpFileToUpload.get(fileSnapshot.getCheckpointFileName()); if (checkpointFileBytes != null) { - if (isSuccess) { - remoteTranslogTransferTracker.addUploadBytesSucceeded(checkpointFileBytes); - } else { - remoteTranslogTransferTracker.addUploadBytesFailed(checkpointFileBytes); - } + updateBytesInRemoteTranslogTransferTracker(checkpointFileBytes, isSuccess); + } + } + + private void updateBytesInRemoteTranslogTransferTracker(long bytes, boolean isSuccess) { + if (isSuccess) { + remoteTranslogTransferTracker.addUploadBytesSucceeded(bytes); + } else { + remoteTranslogTransferTracker.addUploadBytesFailed(bytes); } } diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/FileTransferTrackerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/FileTransferTrackerTests.java index 8a804af02ea1b..b655ac1b47055 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/FileTransferTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/FileTransferTrackerTests.java @@ -10,6 +10,7 @@ import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.remote.RemoteTranslogTransferTracker; +import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; @@ -17,6 +18,7 @@ import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.HashSet; +import java.util.List; import java.util.Set; import static org.mockito.Mockito.anyLong; @@ -30,6 +32,7 @@ public class FileTransferTrackerTests extends OpenSearchTestCase { protected long generation = 5; protected long minTranslogGeneration = 2; FileTransferTracker fileTransferTracker; + FileTransferTracker fileTransferTrackerCkpAsMetadata; RemoteTranslogTransferTracker remoteTranslogTransferTracker; @Override @@ -37,6 +40,7 @@ public void setUp() throws Exception { super.setUp(); remoteTranslogTransferTracker = new RemoteTranslogTransferTracker(shardId, 20); fileTransferTracker = new FileTransferTracker(shardId, remoteTranslogTransferTracker, false); + fileTransferTrackerCkpAsMetadata = new FileTransferTracker(shardId, remoteTranslogTransferTracker, true); } public void testOnSuccess() throws IOException { @@ -57,7 +61,7 @@ public void testOnSuccess() throws IOException { null, generation ); - Set toUpload = new HashSet<>(2); + Set toUpload = new HashSet<>(); toUpload.add(transferFileSnapshot); fileTransferTracker.recordBytesForFiles(toUpload); remoteTranslogTransferTracker.addUploadBytesStarted(fileSize + ckpFileSize); @@ -66,6 +70,7 @@ public void testOnSuccess() throws IOException { remoteTranslogTransferTracker.addUploadBytesStarted(fileSize + ckpFileSize); fileTransferTracker.onSuccess(transferFileSnapshot); assertEquals(fileTransferTracker.allUploadedGeneration().size(), 1); + assertEquals(fileTransferTracker.allUploaded().size(), 2); try { remoteTranslogTransferTracker.addUploadBytesStarted(fileSize + ckpFileSize); fileTransferTracker.onFailure(transferFileSnapshot, new IOException("random exception")); @@ -76,51 +81,65 @@ public void testOnSuccess() throws IOException { } public void testOnFailure() throws IOException { - Path testFile = createTempFile(); - Path testFile2 = createTempFile(); + Path tlogFile1 = createTempFile(); + Path ckpFile1 = createTempFile(); + Path tlogFile2 = createTempFile(); + Path ckpFile2 = createTempFile(); int fileSize = 128; - Files.write(testFile, randomByteArrayOfLength(fileSize), StandardOpenOption.APPEND); + Files.write(tlogFile1, randomByteArrayOfLength(fileSize), StandardOpenOption.APPEND); + Files.write(ckpFile1, randomByteArrayOfLength(fileSize), StandardOpenOption.APPEND); - TranslogCheckpointSnapshot transferFileSnapshot = new TranslogCheckpointSnapshot( + Files.write(tlogFile2, randomByteArrayOfLength(fileSize), StandardOpenOption.APPEND); + Files.write(ckpFile2, randomByteArrayOfLength(fileSize), StandardOpenOption.APPEND); + TranslogCheckpointSnapshot translogCheckpointSnapshot1 = new TranslogCheckpointSnapshot( primaryTerm, generation, minTranslogGeneration, - testFile, - testFile2, + tlogFile1, + ckpFile1, null, null, null, generation ); - TranslogCheckpointSnapshot transferFileSnapshot2 = new TranslogCheckpointSnapshot( + TranslogCheckpointSnapshot translogCheckpointSnapshot2 = new TranslogCheckpointSnapshot( primaryTerm, generation + 1, minTranslogGeneration, - testFile, - testFile2, + tlogFile2, + ckpFile2, null, null, null, generation + 1 ); - FileSnapshot.TransferFileSnapshot transferFileSnapshot1 = transferFileSnapshot.getCheckpointFileSnapshot(); - - Set toUpload = new HashSet<>(2); - toUpload.add(transferFileSnapshot); - toUpload.add(transferFileSnapshot2); + TransferFileSnapshot checkpointFileSnapshot1 = translogCheckpointSnapshot1.getCheckpointFileSnapshot(); + TransferFileSnapshot translogFileSnapshot1 = translogCheckpointSnapshot1.getTranslogFileSnapshot(); + Set toUpload = new HashSet<>(); + toUpload.add(translogCheckpointSnapshot1); + toUpload.add(translogCheckpointSnapshot2); fileTransferTracker.recordBytesForFiles(toUpload); - remoteTranslogTransferTracker.addUploadBytesStarted(fileSize); + remoteTranslogTransferTracker.addUploadBytesStarted(fileSize * 4); fileTransferTracker.onFailure( - transferFileSnapshot, - new TranslogTransferException(transferFileSnapshot, new IOException("random exception"), Set.of(transferFileSnapshot1), null) + translogCheckpointSnapshot1, + new TranslogTransferException( + translogCheckpointSnapshot1, + new IOException("random exception"), + Set.of(checkpointFileSnapshot1, translogFileSnapshot1), + null + ) ); - remoteTranslogTransferTracker.addUploadBytesStarted(fileSize); - fileTransferTracker.onSuccess(transferFileSnapshot2); + fileTransferTracker.onSuccess(translogCheckpointSnapshot2); assertEquals(fileTransferTracker.allUploadedGeneration().size(), 1); - remoteTranslogTransferTracker.addUploadBytesStarted(fileSize); - fileTransferTracker.onSuccess(transferFileSnapshot); + assertEquals(fileTransferTracker.allUploaded().size(), 2); + + remoteTranslogTransferTracker.addUploadBytesStarted(fileSize * 2); + fileTransferTracker.onSuccess(translogCheckpointSnapshot1); assertEquals(fileTransferTracker.allUploadedGeneration().size(), 2); - transferFileSnapshot1.close(); + assertEquals(fileTransferTracker.allUploaded().size(), 4); + + checkpointFileSnapshot1.close(); + translogFileSnapshot1.close(); } public void testOnSuccessStatsFailure() throws IOException { @@ -146,7 +165,7 @@ public void testOnSuccessStatsFailure() throws IOException { generation ); - Set toUpload = new HashSet<>(2); + Set toUpload = new HashSet<>(); toUpload.add(transferFileSnapshot); localFileTransferTracker.recordBytesForFiles(toUpload); localRemoteTranslogTransferTracker.addUploadBytesStarted(2 * fileSize); @@ -171,19 +190,197 @@ public void testUploaded() throws IOException { generation ); - Set toUpload = new HashSet<>(2); + Set toUpload = new HashSet<>(); toUpload.add(transferFileSnapshot); fileTransferTracker.recordBytesForFiles(toUpload); remoteTranslogTransferTracker.addUploadBytesStarted(2 * fileSize); fileTransferTracker.onSuccess(transferFileSnapshot); - String fileName = String.valueOf(testFile.getFileName()); + String tlogFileName = String.valueOf(testFile.getFileName()); String ckpFileName = String.valueOf(ckpFile.getFileName()); + assertTrue(fileTransferTracker.uploaded(tlogFileName)); + assertTrue(fileTransferTracker.uploaded(ckpFileName)); assertTrue(fileTransferTracker.translogGenerationUploaded(generation)); assertFalse(fileTransferTracker.translogGenerationUploaded(generation + 2)); + assertFalse(fileTransferTracker.uploaded("random-name")); fileTransferTracker.deleteGenerations(Set.of(generation)); - assertTrue(fileTransferTracker.uploaded(fileName)); + assertFalse(fileTransferTracker.translogGenerationUploaded(generation)); + + fileTransferTracker.delete(List.of(tlogFileName)); + assertFalse(fileTransferTracker.uploaded(tlogFileName)); assertTrue(fileTransferTracker.uploaded(ckpFileName)); + + } + + // FileTransferTracker tests when ckp is stored as translog metadata + + public void testOnSuccess_WhenCkpAsMetadata() throws IOException { + Path testFile = createTempFile(); + Path ckpFile = createTempFile(); + int fileSize = 128; + int ckpFileSize = 100; + Files.write(testFile, randomByteArrayOfLength(128), StandardOpenOption.APPEND); + Files.write(ckpFile, randomByteArrayOfLength(ckpFileSize), StandardOpenOption.APPEND); + TranslogCheckpointSnapshot transferFileSnapshot = new TranslogCheckpointSnapshot( + primaryTerm, + generation, + minTranslogGeneration, + testFile, + ckpFile, + null, + null, + null, + generation + ); + Set toUpload = new HashSet<>(); + toUpload.add(transferFileSnapshot); + fileTransferTrackerCkpAsMetadata.recordBytesForFiles(toUpload); + remoteTranslogTransferTracker.addUploadBytesStarted(fileSize + ckpFileSize); + fileTransferTrackerCkpAsMetadata.onSuccess(transferFileSnapshot); + // idempotent + remoteTranslogTransferTracker.addUploadBytesStarted(fileSize + ckpFileSize); + fileTransferTrackerCkpAsMetadata.onSuccess(transferFileSnapshot); + assertEquals(fileTransferTrackerCkpAsMetadata.allUploadedGeneration().size(), 1); + assertEquals(fileTransferTrackerCkpAsMetadata.allUploaded().size(), 0); + try { + remoteTranslogTransferTracker.addUploadBytesStarted(fileSize + ckpFileSize); + fileTransferTrackerCkpAsMetadata.onFailure(transferFileSnapshot, new IOException("random exception")); + fail("failure after succcess invalid"); + } catch (IllegalStateException ex) { + // all good + } + } + + public void testOnFailure_WhenCkpAsMetadata() throws IOException { + Path tlogFile1 = createTempFile(); + Path ckpFile1 = createTempFile(); + Path tlogFile2 = createTempFile(); + Path ckpFile2 = createTempFile(); + int fileSize = 128; + Files.write(tlogFile1, randomByteArrayOfLength(fileSize), StandardOpenOption.APPEND); + Files.write(ckpFile1, randomByteArrayOfLength(fileSize), StandardOpenOption.APPEND); + + Files.write(tlogFile2, randomByteArrayOfLength(fileSize), StandardOpenOption.APPEND); + Files.write(ckpFile2, randomByteArrayOfLength(fileSize), StandardOpenOption.APPEND); + TranslogCheckpointSnapshot translogCheckpointSnapshot1 = new TranslogCheckpointSnapshot( + primaryTerm, + generation, + minTranslogGeneration, + tlogFile1, + ckpFile1, + null, + null, + null, + generation + ); + TranslogCheckpointSnapshot translogCheckpointSnapshot2 = new TranslogCheckpointSnapshot( + primaryTerm, + generation + 1, + minTranslogGeneration, + tlogFile2, + ckpFile2, + null, + null, + null, + generation + 1 + ); + TransferFileSnapshot translogFileSnapshot1 = translogCheckpointSnapshot1.getTranslogFileSnapshot(); + Set toUpload = new HashSet<>(); + toUpload.add(translogCheckpointSnapshot1); + toUpload.add(translogCheckpointSnapshot2); + fileTransferTrackerCkpAsMetadata.recordBytesForFiles(toUpload); + remoteTranslogTransferTracker.addUploadBytesStarted(fileSize * 4); + fileTransferTrackerCkpAsMetadata.onFailure( + translogCheckpointSnapshot1, + new TranslogTransferException( + translogCheckpointSnapshot1, + new IOException("random exception"), + Set.of(translogFileSnapshot1), + null + ) + ); + fileTransferTrackerCkpAsMetadata.onSuccess(translogCheckpointSnapshot2); + assertEquals(fileTransferTrackerCkpAsMetadata.allUploadedGeneration().size(), 1); + // fileName based tracker will not be updated + assertEquals(fileTransferTrackerCkpAsMetadata.allUploaded().size(), 0); + + remoteTranslogTransferTracker.addUploadBytesStarted(fileSize * 2); + fileTransferTrackerCkpAsMetadata.onSuccess(translogCheckpointSnapshot1); + assertEquals(fileTransferTrackerCkpAsMetadata.allUploadedGeneration().size(), 2); + // fileName based tracker will not be updated + assertEquals(fileTransferTrackerCkpAsMetadata.allUploaded().size(), 0); + + translogFileSnapshot1.close(); + } + + public void testOnSuccessStatsFailure_WhenCkpAsMetadata() throws IOException { + RemoteTranslogTransferTracker localRemoteTranslogTransferTracker = spy(remoteTranslogTransferTracker); + doAnswer((count) -> { throw new NullPointerException("Error while updating stats"); }).when(localRemoteTranslogTransferTracker) + .addUploadBytesSucceeded(anyLong()); + + FileTransferTracker localFileTransferTracker = new FileTransferTracker(shardId, localRemoteTranslogTransferTracker, true); + + Path testFile = createTempFile(); + int fileSize = 128; + Files.write(testFile, randomByteArrayOfLength(fileSize), StandardOpenOption.APPEND); + + TranslogCheckpointSnapshot transferFileSnapshot = new TranslogCheckpointSnapshot( + primaryTerm, + generation, + minTranslogGeneration, + testFile, + testFile, + null, + null, + null, + generation + ); + + Set toUpload = new HashSet<>(); + toUpload.add(transferFileSnapshot); + localFileTransferTracker.recordBytesForFiles(toUpload); + localRemoteTranslogTransferTracker.addUploadBytesStarted(2 * fileSize); + localFileTransferTracker.onSuccess(transferFileSnapshot); + assertEquals(localFileTransferTracker.allUploadedGeneration().size(), 1); + } + + public void testUploaded_WhenCkpAsMetadata() throws IOException { + Path testFile = createTempFile(); + Path ckpFile = createTempFile(); + int fileSize = 128; + Files.write(testFile, randomByteArrayOfLength(fileSize), StandardOpenOption.APPEND); + TranslogCheckpointSnapshot transferFileSnapshot = new TranslogCheckpointSnapshot( + primaryTerm, + generation, + minTranslogGeneration, + testFile, + ckpFile, + null, + null, + null, + generation + ); + + Set toUpload = new HashSet<>(); + toUpload.add(transferFileSnapshot); + fileTransferTrackerCkpAsMetadata.recordBytesForFiles(toUpload); + remoteTranslogTransferTracker.addUploadBytesStarted(2 * fileSize); + fileTransferTrackerCkpAsMetadata.onSuccess(transferFileSnapshot); + String tlogFileName = String.valueOf(testFile.getFileName()); + String ckpFileName = String.valueOf(ckpFile.getFileName()); + // fileName based tracker will not be updated + assertFalse(fileTransferTrackerCkpAsMetadata.uploaded(tlogFileName)); + assertFalse(fileTransferTrackerCkpAsMetadata.uploaded(ckpFileName)); + assertTrue(fileTransferTrackerCkpAsMetadata.translogGenerationUploaded(generation)); + assertFalse(fileTransferTrackerCkpAsMetadata.translogGenerationUploaded(generation + 2)); + assertFalse(fileTransferTrackerCkpAsMetadata.uploaded("random-name")); + + fileTransferTrackerCkpAsMetadata.deleteGenerations(Set.of(generation)); + assertFalse(fileTransferTrackerCkpAsMetadata.translogGenerationUploaded(generation)); + + fileTransferTrackerCkpAsMetadata.delete(List.of(tlogFileName)); + assertFalse(fileTransferTrackerCkpAsMetadata.uploaded(tlogFileName)); + assertFalse(fileTransferTrackerCkpAsMetadata.uploaded(ckpFileName)); } } diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogCkpAsMetadataFileTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogCkpAsMetadataFileTransferManagerTests.java new file mode 100644 index 0000000000000..6b02f25967016 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogCkpAsMetadataFileTransferManagerTests.java @@ -0,0 +1,536 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.apache.lucene.tests.util.LuceneTestCase; +import org.opensearch.action.LatchedActionListener; +import org.opensearch.common.SetOnce; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.blobstore.InputStreamWithMetadata; +import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.remote.RemoteTranslogTransferTracker; +import org.opensearch.index.translog.Translog; +import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; +import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; +import org.opensearch.indices.DefaultRemoteStoreSettings; +import org.opensearch.indices.RemoteStoreSettings; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG; +import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anySet; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.anyMap; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@LuceneTestCase.SuppressFileSystems("*") +public class TranslogCkpAsMetadataFileTransferManagerTests extends OpenSearchTestCase { + + private TransferService transferService; + private ShardId shardId; + private BlobPath remoteBaseTransferPath; + private ThreadPool threadPool; + private long primaryTerm; + private long generation; + private long minTranslogGeneration; + private RemoteTranslogTransferTracker remoteTranslogTransferTracker; + byte[] tlogBytes; + byte[] ckpBytes; + FileTransferTracker tracker; + TranslogTransferManager translogCkpAsMetadataTransferManager; + long delayForBlobDownload; + private final boolean ckpAsTranslogMetadata = true; + private TranslogCheckpointSnapshot spyTranslogCheckpointSnapshot1; + private TranslogCheckpointSnapshot spyTranslogCheckpointSnapshot2; + + @Override + public void setUp() throws Exception { + super.setUp(); + primaryTerm = randomNonNegativeLong(); + generation = randomNonNegativeLong(); + shardId = mock(ShardId.class); + when(shardId.getIndex()).thenReturn(new Index("index", "indexUUid")); + minTranslogGeneration = randomLongBetween(0, generation); + remoteBaseTransferPath = new BlobPath().add("base_path"); + transferService = mock(TransferService.class); + threadPool = new TestThreadPool(getClass().getName()); + remoteTranslogTransferTracker = new RemoteTranslogTransferTracker(shardId, 20); + tlogBytes = "Hello Translog".getBytes(StandardCharsets.UTF_8); + ckpBytes = "Hello Checkpoint".getBytes(StandardCharsets.UTF_8); + tracker = new FileTransferTracker(new ShardId("index", "indexUuid", 0), remoteTranslogTransferTracker, ckpAsTranslogMetadata); + translogCkpAsMetadataTransferManager = TranslogTransferManagerFactory.getTranslogTransferManager( + shardId, + transferService, + remoteBaseTransferPath.add(TRANSLOG.getName()), + remoteBaseTransferPath.add(METADATA.getName()), + tracker, + remoteTranslogTransferTracker, + DefaultRemoteStoreSettings.INSTANCE, + ckpAsTranslogMetadata + ); + + Path translogFile1 = createTempFile(Translog.TRANSLOG_FILE_PREFIX + generation, Translog.TRANSLOG_FILE_SUFFIX); + Path checkpointFile1 = createTempFile(Translog.TRANSLOG_FILE_PREFIX + generation, Translog.CHECKPOINT_SUFFIX); + + Path translogFile2 = createTempFile(Translog.TRANSLOG_FILE_PREFIX + (generation - 1), Translog.TRANSLOG_FILE_SUFFIX); + Path checkpointFile2 = createTempFile(Translog.TRANSLOG_FILE_PREFIX + (generation - 1), Translog.CHECKPOINT_SUFFIX); + + TranslogCheckpointSnapshot translogCheckpointSnapshot1 = new TranslogCheckpointSnapshot( + primaryTerm, + generation, + minTranslogGeneration, + translogFile1, + checkpointFile1, + null, + null, + null, + generation + ); + + TranslogCheckpointSnapshot translogCheckpointSnapshot2 = new TranslogCheckpointSnapshot( + primaryTerm, + generation - 1, + minTranslogGeneration, + translogFile2, + checkpointFile2, + null, + null, + null, + generation - 1 + ); + + spyTranslogCheckpointSnapshot1 = spy(translogCheckpointSnapshot1); + spyTranslogCheckpointSnapshot2 = spy(translogCheckpointSnapshot2); + + Map metadata = TranslogCheckpointSnapshot.createMetadata(ckpBytes); + TransferFileSnapshot dummyTransferFileSnapshot1 = new TransferFileSnapshot(translogFile1, 1, null, metadata); + TransferFileSnapshot dummyTransferFileSnapshot2 = new TransferFileSnapshot(translogFile2, 1, null, metadata); + doReturn(dummyTransferFileSnapshot1).when(spyTranslogCheckpointSnapshot1).getTranslogFileSnapshotWithMetadata(); + doReturn(dummyTransferFileSnapshot2).when(spyTranslogCheckpointSnapshot2).getTranslogFileSnapshotWithMetadata(); + + delayForBlobDownload = 1; + when(transferService.downloadBlob(any(BlobPath.class), eq("translog-23.tlog"))).thenAnswer(invocation -> { + Thread.sleep(delayForBlobDownload); + return new ByteArrayInputStream(tlogBytes); + }); + + when(transferService.downloadBlob(any(BlobPath.class), eq("translog-23.ckp"))).thenAnswer(invocation -> { + Thread.sleep(delayForBlobDownload); + return new ByteArrayInputStream(ckpBytes); + }); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + terminate(threadPool); + } + + @SuppressWarnings("unchecked") + public void testTransferSnapshot() throws Exception { + AtomicInteger fileTransferSucceeded = new AtomicInteger(); + AtomicInteger fileTransferFailed = new AtomicInteger(); + AtomicInteger translogTransferSucceeded = new AtomicInteger(); + AtomicInteger translogTransferFailed = new AtomicInteger(); + + doNothing().when(transferService) + .uploadBlob( + any(TransferFileSnapshot.class), + eq(remoteBaseTransferPath.add(String.valueOf(primaryTerm))), + any(WritePriority.class) + ); + doAnswer(invocationOnMock -> { + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + Set transferFileSnapshots = (Set) invocationOnMock.getArguments()[0]; + transferFileSnapshots.forEach(listener::onResponse); + return null; + }).when(transferService).uploadBlobs(anySet(), anyMap(), any(ActionListener.class), any(WritePriority.class)); + + FileTransferTracker fileTransferTracker = new FileTransferTracker( + new ShardId("index", "indexUUid", 0), + remoteTranslogTransferTracker, + false + ) { + @Override + public void onSuccess(TranslogCheckpointSnapshot fileSnapshot) { + fileTransferSucceeded.incrementAndGet(); + super.onSuccess(fileSnapshot); + } + + @Override + public void onFailure(TranslogCheckpointSnapshot fileSnapshot, Exception e) { + fileTransferFailed.incrementAndGet(); + super.onFailure(fileSnapshot, e); + } + + }; + + TranslogTransferManager translogTransferManager = TranslogTransferManagerFactory.getTranslogTransferManager( + shardId, + transferService, + remoteBaseTransferPath.add(TRANSLOG.getName()), + remoteBaseTransferPath.add(METADATA.getName()), + fileTransferTracker, + remoteTranslogTransferTracker, + DefaultRemoteStoreSettings.INSTANCE, + ckpAsTranslogMetadata + ); + + assertTrue(translogTransferManager.transferSnapshot(createTransferSnapshot(), new TranslogTransferListener() { + @Override + public void onUploadComplete(TransferSnapshot transferSnapshot) { + translogTransferSucceeded.incrementAndGet(); + } + + @Override + public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) { + translogTransferFailed.incrementAndGet(); + } + })); + assertEquals(2, fileTransferSucceeded.get()); + assertEquals(0, fileTransferFailed.get()); + assertEquals(1, translogTransferSucceeded.get()); + assertEquals(0, translogTransferFailed.get()); + assertEquals(2, fileTransferTracker.allUploadedGeneration().size()); + } + + public void testTransferSnapshotOnUploadTimeout() throws Exception { + doAnswer(invocationOnMock -> { + Set transferFileSnapshots = invocationOnMock.getArgument(0); + ActionListener listener = invocationOnMock.getArgument(2); + Runnable runnable = () -> { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + for (TransferFileSnapshot transferFileSnapshot : transferFileSnapshots) { + listener.onResponse(transferFileSnapshot); + } + }; + Thread t = new Thread(runnable); + t.start(); + return null; + }).when(transferService).uploadBlobs(anySet(), anyMap(), any(ActionListener.class), any(WritePriority.class)); + FileTransferTracker fileTransferTracker = new FileTransferTracker( + new ShardId("index", "indexUUid", 0), + remoteTranslogTransferTracker, + false + ); + RemoteStoreSettings remoteStoreSettings = mock(RemoteStoreSettings.class); + when(remoteStoreSettings.getClusterRemoteTranslogTransferTimeout()).thenReturn(new TimeValue(1)); + TranslogTransferManager translogTransferManager = TranslogTransferManagerFactory.getTranslogTransferManager( + shardId, + transferService, + remoteBaseTransferPath.add(TRANSLOG.getName()), + remoteBaseTransferPath.add(METADATA.getName()), + fileTransferTracker, + remoteTranslogTransferTracker, + remoteStoreSettings, + ckpAsTranslogMetadata + ); + SetOnce exception = new SetOnce<>(); + translogTransferManager.transferSnapshot(createTransferSnapshot(), new TranslogTransferListener() { + @Override + public void onUploadComplete(TransferSnapshot transferSnapshot) {} + + @Override + public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) { + exception.set(ex); + } + }); + assertNotNull(exception.get()); + assertTrue(exception.get() instanceof TranslogUploadFailedException); + assertEquals("Timed out waiting for transfer of snapshot test-to-string to complete", exception.get().getMessage()); + } + + public void testTransferSnapshotOnThreadInterrupt() throws Exception { + SetOnce uploadThread = new SetOnce<>(); + doAnswer(invocationOnMock -> { + uploadThread.set(new Thread(() -> { + ActionListener listener = invocationOnMock.getArgument(2); + try { + Thread.sleep(31 * 1000); + } catch (InterruptedException ignore) { + List list = new ArrayList<>(invocationOnMock.getArgument(0)); + listener.onFailure(new FileTransferException(list.get(0), ignore)); + } + })); + uploadThread.get().start(); + return null; + }).when(transferService).uploadBlobs(anySet(), anyMap(), any(ActionListener.class), any(WritePriority.class)); + FileTransferTracker fileTransferTracker = new FileTransferTracker( + new ShardId("index", "indexUUid", 0), + remoteTranslogTransferTracker, + false + ); + TranslogTransferManager translogTransferManager = TranslogTransferManagerFactory.getTranslogTransferManager( + shardId, + transferService, + remoteBaseTransferPath.add(TRANSLOG.getName()), + remoteBaseTransferPath.add(METADATA.getName()), + fileTransferTracker, + remoteTranslogTransferTracker, + DefaultRemoteStoreSettings.INSTANCE, + ckpAsTranslogMetadata + ); + SetOnce exception = new SetOnce<>(); + + Thread thread = new Thread(() -> { + try { + translogTransferManager.transferSnapshot(createTransferSnapshot(), new TranslogTransferListener() { + @Override + public void onUploadComplete(TransferSnapshot transferSnapshot) {} + + @Override + public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) { + exception.set(ex); + } + }); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + thread.start(); + + Thread.sleep(1000); + // Interrupt the thread + thread.interrupt(); + assertBusy(() -> { + assertNotNull(exception.get()); + assertTrue(exception.get() instanceof TranslogUploadFailedException); + assertEquals("Failed to upload test-to-string", exception.get().getMessage()); + }); + uploadThread.get().interrupt(); + } + + private TransferSnapshot createTransferSnapshot() { + return new TransferSnapshot() { + @Override + public TranslogTransferMetadata getTranslogTransferMetadata() { + return new TranslogTransferMetadata(primaryTerm, generation, minTranslogGeneration, randomInt(5)); + } + + @Override + public Set getTranslogCheckpointSnapshots() { + return Set.of(spyTranslogCheckpointSnapshot1, spyTranslogCheckpointSnapshot2); + } + + @Override + public String toString() { + return "test-to-string"; + } + }; + } + + public void testDownloadTranslog_When_CkpFileStoredAsMetadata() throws IOException { + Path location = createTempDir(); + mockDownloadBlobWithMetadataResponse_WithMetadataValueAsCkpData(); + assertFalse(Files.exists(location.resolve("translog-23.tlog"))); + assertFalse(Files.exists(location.resolve("translog-23.ckp"))); + translogCkpAsMetadataTransferManager.downloadTranslog("12", "23", location); + assertTrue(Files.exists(location.resolve("translog-23.tlog"))); + assertTrue(Files.exists(location.resolve("translog-23.ckp"))); + assertTlogCkpDownloadStats_When_CkpFileStoredAsMetadata(); + } + + public void testDownloadTranslogAlreadyExists_When_CkpFileStoredAsMetadata() throws IOException { + Path location = createTempDir(); + Files.createFile(location.resolve("translog-23.tlog")); + Files.createFile(location.resolve("translog-23.ckp")); + mockDownloadBlobWithMetadataResponse_WithMetadataValueAsCkpData(); + translogCkpAsMetadataTransferManager.downloadTranslog("12", "23", location); + verify(transferService, times(1)).downloadBlobWithMetadata(any(BlobPath.class), eq("translog-23.tlog")); + verify(transferService, times(0)).downloadBlob(any(BlobPath.class), eq("translog-23.ckp")); + assertTrue(Files.exists(location.resolve("translog-23.tlog"))); + assertTrue(Files.exists(location.resolve("translog-23.ckp"))); + assertTlogCkpDownloadStats_When_CkpFileStoredAsMetadata(); + } + + public void testDownloadTranslogWithTrackerUpdated_When_CkpFileStoredAsMetadata() throws IOException { + Path location = createTempDir(); + String translogFile = "translog-23.tlog", checkpointFile = "translog-23.ckp"; + Files.createFile(location.resolve(translogFile)); + Files.createFile(location.resolve(checkpointFile)); + mockDownloadBlobWithMetadataResponse_WithMetadataValueAsCkpData(); + translogCkpAsMetadataTransferManager.downloadTranslog("12", "23", location); + + verify(transferService, times(1)).downloadBlobWithMetadata(any(BlobPath.class), eq(translogFile)); + verify(transferService, times(0)).downloadBlob(any(BlobPath.class), eq(checkpointFile)); + assertTrue(Files.exists(location.resolve(translogFile))); + assertTrue(Files.exists(location.resolve(checkpointFile))); + + // Since the tracker already holds the translog.tlog file, and generation with success state, adding them with failed state would + // throw exception + assertThrows(IllegalStateException.class, () -> tracker.add(translogFile, false)); + assertThrows(IllegalStateException.class, () -> tracker.addGeneration(23, false)); + + // Since the tracker doesn't have translog.ckp file status updated. adding it Failed is allowed + tracker.add(checkpointFile, false); + + // Since the tracker already holds the translog.tlog file, and generation with success state, adding them with success state is + // allowed + tracker.add(translogFile, true); + tracker.addGeneration(23, true); + assertTlogCkpDownloadStats_When_CkpFileStoredAsMetadata(); + } + + public void mockDownloadBlobWithMetadataResponse_WithMetadataValueAsCkpData() throws IOException { + Map metadata = TranslogCheckpointSnapshot.createMetadata(ckpBytes); + when(transferService.downloadBlobWithMetadata(any(BlobPath.class), eq("translog-23.tlog"))).thenAnswer(invocation -> { + Thread.sleep(delayForBlobDownload); + return new InputStreamWithMetadata(new ByteArrayInputStream(tlogBytes), metadata); + }); + } + + public void testDeleteTranslogSuccess_when_ckpStoredAsMetadata() throws Exception { + BlobStore blobStore = mock(BlobStore.class); + BlobContainer blobContainer = mock(BlobContainer.class); + when(blobStore.blobContainer(any(BlobPath.class))).thenReturn(blobContainer); + BlobStoreTransferService blobStoreTransferService = new BlobStoreTransferService(blobStore, threadPool); + TranslogTransferManager translogTransferManager = TranslogTransferManagerFactory.getTranslogTransferManager( + shardId, + blobStoreTransferService, + remoteBaseTransferPath.add(TRANSLOG.getName()), + remoteBaseTransferPath.add(METADATA.getName()), + tracker, + remoteTranslogTransferTracker, + DefaultRemoteStoreSettings.INSTANCE, + ckpAsTranslogMetadata + ); + String translogFile = "translog-19.tlog"; + tracker.addGeneration(19, true); + tracker.add(translogFile, true); + // tracker.add(checkpointFile, true); + assertEquals(1, tracker.allUploadedGeneration().size()); + assertEquals(1, tracker.allUploaded().size()); + + List verifyDeleteFilesList = List.of(translogFile); + translogTransferManager.deleteGenerationAsync(primaryTerm, Set.of(19L), () -> {}); + assertBusy(() -> assertEquals(0, tracker.allUploadedGeneration().size())); + assertBusy(() -> assertEquals(0, tracker.allUploaded().size())); + // only translog.tlog file will be sent for delete. + verify(blobContainer).deleteBlobsIgnoringIfNotExists(eq(verifyDeleteFilesList)); + } + + private void assertTlogCkpDownloadStats_When_CkpFileStoredAsMetadata() { + assertEquals(tlogBytes.length + ckpBytes.length, remoteTranslogTransferTracker.getDownloadBytesSucceeded()); + // Expect delay for both tlog and ckp file + assertTrue(remoteTranslogTransferTracker.getTotalDownloadTimeInMillis() >= delayForBlobDownload); + } + + public void testTransferTranslogCheckpointSnapshotWithAllFilesUploaded() throws Exception { + // Arrange + Set toUpload = createTestTranslogCheckpointSnapshots(); + Map blobPathMap = new HashMap<>(); + AtomicInteger successGenCount = new AtomicInteger(); + AtomicInteger failedGenCount = new AtomicInteger(); + AtomicInteger processedFilesCount = new AtomicInteger(); + final CountDownLatch latch = new CountDownLatch(toUpload.size()); + + doAnswer(invocationOnMock -> { + Set transferFileSnapshots = invocationOnMock.getArgument(0); + ActionListener listener = invocationOnMock.getArgument(2); + for (TransferFileSnapshot fileSnapshot : transferFileSnapshots) { + listener.onResponse(fileSnapshot); + assertNotNull(fileSnapshot.getMetadata()); + processedFilesCount.getAndIncrement(); + fileSnapshot.close(); + } + return null; + }).when(transferService).uploadBlobs(anySet(), anyMap(), any(ActionListener.class), any(WritePriority.class)); + + LatchedActionListener listener = new LatchedActionListener<>( + ActionListener.wrap(resp -> successGenCount.getAndIncrement(), ex -> failedGenCount.getAndIncrement()), + latch + ); + + translogCkpAsMetadataTransferManager.transferTranslogCheckpointSnapshot(toUpload, blobPathMap, listener); + assertEquals(successGenCount.get(), 2); + assertEquals(failedGenCount.get(), 0); + assertEquals(processedFilesCount.get(), 2); + } + + public void testTransferTranslogCheckpointSnapshotWithOneOfTheTwoFilesFailedForATranslogGeneration() throws Exception { + // Arrange + Set toUpload = createTestTranslogCheckpointSnapshots(); + Map blobPathMap = new HashMap<>(); + AtomicInteger successGenCount = new AtomicInteger(); + AtomicInteger failedGenCount = new AtomicInteger(); + AtomicInteger proccessedFilesCount = new AtomicInteger(); + final CountDownLatch latch = new CountDownLatch(toUpload.size()); + + doAnswer(invocationOnMock -> { + Set transferFileSnapshots = invocationOnMock.getArgument(0); + ActionListener listener = invocationOnMock.getArgument(2); + TransferFileSnapshot fileSnapshot1 = null; + TransferFileSnapshot fileSnapshot2 = null; + Iterator iterator = transferFileSnapshots.iterator(); + if (iterator.hasNext()) { + fileSnapshot1 = iterator.next(); + assertNotNull(fileSnapshot1.getMetadata()); + proccessedFilesCount.getAndIncrement(); + listener.onFailure(new FileTransferException(fileSnapshot1, new Exception("test"))); + fileSnapshot1.close(); + } + if (iterator.hasNext()) { + fileSnapshot2 = iterator.next(); + assertNotNull(fileSnapshot1.getMetadata()); + proccessedFilesCount.getAndIncrement(); + listener.onResponse(fileSnapshot2); + fileSnapshot2.close(); + } + return null; + }).when(transferService).uploadBlobs(anySet(), anyMap(), any(ActionListener.class), any(WritePriority.class)); + + LatchedActionListener listener = new LatchedActionListener<>( + ActionListener.wrap(resp -> successGenCount.getAndIncrement(), ex -> failedGenCount.getAndIncrement()), + latch + ); + + translogCkpAsMetadataTransferManager.transferTranslogCheckpointSnapshot(toUpload, blobPathMap, listener); + assertEquals(successGenCount.get(), 1); + assertEquals(failedGenCount.get(), 1); + assertEquals(proccessedFilesCount.get(), 2); + } + + private Set createTestTranslogCheckpointSnapshots() { + return Set.of(spyTranslogCheckpointSnapshot1, spyTranslogCheckpointSnapshot2); + } +} diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/BaseTranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogCkpFilesTransferManagerTests.java similarity index 77% rename from server/src/test/java/org/opensearch/index/translog/transfer/BaseTranslogTransferManagerTests.java rename to server/src/test/java/org/opensearch/index/translog/transfer/TranslogCkpFilesTransferManagerTests.java index af85e4e9e9f08..f88d9f2d308b9 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/BaseTranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogCkpFilesTransferManagerTests.java @@ -64,12 +64,11 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @LuceneTestCase.SuppressFileSystems("*") -public class BaseTranslogTransferManagerTests extends OpenSearchTestCase { +public class TranslogCkpFilesTransferManagerTests extends OpenSearchTestCase { private TransferService transferService; private ShardId shardId; @@ -82,8 +81,7 @@ public class BaseTranslogTransferManagerTests extends OpenSearchTestCase { byte[] tlogBytes; byte[] ckpBytes; FileTransferTracker tracker; - TranslogTransferManager translogTransferManager; - TranslogTransferManager translogTransferManager2; + TranslogTransferManager translogCkpFilesTransferManager; long delayForBlobDownload; private final boolean ckpAsTranslogMetadata = false; @@ -101,8 +99,8 @@ public void setUp() throws Exception { remoteTranslogTransferTracker = new RemoteTranslogTransferTracker(shardId, 20); tlogBytes = "Hello Translog".getBytes(StandardCharsets.UTF_8); ckpBytes = "Hello Checkpoint".getBytes(StandardCharsets.UTF_8); - tracker = new FileTransferTracker(new ShardId("index", "indexUuid", 0), remoteTranslogTransferTracker, false); - translogTransferManager = TranslogTransferManagerFactory.getTranslogTransferManager( + tracker = new FileTransferTracker(new ShardId("index", "indexUuid", 0), remoteTranslogTransferTracker, ckpAsTranslogMetadata); + translogCkpFilesTransferManager = TranslogTransferManagerFactory.getTranslogTransferManager( shardId, transferService, remoteBaseTransferPath.add(TRANSLOG.getName()), @@ -113,17 +111,6 @@ public void setUp() throws Exception { ckpAsTranslogMetadata ); - translogTransferManager2 = TranslogTransferManagerFactory.getTranslogTransferManager( - shardId, - transferService, - remoteBaseTransferPath.add(TRANSLOG.getName()), - remoteBaseTransferPath.add(METADATA.getName()), - tracker, - remoteTranslogTransferTracker, - DefaultRemoteStoreSettings.INSTANCE, - true - ); - delayForBlobDownload = 1; when(transferService.downloadBlob(any(BlobPath.class), eq("translog-23.tlog"))).thenAnswer(invocation -> { Thread.sleep(delayForBlobDownload); @@ -261,7 +248,6 @@ public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) { } public void testTransferSnapshotOnThreadInterrupt() throws Exception { - List uploadThreadList = new ArrayList<>(); doAnswer(invocationOnMock -> { SetOnce uploadThread = new SetOnce<>(); @@ -378,7 +364,7 @@ public void testReadMetadataNoFile() throws IOException { }).when(transferService) .listAllInSortedOrder(any(BlobPath.class), eq(TranslogTransferMetadata.METADATA_PREFIX), anyInt(), any(ActionListener.class)); - assertNull(translogTransferManager.readMetadata()); + assertNull(translogCkpFilesTransferManager.readMetadata()); assertNoDownloadStats(false); } @@ -403,12 +389,15 @@ public void testReadMetadataFile() throws IOException { long delayForMdDownload = 1; when(transferService.downloadBlob(any(BlobPath.class), eq(mdFilename1))).thenAnswer(invocation -> { Thread.sleep(delayForMdDownload); - return new ByteArrayInputStream(translogTransferManager.getMetadataBytes(metadata)); + return new ByteArrayInputStream(translogCkpFilesTransferManager.getMetadataBytes(metadata)); }); - assertEquals(metadata, translogTransferManager.readMetadata()); + assertEquals(metadata, translogCkpFilesTransferManager.readMetadata()); - assertEquals(translogTransferManager.getMetadataBytes(metadata).length, remoteTranslogTransferTracker.getDownloadBytesSucceeded()); + assertEquals( + translogCkpFilesTransferManager.getMetadataBytes(metadata).length, + remoteTranslogTransferTracker.getDownloadBytesSucceeded() + ); assertTrue(remoteTranslogTransferTracker.getTotalDownloadTimeInMillis() >= delayForMdDownload); } @@ -427,7 +416,7 @@ public void testReadMetadataReadException() throws IOException { when(transferService.downloadBlob(any(BlobPath.class), eq(mdFilename))).thenThrow(new IOException("Something went wrong")); - assertThrows(IOException.class, translogTransferManager::readMetadata); + assertThrows(IOException.class, translogCkpFilesTransferManager::readMetadata); assertNoDownloadStats(true); } @@ -451,53 +440,27 @@ public void testReadMetadataListException() throws IOException { when(transferService.downloadBlob(any(BlobPath.class), any(String.class))).thenThrow(new IOException("Something went wrong")); - assertThrows(IOException.class, translogTransferManager::readMetadata); + assertThrows(IOException.class, translogCkpFilesTransferManager::readMetadata); assertNoDownloadStats(false); } public void testDownloadTranslog() throws IOException { Path location = createTempDir(); - mockResponseDownloadBlobWithMetadata_WithNULLMetadata(); + mockDownloadBlobWithMetadataResponse_WithNULLMetadataValue(); assertFalse(Files.exists(location.resolve("translog-23.tlog"))); assertFalse(Files.exists(location.resolve("translog-23.ckp"))); - translogTransferManager.downloadTranslog("12", "23", location); + translogCkpFilesTransferManager.downloadTranslog("12", "23", location); assertTrue(Files.exists(location.resolve("translog-23.tlog"))); assertTrue(Files.exists(location.resolve("translog-23.ckp"))); assertTlogCkpDownloadStats(); } - public void mockResponseDownloadBlobWithMetadata_WithNULLMetadata() throws IOException { - when(transferService.downloadBlobWithMetadata(any(BlobPath.class), eq("translog-23.tlog"))).thenAnswer(invocation -> { - Thread.sleep(delayForBlobDownload); - return new InputStreamWithMetadata(new ByteArrayInputStream(tlogBytes), null); - }); - } - - public void testDownloadTranslog_When_CkpFileStoredAsMetadata() throws IOException { - Path location = createTempDir(); - mockResponseDownloadBlobWithMetadata_WithCkpFileStoredAsMetadata(); - assertFalse(Files.exists(location.resolve("translog-23.tlog"))); - assertFalse(Files.exists(location.resolve("translog-23.ckp"))); - translogTransferManager.downloadTranslog("12", "23", location); - assertTrue(Files.exists(location.resolve("translog-23.tlog"))); - assertTrue(Files.exists(location.resolve("translog-23.ckp"))); - assertTlogCkpDownloadStats_when_CkpFileStoredAsMetadata(); - } - - public void mockResponseDownloadBlobWithMetadata_WithCkpFileStoredAsMetadata() throws IOException { - Map metadata = TranslogCheckpointSnapshot.createMetadata(ckpBytes); - when(transferService.downloadBlobWithMetadata(any(BlobPath.class), eq("translog-23.tlog"))).thenAnswer(invocation -> { - Thread.sleep(delayForBlobDownload); - return new InputStreamWithMetadata(new ByteArrayInputStream(tlogBytes), metadata); - }); - } - public void testDownloadTranslogAlreadyExists() throws IOException { Path location = createTempDir(); Files.createFile(location.resolve("translog-23.tlog")); Files.createFile(location.resolve("translog-23.ckp")); - mockResponseDownloadBlobWithMetadata_WithNULLMetadata(); - translogTransferManager.downloadTranslog("12", "23", location); + mockDownloadBlobWithMetadataResponse_WithNULLMetadataValue(); + translogCkpFilesTransferManager.downloadTranslog("12", "23", location); verify(transferService).downloadBlobWithMetadata(any(BlobPath.class), eq("translog-23.tlog")); verify(transferService).downloadBlob(any(BlobPath.class), eq("translog-23.ckp")); assertTrue(Files.exists(location.resolve("translog-23.tlog"))); @@ -505,26 +468,13 @@ public void testDownloadTranslogAlreadyExists() throws IOException { assertTlogCkpDownloadStats(); } - public void testDownloadTranslogAlreadyExists_When_CkpFileStoredAsMetadata() throws IOException { - Path location = createTempDir(); - Files.createFile(location.resolve("translog-23.tlog")); - Files.createFile(location.resolve("translog-23.ckp")); - mockResponseDownloadBlobWithMetadata_WithCkpFileStoredAsMetadata(); - translogTransferManager.downloadTranslog("12", "23", location); - verify(transferService, times(1)).downloadBlobWithMetadata(any(BlobPath.class), eq("translog-23.tlog")); - verify(transferService, times(0)).downloadBlob(any(BlobPath.class), eq("translog-23.ckp")); - assertTrue(Files.exists(location.resolve("translog-23.tlog"))); - assertTrue(Files.exists(location.resolve("translog-23.ckp"))); - assertTlogCkpDownloadStats_when_CkpFileStoredAsMetadata(); - } - public void testDownloadTranslogWithTrackerUpdated() throws IOException { Path location = createTempDir(); String translogFile = "translog-23.tlog", checkpointFile = "translog-23.ckp"; Files.createFile(location.resolve(translogFile)); Files.createFile(location.resolve(checkpointFile)); - mockResponseDownloadBlobWithMetadata_WithNULLMetadata(); - translogTransferManager.downloadTranslog("12", "23", location); + mockDownloadBlobWithMetadataResponse_WithNULLMetadataValue(); + translogCkpFilesTransferManager.downloadTranslog("12", "23", location); verify(transferService).downloadBlobWithMetadata(any(BlobPath.class), eq(translogFile)); verify(transferService).downloadBlob(any(BlobPath.class), eq(checkpointFile)); assertTrue(Files.exists(location.resolve(translogFile))); @@ -533,39 +483,20 @@ public void testDownloadTranslogWithTrackerUpdated() throws IOException { // Since the tracker already holds the files with success state, adding them with failed state would throw exception assertThrows(IllegalStateException.class, () -> tracker.add(translogFile, false)); assertThrows(IllegalStateException.class, () -> tracker.add(checkpointFile, false)); + assertThrows(IllegalStateException.class, () -> tracker.addGeneration(23, false)); // Since the tracker already holds the files with success state, adding them with success state is allowed tracker.add(translogFile, true); tracker.add(checkpointFile, true); + tracker.addGeneration(23, true); assertTlogCkpDownloadStats(); } - public void testDownloadTranslogWithTrackerUpdated_When_CkpFileStoredAsMetadata() throws IOException { - Path location = createTempDir(); - String translogFile = "translog-23.tlog", checkpointFile = "translog-23.ckp"; - Files.createFile(location.resolve(translogFile)); - Files.createFile(location.resolve(checkpointFile)); - mockResponseDownloadBlobWithMetadata_WithCkpFileStoredAsMetadata(); - translogTransferManager.downloadTranslog("12", "23", location); - - verify(transferService, times(1)).downloadBlobWithMetadata(any(BlobPath.class), eq(translogFile)); - verify(transferService, times(0)).downloadBlob(any(BlobPath.class), eq(checkpointFile)); - assertTrue(Files.exists(location.resolve(translogFile))); - assertTrue(Files.exists(location.resolve(checkpointFile))); - - // Since the tracker already holds the translog.tlog file, and generation with success state, adding them with failed state would - // throw exception - assertThrows(IllegalStateException.class, () -> tracker.add(translogFile, false)); - assertThrows(IllegalStateException.class, () -> tracker.addGeneration(23, false)); - - // Since the tracker doesn't have translog.ckp file status updated. adding it Failed is allowed - tracker.add(checkpointFile, false); - - // Since the tracker already holds the translog.tlog file, and generation with success state, adding them with success state is - // allowed - tracker.add(translogFile, true); - tracker.addGeneration(23, true); - assertTlogCkpDownloadStats_when_CkpFileStoredAsMetadata(); + public void mockDownloadBlobWithMetadataResponse_WithNULLMetadataValue() throws IOException { + when(transferService.downloadBlobWithMetadata(any(BlobPath.class), eq("translog-23.tlog"))).thenAnswer(invocation -> { + Thread.sleep(delayForBlobDownload); + return new InputStreamWithMetadata(new ByteArrayInputStream(tlogBytes), null); + }); } public void testDeleteTranslogSuccess() throws Exception { @@ -597,37 +528,6 @@ public void testDeleteTranslogSuccess() throws Exception { verify(blobContainer).deleteBlobsIgnoringIfNotExists(eq(files)); } - public void testDeleteTranslogSuccess_when_ckpStoredAsMetadata() throws Exception { - BlobStore blobStore = mock(BlobStore.class); - BlobContainer blobContainer = mock(BlobContainer.class); - when(blobStore.blobContainer(any(BlobPath.class))).thenReturn(blobContainer); - BlobStoreTransferService blobStoreTransferService = new BlobStoreTransferService(blobStore, threadPool); - TranslogTransferManager translogTransferManager = TranslogTransferManagerFactory.getTranslogTransferManager( - shardId, - blobStoreTransferService, - remoteBaseTransferPath.add(TRANSLOG.getName()), - remoteBaseTransferPath.add(METADATA.getName()), - tracker, - remoteTranslogTransferTracker, - DefaultRemoteStoreSettings.INSTANCE, - ckpAsTranslogMetadata - ); - String translogFile = "translog-19.tlog", checkpointFile = "translog-19.ckp"; - tracker.addGeneration(19, true); - tracker.add(translogFile, true); - // tracker.add(checkpointFile, true); - assertEquals(1, tracker.allUploadedGeneration().size()); - assertEquals(1, tracker.allUploaded().size()); - - List files = List.of(checkpointFile, translogFile); - List verifyDeleteFilesList = List.of(translogFile); - translogTransferManager.deleteGenerationAsync(primaryTerm, Set.of(19L), () -> {}); - assertBusy(() -> assertEquals(0, tracker.allUploadedGeneration().size())); - assertBusy(() -> assertEquals(0, tracker.allUploaded().size())); - // only translog.tlog file will be sent for delete. - verify(blobContainer).deleteBlobsIgnoringIfNotExists(eq(verifyDeleteFilesList)); - } - public void testDeleteStaleTranslogMetadata() { String tm1 = new TranslogTransferMetadata(1, 1, 1, 2).getFileName(); String tm2 = new TranslogTransferMetadata(1, 2, 1, 2).getFileName(); @@ -649,7 +549,7 @@ public void testDeleteStaleTranslogMetadata() { any(ActionListener.class) ); List files = List.of(tm2, tm3); - translogTransferManager.deleteStaleTranslogMetadataFilesAsync(() -> { + translogCkpFilesTransferManager.deleteStaleTranslogMetadataFilesAsync(() -> { verify(transferService).listAllInSortedOrderAsync( eq(ThreadPool.Names.REMOTE_PURGE), any(BlobPath.class), @@ -710,12 +610,6 @@ private void assertTlogCkpDownloadStats() { assertTrue(remoteTranslogTransferTracker.getTotalDownloadTimeInMillis() >= 2 * delayForBlobDownload); } - private void assertTlogCkpDownloadStats_when_CkpFileStoredAsMetadata() { - assertEquals(tlogBytes.length + ckpBytes.length, remoteTranslogTransferTracker.getDownloadBytesSucceeded()); - // Expect delay for both tlog and ckp file - assertTrue(remoteTranslogTransferTracker.getTotalDownloadTimeInMillis() >= delayForBlobDownload); - } - public void testGetPrimaryTermAndGeneration() { String nodeId = UUID.randomUUID().toString(); String tm = new TranslogTransferMetadata(1, 2, 1, 2, nodeId).getFileName(); @@ -745,15 +639,16 @@ public void testMetadataConflict() throws InterruptedException { }).when(transferService) .listAllInSortedOrder(any(BlobPath.class), eq(TranslogTransferMetadata.METADATA_PREFIX), anyInt(), any(ActionListener.class)); - assertThrows(RuntimeException.class, translogTransferManager::readMetadata); + assertThrows(RuntimeException.class, translogCkpFilesTransferManager::readMetadata); } public void testTransferTranslogCheckpointSnapshotWithAllFilesUploaded() throws Exception { // Arrange Set toUpload = createTestTranslogCheckpointSnapshots(); Map blobPathMap = new HashMap<>(); - AtomicInteger successCount = new AtomicInteger(); - AtomicInteger failedCount = new AtomicInteger(); + AtomicInteger successfulGenCount = new AtomicInteger(); + AtomicInteger failedGenCount = new AtomicInteger(); + AtomicInteger processedFilesCount = new AtomicInteger(); final CountDownLatch latch = new CountDownLatch(toUpload.size()); doAnswer(invocationOnMock -> { @@ -761,18 +656,21 @@ public void testTransferTranslogCheckpointSnapshotWithAllFilesUploaded() throws ActionListener listener = invocationOnMock.getArgument(2); for (TransferFileSnapshot fileSnapshot : transferFileSnapshots) { listener.onResponse(fileSnapshot); + processedFilesCount.getAndIncrement(); fileSnapshot.close(); } return null; }).when(transferService).uploadBlobs(anySet(), anyMap(), any(ActionListener.class), any(WritePriority.class)); LatchedActionListener listener = new LatchedActionListener<>( - ActionListener.wrap(resp -> successCount.getAndIncrement(), ex -> failedCount.getAndIncrement()), + ActionListener.wrap(resp -> successfulGenCount.getAndIncrement(), ex -> failedGenCount.getAndIncrement()), latch ); - translogTransferManager.transferTranslogCheckpointSnapshot(toUpload, blobPathMap, listener); - assertEquals(successCount.get(), 2); + translogCkpFilesTransferManager.transferTranslogCheckpointSnapshot(toUpload, blobPathMap, listener); + assertEquals(successfulGenCount.get(), 2); + assertEquals(failedGenCount.get(), 0); + assertEquals(processedFilesCount.get(), 4); } public void testTransferTranslogCheckpointSnapshotWithOneOfTheTwoFilesFailedForATranslogGeneration() throws Exception { @@ -785,7 +683,6 @@ public void testTransferTranslogCheckpointSnapshotWithOneOfTheTwoFilesFailedForA Map blobPathMap = new HashMap<>(); AtomicInteger successCount = new AtomicInteger(); AtomicInteger failedCount = new AtomicInteger(); - final CountDownLatch latch = new CountDownLatch(toUpload.size()); doAnswer(invocationOnMock -> { Set transferFileSnapshots = invocationOnMock.getArgument(0); @@ -806,53 +703,73 @@ public void testTransferTranslogCheckpointSnapshotWithOneOfTheTwoFilesFailedForA return null; }).when(transferService).uploadBlobs(anySet(), anyMap(), any(ActionListener.class), any(WritePriority.class)); + final CountDownLatch latch = new CountDownLatch(toUpload.size()); LatchedActionListener listener = new LatchedActionListener<>( ActionListener.wrap(resp -> successCount.getAndIncrement(), ex -> failedCount.getAndIncrement()), latch ); + translogCkpFilesTransferManager.transferTranslogCheckpointSnapshot(toUpload, blobPathMap, listener); - translogTransferManager.transferTranslogCheckpointSnapshot(toUpload, blobPathMap, listener); assertEquals(successCount.get(), 0); assertEquals(failedCount.get(), 1); } - private Set createTestTranslogCheckpointSnapshots() { - Set snapshots = new HashSet<>(); - try { - Path translogFile = createTempFile(Translog.TRANSLOG_FILE_PREFIX + generation, Translog.TRANSLOG_FILE_SUFFIX); - Path checkpointFile = createTempFile(Translog.TRANSLOG_FILE_PREFIX + generation, Translog.CHECKPOINT_SUFFIX); - snapshots.add( - new TranslogCheckpointSnapshot( - primaryTerm, - generation, - minTranslogGeneration, - translogFile, - checkpointFile, - null, - null, - null, - generation - ) - ); + public void testTransferTranslogCheckpointSnapshotWhenBothTlogAndCkpTransferFailedForATranslogGeneration() throws Exception { + // Arrange + Set toUpload = createTestTranslogCheckpointSnapshots(); + Map blobPathMap = new HashMap<>(); + AtomicInteger successCount = new AtomicInteger(); + AtomicInteger failedCount = new AtomicInteger(); + AtomicInteger processedFilesCount = new AtomicInteger(); - translogFile = createTempFile(Translog.TRANSLOG_FILE_PREFIX + (generation - 1), Translog.TRANSLOG_FILE_SUFFIX); - checkpointFile = createTempFile(Translog.TRANSLOG_FILE_PREFIX + (generation - 1), Translog.CHECKPOINT_SUFFIX); - snapshots.add( - new TranslogCheckpointSnapshot( - primaryTerm, - generation - 1, - minTranslogGeneration, - translogFile, - checkpointFile, - null, - null, - null, - generation - 1 - ) - ); - } catch (IOException e) { - throw new AssertionError("Failed to create temp file", e); - } - return snapshots; + doAnswer(invocationOnMock -> { + Set transferFileSnapshots = invocationOnMock.getArgument(0); + ActionListener listener = invocationOnMock.getArgument(2); + for (TransferFileSnapshot fileSnapshot : transferFileSnapshots) { + listener.onFailure(new FileTransferException(fileSnapshot, new Exception("test-exception"))); + processedFilesCount.getAndIncrement(); + fileSnapshot.close(); + } + return null; + }).when(transferService).uploadBlobs(anySet(), anyMap(), any(ActionListener.class), any(WritePriority.class)); + + final CountDownLatch latch = new CountDownLatch(toUpload.size()); + LatchedActionListener listener = new LatchedActionListener<>( + ActionListener.wrap(resp -> successCount.getAndIncrement(), ex -> failedCount.getAndIncrement()), + latch + ); + translogCkpFilesTransferManager.transferTranslogCheckpointSnapshot(toUpload, blobPathMap, listener); + + assertEquals(successCount.get(), 0); + assertEquals(failedCount.get(), 2); + assertEquals(processedFilesCount.get(), 4); + } + + private Set createTestTranslogCheckpointSnapshots() throws IOException { + return Set.of( + new TranslogCheckpointSnapshot( + primaryTerm, + generation, + minTranslogGeneration, + createTempFile(Translog.TRANSLOG_FILE_PREFIX + generation, Translog.TRANSLOG_FILE_SUFFIX), + createTempFile(Translog.TRANSLOG_FILE_PREFIX + generation, Translog.CHECKPOINT_SUFFIX), + null, + null, + null, + generation + ), + + new TranslogCheckpointSnapshot( + primaryTerm, + generation - 1, + minTranslogGeneration, + createTempFile(Translog.TRANSLOG_FILE_PREFIX + (generation - 1), Translog.TRANSLOG_FILE_SUFFIX), + createTempFile(Translog.TRANSLOG_FILE_PREFIX + (generation - 1), Translog.CHECKPOINT_SUFFIX), + null, + null, + null, + generation - 1 + ) + ); } }